How (and why) to implement streaming in your LLM application
There’s enough talk about how LLMs are changing software development, so we’re going to keep this practical. Using LLMs in your application introduces specific, never-before-seen challenges and trade-offs in product building. One such problem is how to manage the latency of text generation.
Suppose you’re building an application that writes performance reviews for engineers. An average performance review is 300-400 words long according to this discussion. Using GPT-4, generating one review takes approximately 45 to 50 seconds. Now it’s not hard to imagine what happens if you show a loading indicator for 50 seconds: only the most patient users will stick around to see the result, no matter how good it is.
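As a rough sanity check on that number (assuming around 1.3 tokens per word and GPT-4 throughput on the order of 10 tokens per second, both our own ballpark figures): 400 words is roughly 520 tokens, and 520 tokens at ~10 tokens per second comes out to about 52 seconds of generation time.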
This is drastically different from traditional applications, where any data shown on the UI is retrieved from a database and hardly takes hundreds of milliseconds, or a few seconds in the worst case. In comparison, 50 seconds is an eternity and will make the app feel like something from the dial-up internet era. Latency is directly tied to how good or bad the UX is, and that in turn decides the bottom line. According to Amazon, every 100ms of added latency resulted in a 1% reduction in sales.
So how do you solve this? Most LLM APIs have an option to stream the response token by token, so you can start showing results to users immediately instead of making them wait until the whole generation is complete.
There are 3 factors to consider while implementing streaming:
- How do you receive the streaming response from LLMs
- How do you deliver a streaming response to the client side
- How do you receive the streaming response on the client side
Receiving streaming response from LLMs:
This part is pretty straightforward. If you’re using an API like OpenAI or Anthropic, they use server-sent events (SSE) for this. Here’s an example in Python:
import json

import requests
import sseclient

API_KEY = "XXXX"  # your api key goes here
MODEL = "gpt-4"
MAX_TOKENS = 500

headers = {"Accept": "text/event-stream", "Authorization": "Bearer " + API_KEY}

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Write a sample 400 word performance review for John Doe who is a senior software engineer."},
]

body = {
    "model": MODEL,
    "messages": messages,
    "max_tokens": MAX_TOKENS,
    "temperature": 0.9,
    "top_p": 1,
    "frequency_penalty": 0,
    "presence_penalty": 0,
    "stream": True,
}

response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    stream=True,
    headers=headers,
    json=body,
)

client = sseclient.SSEClient(response)
for event in client.events():
    # print(event.data)
    if event.data != "[DONE]":
        event_data = json.loads(event.data)
        # use yield here to create a generator if you implement this as a utils function
        print(event_data["choices"][0]["delta"].get("content", ""))
    else:
        print("[DONE]")
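If you implement this as a utility, the same loop becomes a generator that yields content deltas as they arrive. That is the shape of the (hypothetical) perf_utils.generate_review helper used in the Flask example later in this post. A minimal sketch, assuming the same OpenAI chat completions endpoint and sseclient setup as above:

import json

import requests
import sseclient

API_KEY = "XXXX"  # your api key goes here


def generate_review(employee_name, employee_review_points):
    """Yield performance-review tokens as they stream in from the LLM."""
    headers = {"Accept": "text/event-stream", "Authorization": "Bearer " + API_KEY}
    body = {
        "model": "gpt-4",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": f"Write a performance review for {employee_name}. "
                f"Key points: {employee_review_points}",
            },
        ],
        "max_tokens": 500,
        "stream": True,
    }
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        stream=True,
        headers=headers,
        json=body,
    )
    client = sseclient.SSEClient(response)
    for event in client.events():
        if event.data == "[DONE]":
            break
        event_data = json.loads(event.data)
        # yield instead of print, so callers (e.g. a Flask endpoint) can stream tokens onward
        yield event_data["choices"][0]["delta"].get("content", "")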
Delivering a streaming response to client-side:
Once you start receiving tokens from your LLM and are done with any post-processing, you need to stream them to the client side. Here you have multiple options depending on how you want to trade off real-time behavior against complexity: polling, SSE, and WebSockets.
Polling
This approach can be considered as faux-streaming. This will give users the look and feel of streaming without actually having to stream the response from the backend. This is also the simplest of the 3 options for both backend and frontend implementation. Simply put, in polling, the client periodically checks with the backend for new tokens and displays them on the UI. There are 2 ways to implement this.
Long polling
In long polling, the client sends a request to the backend and the connection is kept open until some tokens are generated which are returned to the client. After receiving the tokens, the client sends another request, and so on.
There will be some timeout for these calls so that they don't go on indefinitely, but normally the timeout is very high (20-30s).
To stop the polling, send a special stop event from the backend once the generation is completed so that the client knows that polling needs to stop.
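Here’s a minimal long-polling sketch in Flask, assuming a hypothetical in-memory JOBS store that a background worker fills with tokens as the LLM generates them (in production you’d use something like Redis instead):

import time

from flask import Flask, jsonify, request

app = Flask(__name__)

# hypothetical in-memory store: {job_id: {"tokens": [...], "done": bool}}
JOBS = {}


@app.get("/poll/<job_id>")
def long_poll(job_id):
    offset = int(request.args.get("offset", 0))
    deadline = time.time() + 25  # hold the request open for up to ~25 seconds
    while time.time() < deadline:
        job = JOBS.get(job_id, {"tokens": [], "done": False})
        new_tokens = job["tokens"][offset:]
        if new_tokens or job["done"]:
            # return whatever is new, plus a flag telling the client whether to keep polling
            return jsonify({"tokens": new_tokens, "offset": offset + len(new_tokens), "done": job["done"]})
        time.sleep(0.2)  # nothing new yet, wait briefly and check again
    # timed out with nothing new; the client simply polls again
    return jsonify({"tokens": [], "offset": offset, "done": False})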
Short polling
This is the most straightforward approach and works pretty well when you don’t need real-time updates. In this approach, the client sends requests to the server periodically asking for new tokens.
Some additional things to consider while implementing polling:
- You’ll need some way to keep track of how many tokens have already been sent, so that the backend returns only the tokens generated since the last call (similar to OFFSET in MySQL queries).
- You need to send a special stop response/event so that the client knows generation is complete and stops polling. Both points appear in the client-side sketch below.
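To make that concrete, here’s a minimal short-polling client loop in plain Python using requests, assuming a hypothetical /poll/<job_id> endpoint like the one sketched above that returns new tokens, an updated offset, and a done flag:

import time

import requests

BASE_URL = "https://api.example.com"  # hypothetical backend


def poll_tokens(job_id):
    offset = 0
    while True:
        resp = requests.get(f"{BASE_URL}/poll/{job_id}", params={"offset": offset})
        data = resp.json()
        for token in data["tokens"]:
            print(token, end="", flush=True)  # a real client would append this to the UI
        offset = data["offset"]  # track how much has already been received
        if data["done"]:
            break  # stop event: generation is finished, stop polling
        time.sleep(1)  # short polling: wait a fixed interval between requests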
Server-Sent Events (SSE)
Another approach for streaming generated tokens to the UI is with server-sent events (SSE). This is the same mechanism we used above to receive streaming responses from LLM APIs, except this time you implement the server-side component. Here’s how you do it in Flask:
import perf_utils  # your utility module that streams tokens from the LLM
from flask import Response
from webargs import fields
from webargs.flaskparser import use_args


@app.post("/generate")
@use_args({
    "employee_name": fields.Str(required=True),
    "employee_review_points": fields.Str(required=True),
}, location="json")
def generate_performance_review(args):
    employee_name = args["employee_name"]
    employee_review_points = args["employee_review_points"]

    def generate():
        for tokens in perf_utils.generate_review(employee_name, employee_review_points):
            yield f"event:data\ndata:{tokens}\n\n"
        yield "data:[DONE]\n\n"

    response = Response(generate(), mimetype="text/event-stream")
    # Disable proxy/response buffering so events are flushed to the client immediately
    response.headers.add("X-Accel-Buffering", "no")
    return response
You can refer to this blog for more implementation details.
That’s pretty much it! One good thing about this is if you already have a normal API, converting it to SSE can be done with minimal change. Just create a generator that yields the response to be streamed a few tokens at a time, change the mimetype, and you’re done!
A few additional things to consider while implementing this: you’ll need to set threaded to True in development, or else your Flask dev server will get blocked. You can read more about it in this discussion.
# Start app in dev
if __name__ == "__main__":
    flask_app.run(host='0.0.0.0', port=8080, debug=True, threaded=True)
To test server-sent events, you can use curl with --no-buffer flag set like this:
curl --no-buffer https://www.example.com/streaming_endpoint
WebSockets
WebSockets allow the client and server to maintain a persistent, bi-directional communication channel for sending and receiving data on both ends. This makes sense when you’re building an application that needs real-time transfer of packets from both the client and the server side, like a chat application. But in the context of applications using LLMs, this is usually overkill: most of the time these apps generate some type of text/code and don’t need real-time bi-directional communication.
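That said, if your product genuinely is bi-directional (a chat-style UI, for example), here’s a minimal server-side sketch using the third-party websockets package (this assumes a recent version of the package, and the token loop is simulated rather than wired to an LLM):

import asyncio

import websockets  # third-party: pip install websockets


async def stream_tokens(websocket):
    """Send tokens to the client as they become available (simulated here)."""
    prompt = await websocket.recv()  # the client sends the prompt as its first message
    for token in ["Sample ", "performance ", "review ", "for ", prompt]:
        await websocket.send(token)  # push each token as soon as it is ready
        await asyncio.sleep(0.1)  # stand-in for LLM generation time
    await websocket.send("[DONE]")  # tell the client generation is finished


async def main():
    async with websockets.serve(stream_tokens, "0.0.0.0", 8765):
        await asyncio.Future()  # run forever


asyncio.run(main())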
Receiving streaming response on the client side
Example using Axios
// Note: responseType 'stream' is supported in Node.js; in the browser, use the Fetch API example below
const response = await axios.get('https://stream.example.com', {
  headers: { Authorization: `Bearer ${token}` },
  responseType: 'stream'
});
const stream = response.data;
stream.on('data', data => {
console.log(data);
});
stream.on('end', () => {
console.log("stream done");
});
Example using Fetch API
fetch('/stream')
  .then(response => {
    const stream = response.body;
    const reader = stream.getReader();
    const readChunk = () => {
      reader.read()
        .then(({ value, done }) => {
          if (done) {
            console.log('Stream finished');
            return;
          }
          const chunkString = new TextDecoder().decode(value);
          console.log(chunkString);
          readChunk();
        })
        .catch(error => {
          console.error(error);
        });
    };
    readChunk();
  })
  .catch(error => {
    console.error(error);
  });
Conclusion
If you’re building an application that uses LLMs, you don’t want your users to drop off while generation is happening. That makes streaming an essential feature.
Since most LLM applications generate some kind of text/code that isn’t truly real-time, short polling is the simplest approach and, in our opinion, works sufficiently well for these kinds of apps.
Know of another way to implement this? Write to us at hello@kusho.ai