In part 1, we showed how we could leverage HTMX to add interactivity to our HTML elements. In other words, JavaScript without JavaScript. To illustrate that, we began building a simple chat that would return a simulated LLM response. In this article, we will extend the capabilities of our chatbot and add several features, including streaming, which is a significant enhancement in terms of user experience compared to the synchronous chat built previously.
- ✅ Real-time streaming with SSE
- ✅ Session-based architecture for multiple users
- ✅ Async coordination with asyncio.Queue
- ✅ Clean HTMX patterns with dedicated SSE handling
- ✅ A Google Search Agent to answer queries with fresh data
- ✅ Almost Zero JavaScript
Here is what we will build today:

From sync communication to async
What we built previously relied on very basic web functionality: forms. Our communication was synchronous, meaning we don’t get anything until the server is done. We issue a request, we wait for the full response, and we display it. In between, we just… wait.
But modern chatbots work differently, by providing asynchronous communication capabilities. This is done using streaming: we get updates and partial responses instead of waiting for the full response. This is particularly helpful when the response process takes time, which is typically the case for LLMs when the answer is long.
SSE vs Websockets
SSE (Server-Sent Events) and WebSockets are two protocols for real-time data exchange between a client and a server.
WebSockets allow full-duplex connections: the browser and the server can both send and receive data simultaneously. This is typically used in online gaming, chat applications, and collaborative tools (e.g. Google Sheets).
SSE is unidirectional: it only allows a one-way conversation, from server to client. This means that the client cannot send anything to the server via this protocol. If WebSockets are a two-way phone conversation where people can speak and listen at the same time, SSE is like listening to the radio. SSE is typically used to send notifications, update charts in finance applications, or power newsfeeds.
So why do we choose SSE? Because in our use case we don’t need full duplex, and plain HTTP (which is not what WebSockets use) is enough: we send data, we receive data. SSE just means that we will receive the data as a stream, nothing more is needed.
What we want to do
- User inputs a query
- Server receives the query and sends it to the LLM
- LLM starts producing content
- For each piece of content, the server returns it immediately
- Browser adds this piece of information to the DOM
We will separate our work into backend and frontend sections.
Backend
The backend will proceed in two steps:
- A POST endpoint that receives the message and returns nothing to display
- A GET endpoint that reads a queue and produces an output stream.
In our demo, to begin with, we will create a fake LLM response by echoing the user input: the streamed words will be exactly the ones the user typed.
To keep things clean, we need to separate the message streams (the queues) by user session, otherwise we would end up mixing up conversations. We will therefore create a session dictionary to host our queues.
Next, we need to tell the backend to wait until the queue is filled before streaming our response. If we don’t, we will run into timing issues: the SSE connection starts on the client side, the queue is empty, the SSE connection closes, the user submits a message but… it’s too late!
The solution: async queues! Using asynchronous queues has several advantages:
- If the queue has data: Returns immediately
- If the queue is empty: Suspends execution until queue.put() is called
- Multiple consumers: Each gets their own data
- Coroutine-safe: No race conditions within the event loop
I know you are burning to know more, so here is the code below:
from fastapi import FastAPI, Request, Form
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, StreamingResponse
import asyncio
import time
import uuid

app = FastAPI()
templates = Jinja2Templates("templates")

# This object will store session id and their corresponding value, an async queue.
sessions = dict()


@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})


@app.post("/chat")
async def chat(request: Request, query: str = Form(...), session_id: str = Form(...)):
    """Send message to session-based queue"""
    # Create the session if it does not exist
    if session_id not in sessions:
        sessions[session_id] = asyncio.Queue()
    # Put the message in the queue
    await sessions[session_id].put(query)
    return {"status": "queued", "session_id": session_id}


@app.get("/stream/{session_id}")
async def stream(session_id: str):
    async def response_stream():
        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return
        queue = sessions[session_id]
        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        data = await queue.get()
        print(f"Got message: {data}")
        message = ""
        await asyncio.sleep(1)
        for token in data.replace("\n", " ").split(" "):
            message += token + " "
            # Each SSE message is an <li> element that replaces the previous one in the chat
            data = f"""data: <li><b>AI:</b> {message}</li>\n\n"""
            yield data
            await asyncio.sleep(0.03)
        queue.task_done()

    return StreamingResponse(response_stream(), media_type="text/event-stream")
Let’s explain a couple of key concepts here.
Session isolation
It is important that each user gets their own message queue, so as not to mix up conversations. We do this with the sessions dictionary. In real production apps, we would probably use Redis to store it. In the code below, we see that a new session id is created on page load and stored in the sessions dictionary. Reloading the page will start a new session: we are not persisting the message queues, but we could, via a database for example. This topic is covered in part 3.
# This object will store session id and their corresponding value, an async queue.
sessions = dict()

@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})
Blocking coordination
We need to control the order in which the user query is received and the SSE messages are sent. On the backend side, the order is:
- Receive user message
- Create a message queue and populate it
- Send messages from the queue in a Streaming Response
Failure to do so may lead to unwanted behavior, i.e. first reading the (empty) message queue, then populating it with the user’s query.
The solution to control the order is to use asyncio.Queue. This object will be used twice:
- When we insert new messages into the queue. Inserting a message will “wake up” the waiting SSE endpoint:
await sessions[session_id].put(query)
- When we pull messages from the queue. On this line, the code is blocked until the queue signals “hey, I have new data!”:
data = await queue.get()
This pattern offers several advantages:
- Each user has its own queue
- There is no risk of race conditions
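To make this suspend-and-wake behavior concrete, here is a tiny standalone sketch (illustrative only, separate from our app):

import asyncio

async def consumer(queue: asyncio.Queue) -> None:
    # Suspends here until a producer calls queue.put()
    item = await queue.get()
    print(f"Consumer resumed with: {item}")
    queue.task_done()

async def main() -> None:
    queue = asyncio.Queue()
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.sleep(1)       # the consumer is parked, not polling
    await queue.put("hello")     # this wakes the consumer up
    await consumer_task

asyncio.run(main())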
Streaming simulation
In this article, we will simulate an LLM response by splitting the user’s query into words and returning those words one by one. Later in this article, we will plug a real LLM into it.
The streaming is handled via the StreamingResponse object from FastAPI. This object expects an asynchronous generator that will yield data until it is exhausted. We have to use the yield keyword instead of return, otherwise our function would not be a generator and would stop after the first response.
Let’s decompose our streaming function:
First, we need to ensure we have a queue for the current session from which we will pull messages:
if session_id not in sessions:
    print(f"Session {session_id} not found!")
    return
queue = sessions[session_id]
Next, once we have the queue, we will pull messages from the queue if it contains any, otherwise the code pauses and waits for messages to arrive. This is the most important part of our function:
# This BLOCKS until data arrives
print(f"Waiting for message in session {session_id}")
data = await queue.get()
print(f"Got message: {data}")
To simulate streaming, we now chunk the message into words (called tokens here) and add a few sleeps to mimic the text generation process of an LLM (the asyncio.sleep calls). Notice how the data we yield is actually HTML strings, encapsulated in a string starting with “data:”. This is how SSE messages are sent. You can also choose to flag your messages with the “event:” metadata. An example would be:
event: my_custom_event
data: Content to swap into your HTML page.
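On the browser side, the HTMX SSE extension subscribes to such a named event through its sse-swap attribute. A small illustrative sketch (not needed in our app, where we stick to the default “message” event):

<div hx-ext="sse" sse-connect="/stream/some-session" sse-swap="my_custom_event">
  <!-- the "data:" payload of my_custom_event events is swapped in here -->
</div>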
Let’s see how we implement it in Python (for the purists: use Jinja templates to render the HTML instead of a string):
message = ""
# First pause, to let the browser display "Thinking..." when the message is sent
await asyncio.sleep(1)
# Simulate streaming by splitting the message into words
for token in data.replace("\n", " ").split(" "):
    # We append tokens to the message
    message += token + " "
    # We wrap the message in HTML tags, preceded by the "data:" field name
    data = f"""data: <li><b>AI:</b> {message}</li>\n\n"""
    yield data
    # Pause to simulate the LLM generation process
    await asyncio.sleep(0.03)
queue.task_done()
Frontend
Our frontend has two jobs: send user queries to the backend, and listen for SSE messages on a specific channel (the session_id). To do that, we apply a concept called “separation of concerns”, meaning each HTMX element is responsible for a single job only:
- the form sends the user input
- the SSE listener handles the streaming
- the ul chat list displays the messages
To send messages, we will use a standard textarea input in a form. The HTMX magic is just below:
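Here is a minimal sketch of what such a form can look like (the #chat-list id, the trigger and the inline script are illustrative assumptions; the attributes are explained right after):

<form hx-post="/chat"
      hx-swap="none"
      hx-trigger="submit"
      hx-on::before-request="
        document.querySelector('#chat-list').insertAdjacentHTML(
          'beforeend',
          '<li><b>Me:</b> ' + this.query.value + '</li><li><b>AI:</b> Thinking...</li>'
        )">
  <textarea name="query" placeholder="Ask me something..."></textarea>
  <input type="hidden" name="session_id" value="{{ session_id }}">
  <button type="submit">Send</button>
</form>

The hidden input is what carries the session_id alongside the user’s query.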
If you remember the article from part 1, we have several HTMX attributes which deserve explanations:
- hx-post: the endpoint the form data will be submitted to.
- hx-swap: set to none, because in our case the endpoint does not return any data to display.
- hx-trigger: specifies which event will trigger the request.
- hx-on::before-request: a very light touch of JavaScript to add some snappiness to the app. We append the user’s request to the chat list and display a “Thinking…” message while we wait for the SSE messages to stream in. This is nicer than having to stare at a blank page.
It is worth noting that we actually send two parameters to the backend: the user’s input and the session id. This way, the message will be inserted into the right queue on the backend side.
Then, we define another component that is specifically dedicated to listening to SSE messages.
<div hx-ext="sse"
     sse-connect="/stream/{{ session_id }}"
     sse-swap="message"
     hx-target="li:last-child"
     hx-swap="outerHTML"
     style="display: none;">
</div>
This component listens to the /stream endpoint and passes its session id, so that it only receives messages for this session. The hx-target tells the browser to target the last li element of the chat. The hx-swap specifies that the incoming data must replace this entire li element. This is how our streaming effect works: the current message is replaced by the latest, slightly longer one.
Note: other methods could have been used to replace specific elements of the DOM, such as out-of-band (OOB) swaps. They work a little differently since they require a specific id to look for in the DOM. In our case, we chose on purpose not to assign ids to the list elements we write.
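For reference, an OOB fragment returned by the server would look roughly like this (illustrative only; the id must already exist in the DOM for HTMX to find and replace it):

<li id="msg-42" hx-swap-oob="true"><b>AI:</b> updated content for that specific message</li>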
A Real Chatbot using Google Agent Development Kit
Now is the time to replace our dummy streaming endpoint with a real LLM. To achieve that, we will build an agent using Google ADK, equipped with tools and memory to fetch information and remember conversation details.
A very short introduction to agents
You probably already know what an LLM is, at least I assume you do. The main drawback of LLMs as of today is that, on their own, they cannot access real-time information: their knowledge is frozen at the moment they were trained. The other drawback is their inability to access information that is outside their training scope (e.g. your company’s internal data).
Agents are a type of AI application that can reason, act and observe. The reasoning part is handled by the LLM, the “brain”. The “hands” of the agent are what we call “tools”, and they can take several forms:
- a Python function, for example to call an API
- an MCP server, which is a standard that allows agents to connect to APIs through a standardized interface (e.g. accessing all the GSuite tools without having to write the API connectors yourself)
- other agents (this pattern is called agent delegation, where a router or master agent controls different sub-agents)
In our demo, to keep things simple, we will use an agent with a single tool: Google Search. This will allow us to get fresh information and ensure it is reliable (at least we hope the Google Search results are…).
In the Google ADK world, agents need basic information:
- name and description, for documentation purposes mostly
- instructions: the prompt that defines the behavior of the agent (tools use, output format, steps to follow, etc)
- tools: the functions / MCP servers / agents the agent can use to fulfill its objective
There are also other concepts around memory and session management, but they are out of scope here.
Without further ado, let’s define our agent!
A Streaming Google Search Agent
from google.adk.agents import Agent
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.adk.tools import google_search

# Define constants for the agent
APP_NAME = "default"    # Application
USER_ID = "default"     # User
SESSION = "default"     # Session
MODEL_NAME = "gemini-2.5-flash-lite"

# Step 1: Create the LLM Agent
root_agent = Agent(
    model=MODEL_NAME,
    name="text_chat_bot",
    description="A text chatbot",
    instruction="You are a helpful assistant. Your goal is to answer questions based on your knowledge. Use your Google Search tool to provide the latest and most accurate information",
    tools=[google_search]
)

# Step 2: Set up Session Management
# InMemorySessionService stores conversations in RAM (temporary)
session_service = InMemorySessionService()

# Step 3: Create the Runner
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
The `Runner` object acts as the orchestrator between you and the agent.
Next, we (re)define our `/stream` endpoint. We first try to create a session for the agent, and fall back to retrieving the existing one if it has already been created:
# Attempt to create a new session or retrieve an existing one
try:
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=session_id
    )
except:
    session = await session_service.get_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=session_id
    )
Then, we take the user query, pass it to the agent in an async fashion to get a stream back:
# Convert the query string to the ADK Content format
query = types.Content(role="user", parts=[types.Part(text=query)])

# Stream the agent's response asynchronously
async for event in runner.run_async(
    user_id=USER_ID,
    session_id=session.id,
    new_message=query,
    run_config=RunConfig(streaming_mode=StreamingMode.SSE),
):
There is a subtlety next. When generating a response, the agent might output a double linebreak “\n\n”. This is problematic because SSE events are terminated by exactly this sequence. Having a double linebreak in your string therefore means:
- your current message will be truncated
- your next message will be incorrectly formatted and the SSE stream will stop
You can try it for yourself. To fix this, we will use a little hack, along with another little hack to format list elements (I use Tailwind CSS, which overrides certain CSS rules). The hack is:
if event.partial:
    message += event.content.parts[0].text
    # Hack 1: render the markdown, then replace raw newlines so "\n\n" never ends the SSE event early
    # Hack 2: re-style list tags, since Tailwind's CSS reset removes the default bullets and numbering
    html_content = (
        markdown.markdown(message, extensions=['fenced_code'])
        .replace("\n", "<br>")
        .replace("<ul>", "<ul class='list-disc ml-6'>")
        .replace("<ol>", "<ol class='list-decimal ml-6'>")
    )
    full_html = f"""data: <li><b>AI:</b> {html_content}</li>\n\n"""
    yield full_html
This way, we ensure that no double linebreaks will break our SSE stream.
Full code for the route is below:
import markdown  # used to render the agent's markdown output as HTML

@app.get("/stream/{session_id}")
async def stream(session_id: str):
    async def response_stream():
        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return
        # Attempt to create a new session or retrieve an existing one
        try:
            session = await session_service.create_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )
        except:
            session = await session_service.get_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )
        queue = sessions[session_id]
        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        query = await queue.get()
        print(f"Got message: {query}")
        message = ""
        # Convert the query string to the ADK Content format
        query = types.Content(role="user", parts=[types.Part(text=query)])
        # Stream the agent's response asynchronously
        async for event in runner.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
        ):
            if event.partial:
                message += event.content.parts[0].text
                # Render the markdown, replace raw newlines so "\n\n" never breaks the SSE stream,
                # and re-style list tags (Tailwind's CSS reset removes the default bullets)
                html_content = (
                    markdown.markdown(message, extensions=['fenced_code'])
                    .replace("\n", "<br>")
                    .replace("<ul>", "<ul class='list-disc ml-6'>")
                    .replace("<ol>", "<ol class='list-decimal ml-6'>")
                )
                full_html = f"""data: <li><b>AI:</b> {html_content}</li>\n\n"""
                yield full_html
        queue.task_done()

    return StreamingResponse(response_stream(), media_type="text/event-stream")
And that’s it! You will be able to converse with your chat!
I add below a little CSS snippet to format code blocks. Indeed, if you ask your chatbot to produce code snippets, you want them properly formatted. Here is the CSS:
pre, code {
    background-color: black;
    color: lightgrey;
    padding: 1%;
    border-radius: 10px;
    white-space: pre-wrap;
    font-size: 0.8rem;
    letter-spacing: -1px;
}
You can now also generate code snippets:

Mind = blown
Workflow recap
With fewer than 200 lines of code, we were able to write a chat with the workflow below, streaming a response from the server and displaying it very nicely by combining SSE and HTMX.
User types "Hello World" → Submit
├── 1. Add "Me: Hello World" to chat
├── 2. Add "AI: Thinking..." to chat
├── 3. POST /chat with message
├── 4. Server queues message
├── 5. SSE stream produces a LLM response based on the query
├── 6. Stream "AI: This" (replaces "Thinking...")
├── 7. Stream "AI: This is the answer ..."
└── 8. Complete
Conclusion
In this series of articles, we showed how easy it can be to develop a chatbot app with almost no JavaScript and no heavy JS framework, just by using Python and HTML. We covered topics such as server-side rendering, Server-Sent Events (SSE), async streaming and agents, with the help of a magical library: HTMX.
The main purpose of these articles was to show that web development is not inaccessible to non-JavaScript developers. There are actually strong and valid reasons not to reach for JavaScript every time in web development, and although JavaScript is a powerful language, my feeling today is that it is sometimes overused in place of simpler, yet robust approaches. The server-side vs client-side debate is long-standing and not over yet, but I hope these articles were an eye-opener to some of you, and that they taught you something new.
Stay tuned!



