pip install uvicorn

啟動(dòng)fastapi測試接口

from fastapi import FastAPI, Request  

app = FastAPI()

@app.get("/")
async def root():
return {"message": "Hello World"}

使用命令?uvicorn server:app?來啟動(dòng)服務(wù),訪問?http://127.0.0.1:8000/??會(huì)得到{"message": "Hello World"}??的json 響應(yīng)即可。

添加 SSE 響應(yīng)支持

sse_starlette是一個(gè)擴(kuò)展,可以很方便的生成SSE響應(yīng)流, 使用pip install sse-starlette  來安裝這個(gè)包。

在sse_starlette.sse中有個(gè)EventSourceResponse類,這個(gè)類可以響應(yīng)SSE。

from fastapi import FastAPI, Request  
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()

@app.get("/")
async def root(request: Request):
async def event_generator(request: Request):
res_str = "七夕情人節(jié)即將來臨,我們?yōu)槟鷾?zhǔn)備了精美的鮮花和美味的蛋糕"
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000,
"data": i
}

await asyncio.sleep(0.1)
g = event_generator(request)
return EventSourceResponse(g)

EventSourceResponse類可以傳入異步生成器(generator),這里為什么要傳一個(gè)生成器呢?由于采用SSE響應(yīng)的數(shù)據(jù)一般是一點(diǎn)一點(diǎn)的返回給客戶端,不是一次性的返回,像上面的代碼,EventSourceResponse對(duì)象每次從g這個(gè)生成器中獲取到數(shù)據(jù),再將數(shù)據(jù)組裝成sse的標(biāo)準(zhǔn)格式。

SSE的標(biāo)準(zhǔn)格式

理論上作為SSE響應(yīng),我們可以任意的定義數(shù)據(jù)字段和值,但是一般情況下,為了和前端數(shù)據(jù)兼容,我們會(huì)用以下格式定義SSE響應(yīng)內(nèi)容。

event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
  1. event: 表示事件,一般為message,如果有錯(cuò)誤的話,也可以設(shè)置為error。message和error在前端會(huì)分別觸發(fā)onmessage或onerror事件。
  2. retry: 重試時(shí)間,當(dāng)出錯(cuò)以后,或者event為error時(shí),后端可以定義這個(gè)時(shí)間,讓客戶端在retry時(shí)間后進(jìn)行重試,單位是毫秒。
  3. data: 具體的數(shù)據(jù)。

這些字段之間使用\r\n 分割,每個(gè)sse數(shù)據(jù)使用兩個(gè)\r\n, 也就是數(shù)據(jù)結(jié)尾處是兩個(gè)\r\n

當(dāng)然這個(gè)不是必須的,只是一種標(biāo)準(zhǔn),尤其是前端調(diào)用的時(shí)候,會(huì)對(duì)event值有一些不同的處理邏輯。最好統(tǒng)一下標(biāo)準(zhǔn)。

Python 客戶端接收數(shù)據(jù)

在使用Python調(diào)用接口時(shí),使用最多的庫為 requests庫,異步庫使用aiohttp比較多,我分別使用這兩個(gè)庫進(jìn)行演示。

使用 requests 庫調(diào)用接口得到SSE響應(yīng)。

import requests  

def test():
url = r"http://127.0.0.1:8000/"
headers = {'Content-Type': 'text/event-stream'}
response = requests.get(url, headers=headers, stream=True)
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
print(chunk)

if __name__ == '__main__':
test()

這段代碼中使用了 response = requests.get(url, headers=headers, stream=True) 來獲取sse的內(nèi)容,這里有一個(gè)比較重要的參數(shù),stream=True, 使用了這個(gè)參數(shù)以后才可以達(dá)到SSE輸出的效果。這里的header可以設(shè)置也可以不設(shè)置。

之后調(diào)用response.iter_content() 函數(shù)來打印數(shù)據(jù)。

chunk_size: 默認(rèn)為1,正常情況下我們要設(shè)置一個(gè)比較大的值,否則獲取到一個(gè)字節(jié)數(shù)據(jù)就會(huì)走到下面的處理邏輯。decode_unicode: iter_content() 函數(shù)遍歷的數(shù)據(jù)是bytes類型的,這個(gè)參數(shù)可以控制是否將bytes轉(zhuǎn)為str。

注意,這里的chunk即使被轉(zhuǎn)換為字符串,也不是json格式的,我們看到服務(wù)端返回的數(shù)據(jù)像是一個(gè)json格式的:

yield {  
"event": "message",
"retry": 15000,
"data": i
}

但客戶端得到的中下面的這樣的格式,如果客戶端想要轉(zhuǎn)為json,需要再單獨(dú)處理一下。

event: message
data: 七
retry: 15000

event: message
data: 夕
retry: 15000

event: message
data: 情
retry: 15000

使用aiohttp調(diào)用接口獲取SSE返回。

aiohttp 作為異步調(diào)用接口常用的庫,使用它調(diào)用SSE響應(yīng)也很方便的。

async def test():  
headers = {'Content-Type': 'text/event-stream'}
sseresp = aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

先使用aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers) 構(gòu)造一個(gè)請(qǐng)求對(duì)象,注意這里沒有requests庫中的stream=True 參數(shù),如果加了會(huì)報(bào)錯(cuò)!之后開始遍歷數(shù)據(jù),注意這里是用的async with 和 async for r.content.iter_any(),這里一定要調(diào)用r.content.iter_any() 方法,否則達(dá)不到SSE的效果。

這里也沒有像requests庫中的decode_unicode=True?參數(shù),所以需要客戶端自己來decode數(shù)據(jù)。

FastAPI使用POST接收參數(shù)

FastAPI 本身在處理SSE請(qǐng)求與響應(yīng)時(shí),GETPOST方法是都支持的。我們來看一下POST方法。

from fastapi import FastAPI, Request  
from sse_starlette.sse import EventSourceResponse
import asyncio
from pydantic import BaseModel

app = FastAPI()

class Message(BaseModel):
message: str

@app.post("/sse")
async def indexpost(msg: Message, req: Request):
async def event_generator(request: Request):
res_str = msg.message
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000, # milisecond
"data": i
}

await asyncio.sleep(0.1)

return EventSourceResponse(event_generator(req))

代碼和上文的GET很像,只不過在GET方法中,是使用的默認(rèn)的一句話,“七夕情人節(jié)即將來臨,我們?yōu)槟鷾?zhǔn)備了精美的鮮花和美味的蛋糕”,而這里是由客戶端通過參數(shù)傳過來。

我們?cè)賮硎褂胊iohttp來使用POST方法調(diào)用一下接口。

import aiohttp  
import asyncio
import json

async def test_post():
headers = {'Content-Type': 'application/json'}
data = {"message": "七夕情人節(jié)快樂!"}
sseresp = aiohttp.request("POST", r"http://127.0.0.1:8000/sse", headers=headers, data=json.dumps(data))
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_post())

注意這里的headers就一定要設(shè)置了。

前端通過POST方式調(diào)用SSE接口

一般的瀏覽器是支持SSE調(diào)用的

const eventSource = new EventSource('http_api_url', { withCredentials: true })

上面也有提到SSE響應(yīng)數(shù)據(jù)的標(biāo)準(zhǔn)格式也是為了兼容這里的前端瀏覽器調(diào)用,但是這里的前端調(diào)用有個(gè)比較麻煩的問題是,只能使用GET方法,有個(gè)開源項(xiàng)目,是Azure的,可以讓前端使用POST方式調(diào)用,有興趣的可以嘗試一下。

https://github.com/Azure/fetch-event-source

文章轉(zhuǎn)自微信公眾號(hào)@序語程言

上一篇:

使用gin搭建api后臺(tái)系統(tǒng)之MySQL初步CURD

下一篇:

優(yōu)化REST API資源跨域請(qǐng)求:啟用CORS的簡明步驟
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊(cè)

多API并行試用

數(shù)據(jù)驅(qū)動(dòng)選型,提升決策效率

查看全部API→
??

熱門場景實(shí)測,選對(duì)API

#AI文本生成大模型API

對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)

#AI深度推理大模型API

對(duì)比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)