
2024年在線市場平臺(tái)的11大最佳支付解決方案
pip install uvicorn
啟動(dòng)fastapi測試接口
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
使用命令?uvicorn server:app
?來啟動(dòng)服務(wù),訪問?http://127.0.0.1:8000/
??會(huì)得到{"message": "Hello World"}
??的json 響應(yīng)即可。
sse_starlette是一個(gè)擴(kuò)展,可以很方便的生成SSE響應(yīng)流, 使用pip install sse-starlette
來安裝這個(gè)包。
在sse_starlette.sse中有個(gè)EventSourceResponse類,這個(gè)類可以響應(yīng)SSE。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
@app.get("/")
async def root(request: Request):
async def event_generator(request: Request):
res_str = "七夕情人節(jié)即將來臨,我們?yōu)槟鷾?zhǔn)備了精美的鮮花和美味的蛋糕"
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000,
"data": i
}
await asyncio.sleep(0.1)
g = event_generator(request)
return EventSourceResponse(g)
EventSourceResponse類可以傳入異步生成器(generator),這里為什么要傳一個(gè)生成器呢?由于采用SSE響應(yīng)的數(shù)據(jù)一般是一點(diǎn)一點(diǎn)的返回給客戶端,不是一次性的返回,像上面的代碼,EventSourceResponse對(duì)象每次從g這個(gè)生成器中獲取到數(shù)據(jù),再將數(shù)據(jù)組裝成sse的標(biāo)準(zhǔn)格式。
理論上作為SSE響應(yīng),我們可以任意的定義數(shù)據(jù)字段和值,但是一般情況下,為了和前端數(shù)據(jù)兼容,我們會(huì)用以下格式定義SSE響應(yīng)內(nèi)容。
event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
這些字段之間使用\r\n
分割,每個(gè)sse數(shù)據(jù)使用兩個(gè)\r\n
, 也就是數(shù)據(jù)結(jié)尾處是兩個(gè)\r\n
。
當(dāng)然這個(gè)不是必須的,只是一種標(biāo)準(zhǔn),尤其是前端調(diào)用的時(shí)候,會(huì)對(duì)event值有一些不同的處理邏輯。最好統(tǒng)一下標(biāo)準(zhǔn)。
在使用Python調(diào)用接口時(shí),使用最多的庫為 requests庫,異步庫使用aiohttp比較多,我分別使用這兩個(gè)庫進(jìn)行演示。
import requests
def test():
url = r"http://127.0.0.1:8000/"
headers = {'Content-Type': 'text/event-stream'}
response = requests.get(url, headers=headers, stream=True)
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
print(chunk)
if __name__ == '__main__':
test()
這段代碼中使用了 response = requests.get(url, headers=headers, stream=True)
來獲取sse的內(nèi)容,這里有一個(gè)比較重要的參數(shù),stream=True
, 使用了這個(gè)參數(shù)以后才可以達(dá)到SSE輸出的效果。這里的header可以設(shè)置也可以不設(shè)置。
之后調(diào)用response.iter_content()
函數(shù)來打印數(shù)據(jù)。
chunk_size: 默認(rèn)為1,正常情況下我們要設(shè)置一個(gè)比較大的值,否則獲取到一個(gè)字節(jié)數(shù)據(jù)就會(huì)走到下面的處理邏輯。decode_unicode: iter_content() 函數(shù)遍歷的數(shù)據(jù)是bytes類型的,這個(gè)參數(shù)可以控制是否將bytes轉(zhuǎn)為str。
注意,這里的chunk即使被轉(zhuǎn)換為字符串,也不是json格式的,我們看到服務(wù)端返回的數(shù)據(jù)像是一個(gè)json格式的:
yield {
"event": "message",
"retry": 15000,
"data": i
}
但客戶端得到的中下面的這樣的格式,如果客戶端想要轉(zhuǎn)為json,需要再單獨(dú)處理一下。
event: message
data: 七
retry: 15000
event: message
data: 夕
retry: 15000
event: message
data: 情
retry: 15000
aiohttp 作為異步調(diào)用接口常用的庫,使用它調(diào)用SSE響應(yīng)也很方便的。
async def test():
headers = {'Content-Type': 'text/event-stream'}
sseresp = aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
先使用aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
構(gòu)造一個(gè)請(qǐng)求對(duì)象,注意這里沒有requests庫中的stream=True
參數(shù),如果加了會(huì)報(bào)錯(cuò)!之后開始遍歷數(shù)據(jù),注意這里是用的async with
和 async for r.content.iter_any()
,這里一定要調(diào)用r.content.iter_any()
方法,否則達(dá)不到SSE的效果。
這里也沒有像requests庫中的decode_unicode=True
?參數(shù),所以需要客戶端自己來decode數(shù)據(jù)。
FastAPI 本身在處理SSE請(qǐng)求與響應(yīng)時(shí),GET和POST方法是都支持的。我們來看一下POST方法。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
from pydantic import BaseModel
app = FastAPI()
class Message(BaseModel):
message: str
@app.post("/sse")
async def indexpost(msg: Message, req: Request):
async def event_generator(request: Request):
res_str = msg.message
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000, # milisecond
"data": i
}
await asyncio.sleep(0.1)
return EventSourceResponse(event_generator(req))
代碼和上文的GET很像,只不過在GET方法中,是使用的默認(rèn)的一句話,“七夕情人節(jié)即將來臨,我們?yōu)槟鷾?zhǔn)備了精美的鮮花和美味的蛋糕”,而這里是由客戶端通過參數(shù)傳過來。
我們?cè)賮硎褂胊iohttp來使用POST方法調(diào)用一下接口。
import aiohttp
import asyncio
import json
async def test_post():
headers = {'Content-Type': 'application/json'}
data = {"message": "七夕情人節(jié)快樂!"}
sseresp = aiohttp.request("POST", r"http://127.0.0.1:8000/sse", headers=headers, data=json.dumps(data))
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_post())
注意這里的headers就一定要設(shè)置了。
一般的瀏覽器是支持SSE調(diào)用的
const eventSource = new EventSource('http_api_url', { withCredentials: true })
上面也有提到SSE響應(yīng)數(shù)據(jù)的標(biāo)準(zhǔn)格式也是為了兼容這里的前端瀏覽器調(diào)用,但是這里的前端調(diào)用有個(gè)比較麻煩的問題是,只能使用GET方法,有個(gè)開源項(xiàng)目,是Azure的,可以讓前端使用POST方式調(diào)用,有興趣的可以嘗試一下。
https://github.com/Azure/fetch-event-source
文章轉(zhuǎn)自微信公眾號(hào)@序語程言
2024年在線市場平臺(tái)的11大最佳支付解決方案
完整指南:如何在應(yīng)用程序中集成和使用ChatGPT API
選擇AI API的指南:ChatGPT、Gemini或Claude,哪一個(gè)最適合你?
用ASP.NET Core 2.1 建立規(guī)范的 REST API — 緩存和并發(fā)
企業(yè)工商數(shù)據(jù)API用哪種?
2024年創(chuàng)建社交媒體帖子的最佳圖像工具API
2024年小型企業(yè)的7個(gè)最佳短信應(yīng)用API
用gin寫簡單的crud后端API接口
最新LangChain+GLM4開發(fā)AI應(yīng)用程序系列(一):快速入門篇
對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力
一鍵對(duì)比試用API 限時(shí)免費(fèi)