pip install uvicorn

啟動fastapi測試接口

from fastapi import FastAPI, Request  

app = FastAPI()

@app.get("/")
async def root():
return {"message": "Hello World"}

使用命令?uvicorn server:app?來啟動服務,訪問?http://127.0.0.1:8000/??會得到{"message": "Hello World"}??的json 響應即可。

添加 SSE 響應支持

sse_starlette是一個擴展,可以很方便的生成SSE響應流, 使用pip install sse-starlette  來安裝這個包。

在sse_starlette.sse中有個EventSourceResponse類,這個類可以響應SSE。

from fastapi import FastAPI, Request  
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()

@app.get("/")
async def root(request: Request):
async def event_generator(request: Request):
res_str = "七夕情人節即將來臨,我們為您準備了精美的鮮花和美味的蛋糕"
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000,
"data": i
}

await asyncio.sleep(0.1)
g = event_generator(request)
return EventSourceResponse(g)

EventSourceResponse類可以傳入異步生成器(generator),這里為什么要傳一個生成器呢?由于采用SSE響應的數據一般是一點一點的返回給客戶端,不是一次性的返回,像上面的代碼,EventSourceResponse對象每次從g這個生成器中獲取到數據,再將數據組裝成sse的標準格式。

SSE的標準格式

理論上作為SSE響應,我們可以任意的定義數據字段和值,但是一般情況下,為了和前端數據兼容,我們會用以下格式定義SSE響應內容。

event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
  1. event: 表示事件,一般為message,如果有錯誤的話,也可以設置為error。message和error在前端會分別觸發onmessage或onerror事件。
  2. retry: 重試時間,當出錯以后,或者event為error時,后端可以定義這個時間,讓客戶端在retry時間后進行重試,單位是毫秒。
  3. data: 具體的數據。

這些字段之間使用\r\n 分割,每個sse數據使用兩個\r\n, 也就是數據結尾處是兩個\r\n。

當然這個不是必須的,只是一種標準,尤其是前端調用的時候,會對event值有一些不同的處理邏輯。最好統一下標準。

Python 客戶端接收數據

在使用Python調用接口時,使用最多的庫為 requests庫,異步庫使用aiohttp比較多,我分別使用這兩個庫進行演示。

使用 requests 庫調用接口得到SSE響應。

import requests  

def test():
url = r"http://127.0.0.1:8000/"
headers = {'Content-Type': 'text/event-stream'}
response = requests.get(url, headers=headers, stream=True)
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
print(chunk)

if __name__ == '__main__':
test()

這段代碼中使用了 response = requests.get(url, headers=headers, stream=True) 來獲取sse的內容,這里有一個比較重要的參數,stream=True, 使用了這個參數以后才可以達到SSE輸出的效果。這里的header可以設置也可以不設置。

之后調用response.iter_content() 函數來打印數據。

chunk_size: 默認為1,正常情況下我們要設置一個比較大的值,否則獲取到一個字節數據就會走到下面的處理邏輯。decode_unicode: iter_content() 函數遍歷的數據是bytes類型的,這個參數可以控制是否將bytes轉為str。

注意,這里的chunk即使被轉換為字符串,也不是json格式的,我們看到服務端返回的數據像是一個json格式的:

yield {  
"event": "message",
"retry": 15000,
"data": i
}

但客戶端得到的中下面的這樣的格式,如果客戶端想要轉為json,需要再單獨處理一下。

event: message
data: 七
retry: 15000

event: message
data: 夕
retry: 15000

event: message
data: 情
retry: 15000

使用aiohttp調用接口獲取SSE返回。

aiohttp 作為異步調用接口常用的庫,使用它調用SSE響應也很方便的。

async def test():  
headers = {'Content-Type': 'text/event-stream'}
sseresp = aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

先使用aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers) 構造一個請求對象,注意這里沒有requests庫中的stream=True 參數,如果加了會報錯!之后開始遍歷數據,注意這里是用的async with 和 async for r.content.iter_any(),這里一定要調用r.content.iter_any() 方法,否則達不到SSE的效果。

這里也沒有像requests庫中的decode_unicode=True?參數,所以需要客戶端自己來decode數據。

FastAPI使用POST接收參數

FastAPI 本身在處理SSE請求與響應時,GETPOST方法是都支持的。我們來看一下POST方法。

from fastapi import FastAPI, Request  
from sse_starlette.sse import EventSourceResponse
import asyncio
from pydantic import BaseModel

app = FastAPI()

class Message(BaseModel):
message: str

@app.post("/sse")
async def indexpost(msg: Message, req: Request):
async def event_generator(request: Request):
res_str = msg.message
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000, # milisecond
"data": i
}

await asyncio.sleep(0.1)

return EventSourceResponse(event_generator(req))

代碼和上文的GET很像,只不過在GET方法中,是使用的默認的一句話,“七夕情人節即將來臨,我們為您準備了精美的鮮花和美味的蛋糕”,而這里是由客戶端通過參數傳過來。

我們再來使用aiohttp來使用POST方法調用一下接口。

import aiohttp  
import asyncio
import json

async def test_post():
headers = {'Content-Type': 'application/json'}
data = {"message": "七夕情人節快樂!"}
sseresp = aiohttp.request("POST", r"http://127.0.0.1:8000/sse", headers=headers, data=json.dumps(data))
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_post())

注意這里的headers就一定要設置了。

前端通過POST方式調用SSE接口

一般的瀏覽器是支持SSE調用的

const eventSource = new EventSource('http_api_url', { withCredentials: true })

上面也有提到SSE響應數據的標準格式也是為了兼容這里的前端瀏覽器調用,但是這里的前端調用有個比較麻煩的問題是,只能使用GET方法,有個開源項目,是Azure的,可以讓前端使用POST方式調用,有興趣的可以嘗試一下。

https://github.com/Azure/fetch-event-source

文章轉自微信公眾號@序語程言

上一篇:

使用gin搭建api后臺系統之MySQL初步CURD

下一篇:

優化REST API資源跨域請求:啟用CORS的簡明步驟
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費