Data Science Engineering

Building a Streaming OpenAI App in Python

May 12, 2023


In a world where technology is advancing at breakneck speed, OpenAI APIs stand out as a game-changing innovation. In this blog, I’ll take you on a journey to harness the power of OpenAI’s APIs and build a REST API using FastAPI in Python.

Find the entire code here: https://github.com/SidJain1412/StreamingFastAPI

FastAPI is:

  • Easy to use
  • Faster than Flask 
  • Comes with built-in documentation
  • Adheres to OpenAPI (not to be confused with OpenAI) standards 

Recently, we at CleverTap released Scribe, and we built the API behind it using FastAPI, using many of its functionalities.

In this blog, I’ll go over the basics of FastAPI, query validations, and streaming APIs, and we’ll build an API that generates a marketing campaign strategy for the brand given in the input!

What’s a Streaming API?

If you’re familiar with ChatGPT (and who isn’t?), you’ll know that it generates responses on-the-fly, rather than waiting until the entire output is ready before displaying it. This clever technique keeps users engaged by allowing them to read the response as it’s being formed.

As we build our own API on top of OpenAI’s APIs, we should take advantage of this feature to create a seamless and satisfying experience for end-users.

FastAPI Magic

Building an API with FastAPI is very easy. A few lines of code and we have an API ready, with documentation and validations:

# Run using `uvicorn simple_app:app --reload`
# Visit 127.0.0.1:8000/docs for documentation and testing the API
from fastapi import FastAPI, Query

app = FastAPI(
    title="FastAPI Example",
    description="### Hello world",
    version="1.0",
)


@app.get("/echo/", tags=["APIs"], response_model=str)
def echo(input_value: str = Query(..., max_length=20)):
    return input_value

Run this app using uvicorn simple_app:app --reload and go to http://127.0.0.1:8000/docs to find documentation and test your API there.

[Screenshot: the interactive docs page generated by FastAPI]

Most people who have used Flask know that this is much simpler, and the improved performance is an added bonus!

Integrating OpenAI

You need an OpenAI API key from this step onward; you can find or create one here.

from fastapi import FastAPI, Query
import openai
import os
import sys

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
if not len(OPENAI_API_KEY):
    print("Please set OPENAI_API_KEY environment variable. Exiting.")
    sys.exit(1)

openai.api_key = OPENAI_API_KEY

app = FastAPI(
    title="Simple API",
)


def get_response_openai(prompt):
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            n=1,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            messages=[
                {
                    "role": "system",
                    "content": "You are an expert creative marketer. Create a campaign for the brand the user enters.",
                },
                {"role": "user", "content": prompt},
            ],
        )
    except Exception as e:
        print("Error in creating campaigns from openAI:", str(e))
        return 503
    return response["choices"][0]["message"]["content"]


@app.get(
    "/campaign/",
    tags=["APIs"],
    response_model=str,
)
def campaign(prompt: str = Query(..., max_length=20)):
    return get_response_openai(prompt)

In this code, we simply read the API key from environment variables (it can be set using export OPENAI_API_KEY=<your_api_key> in your terminal), then create a method using FastAPI decorators (@app.get). The method get_response_openai fires a query and returns the generated text (a marketing campaign outline in this case). Check out the OpenAI documentation for more about this method.

The input for the API is defined in prompt: str = Query(..., max_length=20). The ... tells FastAPI that prompt is a required parameter. We have kept the max_length short because all we need from the user is a brand name.

FastAPI automatically handles data validation: if the input is more than 20 characters long, it returns an appropriate error message with a 422 status code.
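For example, exceeding the limit returns a 422 with a descriptive JSON body (a quick sketch, assuming the app above is running locally):

import requests

# A prompt longer than 20 characters violates Query(..., max_length=20)
resp = requests.get(
    "http://127.0.0.1:8000/campaign/",
    params={"prompt": "an extremely long brand name"},
)
print(resp.status_code)  # 422 Unprocessable Entity
print(resp.json())       # {"detail": [{"loc": ["query", "prompt"], "msg": "...", ...}]}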

Streaming!

Now that we know how to create a simple API, let’s convert it to a streaming one.

If you tested the above example, you would have noticed that generating the output takes 20-30 seconds. Even though streaming won’t accelerate the output, providing real-time feedback to the end user that the output is being generated keeps them engaged and interested.

First, we import from fastapi.responses import StreamingResponse, which allows us to return chunks of data from our API.

OpenAI allows us to get streaming responses just by adding the stream=True parameter to the ChatCompletion.create call.

We can iterate through this response using this code:
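Something along these lines (a minimal sketch, assuming the openai 0.x SDK used above, where each streamed chunk carries incremental text in chunk["choices"][0]["delta"]; error handling is added back in a later section):

def get_response_openai(prompt):
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        stream=True,  # ask OpenAI to send the completion incrementally
        messages=[
            {
                "role": "system",
                "content": "You are an expert creative marketer. Create a campaign for the brand the user enters.",
            },
            {"role": "user", "content": prompt},
        ],
    )
    for chunk in response:
        # Each streamed chunk holds the next incremental piece of text (possibly empty)
        current_content = chunk["choices"][0]["delta"].get("content", "")
        yield current_content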

Notice that we have replaced return with yield. Yield converts the function to a generator function, which returns an iterable object, allowing us to iterate over the response from the function as it gets generated. A good read if you want to understand further: https://www.guru99.com/python-yield-return-generator.html 

Now, we wrap our FastAPI method with StreamingResponse to convert it to a streaming API.
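For example (a sketch of the updated endpoint; the media type is discussed just below):

from fastapi.responses import StreamingResponse


@app.get("/campaign/", tags=["APIs"], response_model=str)
def campaign(prompt: str = Query(..., max_length=20)):
    # Hand the generator to StreamingResponse so chunks are sent as they are produced
    return StreamingResponse(get_response_openai(prompt), media_type="text/event-stream")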

Specifying the media_type prevents browsers from buffering the output (ref this answer).

And that’s it! Our API can now stream responses directly from OpenAI’s API.

Refer to this commit to see the changes all together.

Testing the API

Let’s see the API in action. We can do this with the requests library in Python, or even with a simple cURL command.

import requests

url = "http://127.0.0.1:8000/campaign/?prompt=Pepsi"
response = requests.get(
    url,
    stream=True,
    headers={"accept": "application/json"},
)
for chunk in response.iter_content(chunk_size=1024):
    if chunk:
        print(str(chunk, encoding="utf-8"), end="")
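Or with a cURL one-liner (the -N/--no-buffer flag makes cURL print chunks as they arrive instead of buffering):

curl -N "http://127.0.0.1:8000/campaign/?prompt=Pepsi"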

Running the code gives us this result:

[GIF: live streaming output in Jupyter]

Good FastAPI Practices

What happens when OpenAI throws an error because too many people are using ChatGPT to solve homework problems?

A good API throws good, understandable errors, with correct error codes (not everything is a 500 error!).

Let’s import HTTPException (from fastapi import HTTPException) and define the error message we’ll return when OpenAI fails:

error503 = "OpenAI server is busy, try again later"

In case of an error when calling OpenAI APIs, we simply raise an HTTPException using raise HTTPException(503, error503).
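Only the changed lines are shown here (a sketch); the except block in get_response_openai becomes:

    except Exception as e:
        print("Error in creating campaigns from openAI:", str(e))
        # Raise a proper 503 instead of returning a bare integer
        raise HTTPException(503, error503)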

Now if and when OpenAI models are down, our API end user will get a nice, consumable error message.

We can add this error to our API documentation by adding this line to our API definition:
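Something like this, using the responses parameter of the path operation decorator (the last line is the addition):

@app.get(
    "/campaign/",
    tags=["APIs"],
    response_model=str,
    responses={503: {"description": error503}},
)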

And that’s it, we have our streaming API with error handling ready to ship!

Next Steps

In Part 2 of this blog, I will go over logging, caching, monitoring and deployment!
