π― What Youβll Learn:
- What is SSE (Server-Sent Events)?
- How to build a simple SSE server in Python using FastAPI
.
- How to build an SSE client to receive real-time updates.
- How SSE compares to WebSockets & Polling.
β
SSE (Server-Sent Events) allows a server to push updates to a client over HTTP in real-time.
β
Unlike WebSockets, SSE is one-way (server β client).
β
Uses HTTP Streaming to continuously send data to the client.
πΉ Best for: Real-time notifications, live dashboards, stock prices, etc.
πΉ Not for: Full-duplex (bidirectional) communication (use WebSockets for that).
Before running the code, install the required packages:
sh
pip install fastapi uvicorn aiohttp
sse_server.py
)This server streams messages to clients using SSE.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_stream():
"""Sends a new event every 2 seconds."""
counter = 0
while True:
counter += 1
yield f"data: Message {counter}\n\n" # SSE format
await asyncio.sleep(2)
@app.get("/sse")
async def sse_endpoint():
"""SSE endpoint that streams real-time events."""
return StreamingResponse(event_stream(), media_type="text/event-stream")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
πΉ How It Works:
- event_stream()
: Generates messages every 2 seconds.
- StreamingResponse()
: Streams the data to clients over HTTP.
- Sends messages in SSE format (data: message \n\n
).
sse_client.py
)This client listens for messages from the server.
import aiohttp
import asyncio
async def sse_client():
url = "http://localhost:8000/sse"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async for line in response.content:
if line:
print(f"π© Received from Server: {line.decode().strip()}")
asyncio.run(sse_client())
πΉ How It Works:
- aiohttp.ClientSession()
: Creates an HTTP session.
- session.get(url)
: Opens an SSE connection.
- async for line in response.content:
Listens for new messages.
Run the SSE server in one terminal:
sh
python sse_server.py
You should see:
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
Run the client in another terminal:
sh
python sse_client.py
You will receive:
π© Received from Server: data: Message 1
π© Received from Server: data: Message 2
π© Received from Server: data: Message 3
...
π Now you have a working SSE example!
curl
Instead of a Python client, you can use curl
to listen for SSE events:
sh
curl -N -H "Accept: text/event-stream" http://localhost:8000/sse
Youβll see:
data: Message 1
data: Message 2
data: Message 3
...
Feature | SSE (Server-Sent Events) | WebSockets | Polling |
---|---|---|---|
Connection | One-way (Server β Client) | Two-way (Full Duplex) | Repeated HTTP Requests |
Use Case | Live updates, notifications | Chat, gaming, real-time apps | Basic updates |
Efficiency | β Efficient | β Efficient | β High Latency & Load |
Browser Support | β Native Support | β Requires JS API | β Simple but slow |
πΉ SSE is perfect for real-time server updates where the client only needs to receive data.
πΉ WebSockets are better for bidirectional communication (e.g., chat apps).
πΉ Polling is inefficient and should be avoided when real-time data is needed.
π Now you fully understand Server-Sent Events! π―π₯
Let me know if you need more modifications or explanations! ππ
This version includes:
β
SSE Server (FastAPI)
β
SSE Client (Python aiohttp
)
β
Frontend (Streamlit) to display real-time events
β
Docker Compose to containerize everything
sse_server.py
)This FastAPI server streams messages to connected clients.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_stream():
"""Sends a new event every 2 seconds."""
counter = 0
while True:
counter += 1
yield f"data: Message {counter}\n\n" # SSE format
await asyncio.sleep(2)
@app.get("/sse")
async def sse_endpoint():
"""SSE endpoint that streams real-time events."""
return StreamingResponse(event_stream(), media_type="text/event-stream")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
sse_client.py
)This listens to the SSE server and prints received messages.
import aiohttp
import asyncio
async def sse_client():
url = "http://sse-server:8000/sse" # Docker container hostname
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async for line in response.content:
if line:
print(f"π© Received from Server: {line.decode().strip()}")
asyncio.run(sse_client())
frontend.py
)This shows real-time SSE events in a web UI.
import streamlit as st
import requests
import time
st.title("π‘ Real-Time SSE Events")
st.markdown("### Incoming Messages:")
# Container for messages
messages = st.empty()
def get_sse_messages():
url = "http://sse-server:8000/sse"
with requests.get(url, stream=True) as response:
for line in response.iter_lines():
if line:
yield line.decode()
# Display messages in real-time
for msg in get_sse_messages():
messages.text(msg)
time.sleep(0.5) # Smooth UI update
Dockerfile
for the SSE Server# FastAPI SSE Server
FROM python:3.11-slim
WORKDIR /app
COPY sse_server.py /app/
RUN pip install fastapi uvicorn
CMD ["python", "sse_server.py"]
Dockerfile
for the Streamlit Frontend# Streamlit Frontend
FROM python:3.11-slim
WORKDIR /app
COPY frontend.py /app/
RUN pip install streamlit requests
CMD ["streamlit", "run", "frontend.py", "--server.port=8501", "--server.address=0.0.0.0"]
docker-compose.yml
This defines three services:
1οΈβ£ sse-server
(FastAPI SSE server)
2οΈβ£ sse-client
(Python SSE listener)
3οΈβ£ frontend
(Streamlit UI to display messages)
version: '3.8'
services:
sse-server:
build: .
container_name: sse-server
ports:
- "8000:8000"
sse-client:
build: .
container_name: sse-client
depends_on:
- sse-server
entrypoint: ["python"]
command: ["sse_client.py"]
frontend:
build: .
container_name: sse-frontend
depends_on:
- sse-server
ports:
- "8501:8501"
entrypoint: ["python"]
command: ["frontend.py"]
docker-compose up --build
β
This will:
- Start the SSE Server on http://localhost:8000/sse
- Run the SSE Client to listen for messages
- Launch the Streamlit UI at http://localhost:8501
1οΈβ£ Check the logs for the SSE Client
sh
docker logs -f sse-client
π’ Output:
π© Received from Server: data: Message 1
π© Received from Server: data: Message 2
...
2οΈβ£ Visit the Frontend:
π Go to http://localhost:8501
πΉ You will see messages appearing in real-time!
β
SSE Server: Pushes real-time messages.
β
SSE Client: Listens & prints updates.
β
Frontend UI: Displays messages live using Streamlit.
β
Dockerized & scalable!
π Now you have a fully working, containerized SSE example! π―π₯
Let me know if you need refinements! ππ
This upgrade includes:
β
SSE Server (FastAPI) with JSON Input Handling
β
Streamlit Frontend with a Button & Text Input for Sending Messages
β
Dockerized Setup for Easy Deployment
sse_server.py
)POST /messages
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
message_queue = asyncio.Queue() # Stores messages from clients
async def event_stream():
"""Sends events, including user acknowledgments."""
while True:
message = await message_queue.get() # Wait for a new message
yield f"data: Acknowledgment - Received: {message}\n\n" # SSE format
@app.get("/sse")
async def sse_endpoint():
"""SSE endpoint for real-time streaming."""
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.post("/messages")
async def post_message(request: Request):
"""Receives JSON messages and adds them to the queue."""
data = await request.json()
message = data.get("message", "No message provided")
await message_queue.put(json.dumps({"status": "Received", "message": message}))
return {"status": "Message received", "message": message}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
β
New Feature: Messages sent via POST /messages
are acknowledged in the SSE stream.
frontend.py
)import streamlit as st
import requests
import threading
st.title("π‘ Real-Time SSE with Acknowledgments")
st.markdown("### Incoming Messages:")
messages_container = st.empty()
# Function to listen for SSE events
def get_sse_messages():
url = "http://sse-server:8000/sse"
with requests.get(url, stream=True) as response:
for line in response.iter_lines():
if line:
messages_container.text(line.decode())
# Start SSE listener in a separate thread (to keep UI responsive)
thread = threading.Thread(target=get_sse_messages, daemon=True)
thread.start()
st.markdown("### Send a Message to the Server:")
user_input = st.text_input("Enter your message:")
if st.button("Send"):
response = requests.post("http://sse-server:8000/messages", json={"message": user_input})
st.success(f"Server Response: {response.json()}")
β New Feature: Sends messages via POST, and SSE stream confirms receipt.
docker-compose.yml
Adds three services:
1οΈβ£ sse-server
β FastAPI SSE backend
2οΈβ£ sse-client
β Background Python SSE listener
3οΈβ£ frontend
β Streamlit UI for sending messages & receiving responses
version: '3.8'
services:
sse-server:
build: .
container_name: sse-server
ports:
- "8000:8000"
sse-client:
build: .
container_name: sse-client
depends_on:
- sse-server
entrypoint: ["python"]
command: ["sse_client.py"]
frontend:
build: .
container_name: sse-frontend
depends_on:
- sse-server
ports:
- "8501:8501"
entrypoint: ["python"]
command: ["frontend.py"]
docker-compose up --build
β
This starts:
- SSE Server (http://localhost:8000/sse
)
- Frontend UI (http://localhost:8501
)
1οΈβ£ Go to http://localhost:8501
2οΈβ£ Enter a message & click Send
3οΈβ£ Check the "Incoming Messages" section β Youβll see:
data: Acknowledgment - Received: {"status": "Received", "message": "Hello Server"}
β
Full interactive SSE system
β
Messages sent via Streamlit UI get acknowledged live
β
Dockerized for easy deployment
π Now you have a fully working two-way SSE system! π―π₯
Let me know if you need further modifications! ππ
β REST (Representational State Transfer) and SSE (Server-Sent Events) are both HTTP-based communication methods, but they serve different purposes.
Feature | REST (HTTP APIs) | SSE (Server-Sent Events) |
---|---|---|
Type of Communication | Request-Response | One-Way Streaming (Server β Client) |
Initiated By | Client | Server |
Use Case | CRUD operations (Create, Read, Update, Delete) | Real-time notifications, live updates |
Connection Type | Short-lived | Persistent (long-lived) |
Efficiency | Higher latency due to repeated requests | Low latency with efficient updates |
State Management | Stateless | Stateful connection |
Example | Fetch user details, submit forms | Live stock updates, real-time chat notifications |
1οΈβ£ Client sends a request (e.g., GET /users/123
)
2οΈβ£ Server processes the request
3οΈβ£ Server sends a response (e.g., user details in JSON format)
4οΈβ£ Connection closes after response
πΉ REST Example:
http
GET /users/123 HTTP/1.1
Host: api.example.com
πΉ Response:
json
{
"id": 123,
"name": "John Doe",
"email": "john@example.com"
}
β
Best for: Fetching and modifying data on demand.
β Not good for: Real-time updates, as the client must continuously poll the server.
1οΈβ£ Client makes an initial connection (GET /sse
).
2οΈβ£ Server keeps the connection open and pushes updates as they happen.
3οΈβ£ Client passively listens and receives messages without making new requests.
πΉ SSE Example:
http
GET /sse HTTP/1.1
Host: api.example.com
Accept: text/event-stream
πΉ Server Response (Continuous Messages):
```
data: {"message": "Stock price updated: $120"}
data: {"message": "Breaking news: New product launched!"} ```
β
Best for: Live feeds, notifications, financial updates.
β Not good for: Two-way communication (use WebSockets instead).
Scenario | Use REST? | Use SSE? |
---|---|---|
Fetching data once | β Yes | β No |
Submitting a form | β Yes | β No |
Live notifications | β No | β Yes |
Real-time stock prices | β No | β Yes |
IoT sensor updates | β No | β Yes |
Chat application | β No | β No (Use WebSockets) |
Feature | REST | SSE |
---|---|---|
Best for | Standard API interactions | Real-time updates |
Efficiency | Less efficient for real-time | Optimized for live data |
Scalability | Easy to scale | Can require extra load balancing |
Alternative | WebSockets for bidirectional communication | WebSockets for full-duplex communication |
π Now you fully understand when to use REST and when to use SSE! π―π₯
Let me know if you need more details! ππ
This update includes:
β
A RESTful API endpoint (/send_rest
) to send messages
β
A "Send as REST" button in the frontend
β
Backend acknowledges REST messages and pushes them to SSE clients
sse_server.py
)POST /send_rest
)message_queue
so SSE clients get notifiedfrom fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
message_queue = asyncio.Queue() # Stores messages from clients
async def event_stream():
"""Streams messages, including REST and UI acknowledgments."""
while True:
message = await message_queue.get() # Wait for a new message
yield f"data: {message}\n\n" # SSE format
@app.get("/sse")
async def sse_endpoint():
"""SSE endpoint for real-time streaming."""
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.post("/messages")
async def post_message(request: Request):
"""Receives messages from the UI and adds them to the SSE queue."""
data = await request.json()
message = data.get("message", "No message provided")
response = {"status": "Received", "message": message}
await message_queue.put(json.dumps(response))
return response
@app.post("/send_rest")
async def send_rest_message(request: Request):
"""Receives messages from REST API and adds them to SSE queue."""
data = await request.json()
message = data.get("message", "No REST message provided")
response = {"status": "Received via REST", "message": message}
await message_queue.put(json.dumps(response)) # Send to SSE
return response
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
β
New Feature: Messages sent via POST /send_rest
are acknowledged in the SSE stream.
frontend.py
)"Send as SSE"
β Uses /messages
"Send as REST"
β Uses /send_rest
import streamlit as st
import requests
import threading
st.title("π‘ Real-Time SSE with REST Integration")
st.markdown("### Incoming Messages:")
messages_container = st.empty()
# Function to listen for SSE events
def get_sse_messages():
url = "http://sse-server:8000/sse"
with requests.get(url, stream=True) as response:
for line in response.iter_lines():
if line:
messages_container.text(line.decode())
# Start SSE listener in a separate thread
thread = threading.Thread(target=get_sse_messages, daemon=True)
thread.start()
st.markdown("### Send a Message to the Server:")
user_input = st.text_input("Enter your message:")
col1, col2 = st.columns(2)
# Button to send message via SSE
if col1.button("Send as SSE"):
response = requests.post("http://sse-server:8000/messages", json={"message": user_input})
st.success(f"SSE Response: {response.json()}")
# Button to send message via REST API
if col2.button("Send as REST"):
response = requests.post("http://sse-server:8000/send_rest", json={"message": user_input})
st.success(f"REST Response: {response.json()}")
β
New Feature: "Send as REST"
button sends a message via REST API (POST /send_rest
) and receives SSE acknowledgment.
docker-compose.yml
version: '3.8'
services:
sse-server:
build: .
container_name: sse-server
ports:
- "8000:8000"
sse-client:
build: .
container_name: sse-client
depends_on:
- sse-server
entrypoint: ["python"]
command: ["sse_client.py"]
frontend:
build: .
container_name: sse-frontend
depends_on:
- sse-server
ports:
- "8501:8501"
entrypoint: ["python"]
command: ["frontend.py"]
docker-compose up --build
β
This starts:
- SSE Server (http://localhost:8000/sse
)
- Streamlit Frontend (http://localhost:8501
)
1οΈβ£ Go to http://localhost:8501
2οΈβ£ Enter a message & click "Send as SSE"
3οΈβ£ Enter another message & click "Send as REST"
4οΈβ£ Check the Incoming Messages section β Youβll see:
data: {"status": "Received", "message": "Hello from SSE"}
data: {"status": "Received via REST", "message": "Hello from REST"}
β
SSE Server now supports both SSE & REST API!
β
Streamlit UI allows sending messages via both methods!
β
REST API messages are acknowledged via SSE stream!
β
Fully containerized & scalable!
π Now you have a complete real-time SSE system with REST support! π―π₯
Let me know if you need any refinements! ππ
β
Yes, a standard REST API should respond immediately to the request, instead of relying on the SSE queue.
β
However, if we want REST messages to also notify SSE clients, we need to both return a direct response and push the message into the SSE queue.
sse_server.py
)β
REST /send_rest
now returns an immediate response.
β
It still pushes the message into the SSE queue for real-time updates.
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
message_queue = asyncio.Queue() # Stores messages for SSE clients
async def event_stream():
"""Streams messages in real-time to SSE clients."""
while True:
message = await message_queue.get()
yield f"data: {message}\n\n" # SSE format
@app.get("/sse")
async def sse_endpoint():
"""SSE endpoint for real-time streaming."""
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.post("/messages")
async def post_message(request: Request):
"""Receives messages from UI and adds them to the SSE queue."""
data = await request.json()
message = data.get("message", "No message provided")
response = {"status": "Received", "message": message}
# β
SSE clients get notified
await message_queue.put(json.dumps(response))
return response # β
REST immediately returns a response
@app.post("/send_rest")
async def send_rest_message(request: Request):
"""Receives REST messages, responds immediately, and notifies SSE clients."""
data = await request.json()
message = data.get("message", "No REST message provided")
response = {"status": "Received via REST", "message": message}
# β
Immediate REST response
immediate_response = response.copy()
# β
Push to SSE queue so real-time clients get notified
await message_queue.put(json.dumps(response))
return immediate_response # β
REST returns response instantly
Feature | Before (β Bad) | Now (β Fixed) |
---|---|---|
REST /send_rest response |
Delayed until SSE processed | Returns immediately |
SSE Notifications for REST messages | β Not always sent | β Now also sent to SSE clients |
β Now, REST clients get an instant response, while SSE clients still receive real-time updates!
frontend.py
)import streamlit as st
import requests
import threading
st.title("π‘ Real-Time SSE with REST Support")
st.markdown("### Incoming Messages:")
messages_container = st.empty()
# Function to listen for SSE events
def get_sse_messages():
url = "http://sse-server:8000/sse"
with requests.get(url, stream=True) as response:
for line in response.iter_lines():
if line:
messages_container.text(line.decode())
# Start SSE listener in a separate thread
thread = threading.Thread(target=get_sse_messages, daemon=True)
thread.start()
st.markdown("### Send a Message to the Server:")
user_input = st.text_input("Enter your message:")
col1, col2 = st.columns(2)
if col1.button("Send as SSE"):
response = requests.post("http://sse-server:8000/messages", json={"message": user_input})
st.success(f"SSE Response: {response.json()}")
if col2.button("Send as REST"):
response = requests.post("http://sse-server:8000/send_rest", json={"message": user_input})
st.success(f"REST Response: {response.json()}") # β
Shows immediate REST response
curl -X POST -H "Content-Type: application/json" -d '{"message": "REST says hello"}' http://localhost:8000/send_rest
Expected Immediate Response:
json
{
"status": "Received via REST",
"message": "REST says hello"
}
curl -N -H "Accept: text/event-stream" http://localhost:8000/sse
Expected SSE Event:
data: {"status": "Received via REST", "message": "REST says hello"}
β
REST API now returns an instant response.
β
SSE clients still receive live updates from REST messages.
β
Perfect balance between synchronous & real-time updates!
π Now you have a fully working system with instant REST responses + SSE live updates! π―π₯
Let me know if you need any refinements! ππ
π Transform your business with cutting-edge AI chatbot solutions and expert consultancy! π Contact us today at https://www.schogini.com and take the first step toward smarter automation and unparalleled growth!