πŸ”Ή Server-Sent Events (SSE) Explained with Python: A Simple & Elegant Tutorial

🎯 What You’ll Learn:
- What is SSE (Server-Sent Events)?
- How to build a simple SSE server in Python using FastAPI.
- How to build an SSE client to receive real-time updates.
- How SSE compares to WebSockets & Polling.


πŸ”Ή What is SSE (Server-Sent Events)?

βœ… SSE (Server-Sent Events) allows a server to push updates to a client over HTTP in real-time.
βœ… Unlike WebSockets, SSE is one-way (server β†’ client).
βœ… Uses HTTP Streaming to continuously send data to the client.

πŸ”Ή Best for: Real-time notifications, live dashboards, stock prices, etc.
πŸ”Ή Not for: Full-duplex (bidirectional) communication (use WebSockets for that).


πŸ“Œ Step 1: Install Dependencies

Before running the code, install the required packages: sh pip install fastapi uvicorn aiohttp


πŸ“Œ Step 2: Create the SSE Server (sse_server.py)

This server streams messages to clients using SSE.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def event_stream():
    """Sends a new event every 2 seconds."""
    counter = 0
    while True:
        counter += 1
        yield f"data: Message {counter}\n\n"  # SSE format
        await asyncio.sleep(2)

@app.get("/sse")
async def sse_endpoint():
    """SSE endpoint that streams real-time events."""
    return StreamingResponse(event_stream(), media_type="text/event-stream")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

πŸ”Ή How It Works:
- event_stream(): Generates messages every 2 seconds.
- StreamingResponse(): Streams the data to clients over HTTP.
- Sends messages in SSE format (data: message \n\n).


πŸ“Œ Step 3: Create the SSE Client (sse_client.py)

This client listens for messages from the server.

import aiohttp
import asyncio

async def sse_client():
    url = "http://localhost:8000/sse"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for line in response.content:
                if line:
                    print(f"πŸ“© Received from Server: {line.decode().strip()}")

asyncio.run(sse_client())

πŸ”Ή How It Works:
- aiohttp.ClientSession(): Creates an HTTP session.
- session.get(url): Opens an SSE connection.
- async for line in response.content: Listens for new messages.


πŸ“Œ Step 4: Running the SSE Demo

βœ… Start the Server

Run the SSE server in one terminal: sh python sse_server.py You should see: INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

βœ… Start the Client

Run the client in another terminal: sh python sse_client.py You will receive: πŸ“© Received from Server: data: Message 1 πŸ“© Received from Server: data: Message 2 πŸ“© Received from Server: data: Message 3 ...

πŸŽ‰ Now you have a working SSE example!


πŸ”Ή Step 5: Testing with curl

Instead of a Python client, you can use curl to listen for SSE events: sh curl -N -H "Accept: text/event-stream" http://localhost:8000/sse You’ll see: data: Message 1 data: Message 2 data: Message 3 ...


πŸ”Ή SSE vs WebSockets vs Polling

Feature SSE (Server-Sent Events) WebSockets Polling
Connection One-way (Server β†’ Client) Two-way (Full Duplex) Repeated HTTP Requests
Use Case Live updates, notifications Chat, gaming, real-time apps Basic updates
Efficiency βœ… Efficient βœ… Efficient ❌ High Latency & Load
Browser Support βœ… Native Support ❌ Requires JS API βœ… Simple but slow

πŸš€ Summary

πŸ”Ή SSE is perfect for real-time server updates where the client only needs to receive data.
πŸ”Ή WebSockets are better for bidirectional communication (e.g., chat apps).
πŸ”Ή Polling is inefficient and should be avoided when real-time data is needed.


πŸš€ Now you fully understand Server-Sent Events! 🎯πŸ”₯
Let me know if you need more modifications or explanations! πŸ˜ŠπŸš€

TUTORIAL -2

πŸš€ Full SSE Demo with Docker & Streamlit Frontend

This version includes:
βœ… SSE Server (FastAPI)
βœ… SSE Client (Python aiohttp)
βœ… Frontend (Streamlit) to display real-time events
βœ… Docker Compose to containerize everything


πŸ“Œ Step 1: Create the SSE Server (sse_server.py)

This FastAPI server streams messages to connected clients.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def event_stream():
    """Sends a new event every 2 seconds."""
    counter = 0
    while True:
        counter += 1
        yield f"data: Message {counter}\n\n"  # SSE format
        await asyncio.sleep(2)

@app.get("/sse")
async def sse_endpoint():
    """SSE endpoint that streams real-time events."""
    return StreamingResponse(event_stream(), media_type="text/event-stream")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

πŸ“Œ Step 2: Create the SSE Client (sse_client.py)

This listens to the SSE server and prints received messages.

import aiohttp
import asyncio

async def sse_client():
    url = "http://sse-server:8000/sse"  # Docker container hostname
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for line in response.content:
                if line:
                    print(f"πŸ“© Received from Server: {line.decode().strip()}")

asyncio.run(sse_client())

πŸ“Œ Step 3: Create the Streamlit Frontend (frontend.py)

This shows real-time SSE events in a web UI.

import streamlit as st
import requests
import time

st.title("πŸ“‘ Real-Time SSE Events")

st.markdown("### Incoming Messages:")

# Container for messages
messages = st.empty()

def get_sse_messages():
    url = "http://sse-server:8000/sse"
    with requests.get(url, stream=True) as response:
        for line in response.iter_lines():
            if line:
                yield line.decode()

# Display messages in real-time
for msg in get_sse_messages():
    messages.text(msg)
    time.sleep(0.5)  # Smooth UI update

πŸ“Œ Step 4: Containerize with Docker

βœ… Create a Dockerfile for the SSE Server

# FastAPI SSE Server
FROM python:3.11-slim

WORKDIR /app
COPY sse_server.py /app/
RUN pip install fastapi uvicorn

CMD ["python", "sse_server.py"]

βœ… Create a Dockerfile for the Streamlit Frontend

# Streamlit Frontend
FROM python:3.11-slim

WORKDIR /app
COPY frontend.py /app/
RUN pip install streamlit requests

CMD ["streamlit", "run", "frontend.py", "--server.port=8501", "--server.address=0.0.0.0"]

πŸ“Œ Step 5: Create docker-compose.yml

This defines three services:
1️⃣ sse-server (FastAPI SSE server)
2️⃣ sse-client (Python SSE listener)
3️⃣ frontend (Streamlit UI to display messages)

version: '3.8'

services:
  sse-server:
    build: .
    container_name: sse-server
    ports:
      - "8000:8000"

  sse-client:
    build: .
    container_name: sse-client
    depends_on:
      - sse-server
    entrypoint: ["python"]
    command: ["sse_client.py"]

  frontend:
    build: .
    container_name: sse-frontend
    depends_on:
      - sse-server
    ports:
      - "8501:8501"
    entrypoint: ["python"]
    command: ["frontend.py"]

πŸ“Œ Step 6: Run the Full Setup

βœ… Build and Start the Containers

docker-compose up --build

βœ… This will: - Start the SSE Server on http://localhost:8000/sse - Run the SSE Client to listen for messages - Launch the Streamlit UI at http://localhost:8501


πŸ“Œ Step 7: View the Live Events

1️⃣ Check the logs for the SSE Client
sh docker logs -f sse-client 🟒 Output: πŸ“© Received from Server: data: Message 1 πŸ“© Received from Server: data: Message 2 ...

2️⃣ Visit the Frontend:
πŸ“Œ Go to http://localhost:8501
πŸ”Ή You will see messages appearing in real-time!


πŸš€ Final Takeaway

βœ… SSE Server: Pushes real-time messages.
βœ… SSE Client: Listens & prints updates.
βœ… Frontend UI: Displays messages live using Streamlit.
βœ… Dockerized & scalable!

πŸš€ Now you have a fully working, containerized SSE example! 🎯πŸ”₯
Let me know if you need refinements! πŸ˜ŠπŸš€

TUTORIAL 3

πŸš€ Fully Interactive SSE System with Input & Acknowledgment

This upgrade includes:
βœ… SSE Server (FastAPI) with JSON Input Handling
βœ… Streamlit Frontend with a Button & Text Input for Sending Messages
βœ… Dockerized Setup for Easy Deployment


πŸ“Œ Step 1: Update the SSE Server (sse_server.py)

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

message_queue = asyncio.Queue()  # Stores messages from clients

async def event_stream():
    """Sends events, including user acknowledgments."""
    while True:
        message = await message_queue.get()  # Wait for a new message
        yield f"data: Acknowledgment - Received: {message}\n\n"  # SSE format

@app.get("/sse")
async def sse_endpoint():
    """SSE endpoint for real-time streaming."""
    return StreamingResponse(event_stream(), media_type="text/event-stream")

@app.post("/messages")
async def post_message(request: Request):
    """Receives JSON messages and adds them to the queue."""
    data = await request.json()
    message = data.get("message", "No message provided")
    await message_queue.put(json.dumps({"status": "Received", "message": message}))
    return {"status": "Message received", "message": message}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

βœ… New Feature: Messages sent via POST /messages are acknowledged in the SSE stream.


πŸ“Œ Step 2: Update the Streamlit Frontend (frontend.py)

import streamlit as st
import requests
import threading

st.title("πŸ“‘ Real-Time SSE with Acknowledgments")

st.markdown("### Incoming Messages:")
messages_container = st.empty()

# Function to listen for SSE events
def get_sse_messages():
    url = "http://sse-server:8000/sse"
    with requests.get(url, stream=True) as response:
        for line in response.iter_lines():
            if line:
                messages_container.text(line.decode())

# Start SSE listener in a separate thread (to keep UI responsive)
thread = threading.Thread(target=get_sse_messages, daemon=True)
thread.start()

st.markdown("### Send a Message to the Server:")
user_input = st.text_input("Enter your message:")
if st.button("Send"):
    response = requests.post("http://sse-server:8000/messages", json={"message": user_input})
    st.success(f"Server Response: {response.json()}")

βœ… New Feature: Sends messages via POST, and SSE stream confirms receipt.


πŸ“Œ Step 3: Update docker-compose.yml

Adds three services: 1️⃣ sse-server β†’ FastAPI SSE backend
2️⃣ sse-client β†’ Background Python SSE listener
3️⃣ frontend β†’ Streamlit UI for sending messages & receiving responses

version: '3.8'

services:
  sse-server:
    build: .
    container_name: sse-server
    ports:
      - "8000:8000"

  sse-client:
    build: .
    container_name: sse-client
    depends_on:
      - sse-server
    entrypoint: ["python"]
    command: ["sse_client.py"]

  frontend:
    build: .
    container_name: sse-frontend
    depends_on:
      - sse-server
    ports:
      - "8501:8501"
    entrypoint: ["python"]
    command: ["frontend.py"]

πŸ“Œ Step 4: Running the Full System

βœ… Build & Start the Containers

docker-compose up --build

βœ… This starts: - SSE Server (http://localhost:8000/sse) - Frontend UI (http://localhost:8501)


πŸ“Œ Step 5: Testing

1️⃣ Go to http://localhost:8501
2️⃣ Enter a message & click Send
3️⃣ Check the "Incoming Messages" section β†’ You’ll see:
data: Acknowledgment - Received: {"status": "Received", "message": "Hello Server"}


πŸš€ Final Takeaway

βœ… Full interactive SSE system
βœ… Messages sent via Streamlit UI get acknowledged live
βœ… Dockerized for easy deployment

πŸš€ Now you have a fully working two-way SSE system! 🎯πŸ”₯
Let me know if you need further modifications! πŸ˜ŠπŸš€

REST vs SSE

πŸ”Ή REST vs SSE (Server-Sent Events) – Key Differences

βœ… REST (Representational State Transfer) and SSE (Server-Sent Events) are both HTTP-based communication methods, but they serve different purposes.


πŸ”Ή 1. Overview

Feature REST (HTTP APIs) SSE (Server-Sent Events)
Type of Communication Request-Response One-Way Streaming (Server β†’ Client)
Initiated By Client Server
Use Case CRUD operations (Create, Read, Update, Delete) Real-time notifications, live updates
Connection Type Short-lived Persistent (long-lived)
Efficiency Higher latency due to repeated requests Low latency with efficient updates
State Management Stateless Stateful connection
Example Fetch user details, submit forms Live stock updates, real-time chat notifications

πŸ”Ή 2. How They Work

βœ… REST (Traditional APIs)

1️⃣ Client sends a request (e.g., GET /users/123)
2️⃣ Server processes the request
3️⃣ Server sends a response (e.g., user details in JSON format)
4️⃣ Connection closes after response

πŸ”Ή REST Example: http GET /users/123 HTTP/1.1 Host: api.example.com πŸ”Ή Response: json { "id": 123, "name": "John Doe", "email": "john@example.com" }

βœ… Best for: Fetching and modifying data on demand.
❌ Not good for: Real-time updates, as the client must continuously poll the server.


βœ… SSE (Real-Time Data Streaming)

1️⃣ Client makes an initial connection (GET /sse).
2️⃣ Server keeps the connection open and pushes updates as they happen.
3️⃣ Client passively listens and receives messages without making new requests.

πŸ”Ή SSE Example: http GET /sse HTTP/1.1 Host: api.example.com Accept: text/event-stream πŸ”Ή Server Response (Continuous Messages): ``` data: {"message": "Stock price updated: $120"}

data: {"message": "Breaking news: New product launched!"} ```

βœ… Best for: Live feeds, notifications, financial updates.
❌ Not good for: Two-way communication (use WebSockets instead).


πŸ”Ή 3. When to Use REST vs SSE?

Scenario Use REST? Use SSE?
Fetching data once βœ… Yes ❌ No
Submitting a form βœ… Yes ❌ No
Live notifications ❌ No βœ… Yes
Real-time stock prices ❌ No βœ… Yes
IoT sensor updates ❌ No βœ… Yes
Chat application ❌ No ❌ No (Use WebSockets)

πŸš€ Final Takeaway

Feature REST SSE
Best for Standard API interactions Real-time updates
Efficiency Less efficient for real-time Optimized for live data
Scalability Easy to scale Can require extra load balancing
Alternative WebSockets for bidirectional communication WebSockets for full-duplex communication

πŸš€ Now you fully understand when to use REST and when to use SSE! 🎯πŸ”₯
Let me know if you need more details! πŸ˜ŠπŸš€

πŸš€ Adding REST Endpoint & "Send as REST" Button in Frontend

This update includes:
βœ… A RESTful API endpoint (/send_rest) to send messages
βœ… A "Send as REST" button in the frontend
βœ… Backend acknowledges REST messages and pushes them to SSE clients


πŸ“Œ Step 1: Update the SSE Server (sse_server.py)

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

message_queue = asyncio.Queue()  # Stores messages from clients

async def event_stream():
    """Streams messages, including REST and UI acknowledgments."""
    while True:
        message = await message_queue.get()  # Wait for a new message
        yield f"data: {message}\n\n"  # SSE format

@app.get("/sse")
async def sse_endpoint():
    """SSE endpoint for real-time streaming."""
    return StreamingResponse(event_stream(), media_type="text/event-stream")

@app.post("/messages")
async def post_message(request: Request):
    """Receives messages from the UI and adds them to the SSE queue."""
    data = await request.json()
    message = data.get("message", "No message provided")
    response = {"status": "Received", "message": message}
    await message_queue.put(json.dumps(response))
    return response

@app.post("/send_rest")
async def send_rest_message(request: Request):
    """Receives messages from REST API and adds them to SSE queue."""
    data = await request.json()
    message = data.get("message", "No REST message provided")
    response = {"status": "Received via REST", "message": message}
    await message_queue.put(json.dumps(response))  # Send to SSE
    return response

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

βœ… New Feature: Messages sent via POST /send_rest are acknowledged in the SSE stream.


πŸ“Œ Step 2: Update the Streamlit Frontend (frontend.py)

import streamlit as st
import requests
import threading

st.title("πŸ“‘ Real-Time SSE with REST Integration")

st.markdown("### Incoming Messages:")
messages_container = st.empty()

# Function to listen for SSE events
def get_sse_messages():
    url = "http://sse-server:8000/sse"
    with requests.get(url, stream=True) as response:
        for line in response.iter_lines():
            if line:
                messages_container.text(line.decode())

# Start SSE listener in a separate thread
thread = threading.Thread(target=get_sse_messages, daemon=True)
thread.start()

st.markdown("### Send a Message to the Server:")
user_input = st.text_input("Enter your message:")

col1, col2 = st.columns(2)

# Button to send message via SSE
if col1.button("Send as SSE"):
    response = requests.post("http://sse-server:8000/messages", json={"message": user_input})
    st.success(f"SSE Response: {response.json()}")

# Button to send message via REST API
if col2.button("Send as REST"):
    response = requests.post("http://sse-server:8000/send_rest", json={"message": user_input})
    st.success(f"REST Response: {response.json()}")

βœ… New Feature: "Send as REST" button sends a message via REST API (POST /send_rest) and receives SSE acknowledgment.


πŸ“Œ Step 3: Update docker-compose.yml

version: '3.8'

services:
  sse-server:
    build: .
    container_name: sse-server
    ports:
      - "8000:8000"

  sse-client:
    build: .
    container_name: sse-client
    depends_on:
      - sse-server
    entrypoint: ["python"]
    command: ["sse_client.py"]

  frontend:
    build: .
    container_name: sse-frontend
    depends_on:
      - sse-server
    ports:
      - "8501:8501"
    entrypoint: ["python"]
    command: ["frontend.py"]

πŸ“Œ Step 4: Running the System

βœ… Build & Start the Containers

docker-compose up --build

βœ… This starts: - SSE Server (http://localhost:8000/sse) - Streamlit Frontend (http://localhost:8501)


πŸ“Œ Step 5: Testing

1️⃣ Go to http://localhost:8501
2️⃣ Enter a message & click "Send as SSE"
3️⃣ Enter another message & click "Send as REST"
4️⃣ Check the Incoming Messages section β†’ You’ll see: data: {"status": "Received", "message": "Hello from SSE"} data: {"status": "Received via REST", "message": "Hello from REST"}


πŸš€ Final Takeaway

βœ… SSE Server now supports both SSE & REST API!
βœ… Streamlit UI allows sending messages via both methods!
βœ… REST API messages are acknowledged via SSE stream!
βœ… Fully containerized & scalable!

πŸš€ Now you have a complete real-time SSE system with REST support! 🎯πŸ”₯
Let me know if you need any refinements! πŸ˜ŠπŸš€

REST AMENDED

πŸ”Ή Should REST Respond Directly Instead of Using SSE Queue?

βœ… Yes, a standard REST API should respond immediately to the request, instead of relying on the SSE queue.
βœ… However, if we want REST messages to also notify SSE clients, we need to both return a direct response and push the message into the SSE queue.


πŸ“Œ Correcting the REST Endpoint (sse_server.py)

βœ… REST /send_rest now returns an immediate response.
βœ… It still pushes the message into the SSE queue for real-time updates.

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

message_queue = asyncio.Queue()  # Stores messages for SSE clients

async def event_stream():
    """Streams messages in real-time to SSE clients."""
    while True:
        message = await message_queue.get()
        yield f"data: {message}\n\n"  # SSE format

@app.get("/sse")
async def sse_endpoint():
    """SSE endpoint for real-time streaming."""
    return StreamingResponse(event_stream(), media_type="text/event-stream")

@app.post("/messages")
async def post_message(request: Request):
    """Receives messages from UI and adds them to the SSE queue."""
    data = await request.json()
    message = data.get("message", "No message provided")
    response = {"status": "Received", "message": message}
    
    # βœ… SSE clients get notified
    await message_queue.put(json.dumps(response))  
    
    return response  # βœ… REST immediately returns a response

@app.post("/send_rest")
async def send_rest_message(request: Request):
    """Receives REST messages, responds immediately, and notifies SSE clients."""
    data = await request.json()
    message = data.get("message", "No REST message provided")
    
    response = {"status": "Received via REST", "message": message}
    
    # βœ… Immediate REST response
    immediate_response = response.copy()
    
    # βœ… Push to SSE queue so real-time clients get notified
    await message_queue.put(json.dumps(response))  

    return immediate_response  # βœ… REST returns response instantly

πŸ“Œ What’s Fixed?

Feature Before (❌ Bad) Now (βœ… Fixed)
REST /send_rest response Delayed until SSE processed Returns immediately
SSE Notifications for REST messages ❌ Not always sent βœ… Now also sent to SSE clients

βœ… Now, REST clients get an instant response, while SSE clients still receive real-time updates!


πŸ“Œ Corrected Streamlit Frontend (frontend.py)

import streamlit as st
import requests
import threading

st.title("πŸ“‘ Real-Time SSE with REST Support")

st.markdown("### Incoming Messages:")
messages_container = st.empty()

# Function to listen for SSE events
def get_sse_messages():
    url = "http://sse-server:8000/sse"
    with requests.get(url, stream=True) as response:
        for line in response.iter_lines():
            if line:
                messages_container.text(line.decode())

# Start SSE listener in a separate thread
thread = threading.Thread(target=get_sse_messages, daemon=True)
thread.start()

st.markdown("### Send a Message to the Server:")
user_input = st.text_input("Enter your message:")

col1, col2 = st.columns(2)

if col1.button("Send as SSE"):
    response = requests.post("http://sse-server:8000/messages", json={"message": user_input})
    st.success(f"SSE Response: {response.json()}")

if col2.button("Send as REST"):
    response = requests.post("http://sse-server:8000/send_rest", json={"message": user_input})
    st.success(f"REST Response: {response.json()}")  # βœ… Shows immediate REST response

πŸ“Œ Final Testing

βœ… Test REST API (Direct Response)

curl -X POST -H "Content-Type: application/json" -d '{"message": "REST says hello"}' http://localhost:8000/send_rest

Expected Immediate Response: json { "status": "Received via REST", "message": "REST says hello" }

βœ… Check Live SSE Stream

curl -N -H "Accept: text/event-stream" http://localhost:8000/sse

Expected SSE Event: data: {"status": "Received via REST", "message": "REST says hello"}


πŸš€ Final Takeaway

βœ… REST API now returns an instant response.
βœ… SSE clients still receive live updates from REST messages.
βœ… Perfect balance between synchronous & real-time updates!

πŸš€ Now you have a fully working system with instant REST responses + SSE live updates! 🎯πŸ”₯
Let me know if you need any refinements! πŸ˜ŠπŸš€


πŸš€ Transform your business with cutting-edge AI chatbot solutions and expert consultancy! 🌟 Contact us today at https://www.schogini.com and take the first step toward smarter automation and unparalleled growth!