
2024年在線市場平臺的11大最佳支付解決方案
pip install uvicorn
啟動fastapi測試接口
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
使用命令?uvicorn server:app
?來啟動服務,訪問?http://127.0.0.1:8000/
??會得到{"message": "Hello World"}
??的json 響應即可。
sse_starlette是一個擴展,可以很方便的生成SSE響應流, 使用pip install sse-starlette
來安裝這個包。
在sse_starlette.sse中有個EventSourceResponse類,這個類可以響應SSE。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
@app.get("/")
async def root(request: Request):
async def event_generator(request: Request):
res_str = "七夕情人節即將來臨,我們為您準備了精美的鮮花和美味的蛋糕"
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000,
"data": i
}
await asyncio.sleep(0.1)
g = event_generator(request)
return EventSourceResponse(g)
EventSourceResponse類可以傳入異步生成器(generator),這里為什么要傳一個生成器呢?由于采用SSE響應的數據一般是一點一點的返回給客戶端,不是一次性的返回,像上面的代碼,EventSourceResponse對象每次從g這個生成器中獲取到數據,再將數據組裝成sse的標準格式。
理論上作為SSE響應,我們可以任意的定義數據字段和值,但是一般情況下,為了和前端數據兼容,我們會用以下格式定義SSE響應內容。
event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
這些字段之間使用\r\n
分割,每個sse數據使用兩個\r\n
, 也就是數據結尾處是兩個\r\n
。
當然這個不是必須的,只是一種標準,尤其是前端調用的時候,會對event值有一些不同的處理邏輯。最好統一下標準。
在使用Python調用接口時,使用最多的庫為 requests庫,異步庫使用aiohttp比較多,我分別使用這兩個庫進行演示。
import requests
def test():
url = r"http://127.0.0.1:8000/"
headers = {'Content-Type': 'text/event-stream'}
response = requests.get(url, headers=headers, stream=True)
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
print(chunk)
if __name__ == '__main__':
test()
這段代碼中使用了 response = requests.get(url, headers=headers, stream=True)
來獲取sse的內容,這里有一個比較重要的參數,stream=True
, 使用了這個參數以后才可以達到SSE輸出的效果。這里的header可以設置也可以不設置。
之后調用response.iter_content()
函數來打印數據。
chunk_size: 默認為1,正常情況下我們要設置一個比較大的值,否則獲取到一個字節數據就會走到下面的處理邏輯。decode_unicode: iter_content() 函數遍歷的數據是bytes類型的,這個參數可以控制是否將bytes轉為str。
注意,這里的chunk即使被轉換為字符串,也不是json格式的,我們看到服務端返回的數據像是一個json格式的:
yield {
"event": "message",
"retry": 15000,
"data": i
}
但客戶端得到的中下面的這樣的格式,如果客戶端想要轉為json,需要再單獨處理一下。
event: message
data: 七
retry: 15000
event: message
data: 夕
retry: 15000
event: message
data: 情
retry: 15000
aiohttp 作為異步調用接口常用的庫,使用它調用SSE響應也很方便的。
async def test():
headers = {'Content-Type': 'text/event-stream'}
sseresp = aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
先使用aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
構造一個請求對象,注意這里沒有requests庫中的stream=True
參數,如果加了會報錯!之后開始遍歷數據,注意這里是用的async with
和 async for r.content.iter_any()
,這里一定要調用r.content.iter_any()
方法,否則達不到SSE的效果。
這里也沒有像requests庫中的decode_unicode=True
?參數,所以需要客戶端自己來decode數據。
FastAPI 本身在處理SSE請求與響應時,GET和POST方法是都支持的。我們來看一下POST方法。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
from pydantic import BaseModel
app = FastAPI()
class Message(BaseModel):
message: str
@app.post("/sse")
async def indexpost(msg: Message, req: Request):
async def event_generator(request: Request):
res_str = msg.message
for i in res_str:
if await request.is_disconnected():
print("連接已中斷")
break
yield {
"event": "message",
"retry": 15000, # milisecond
"data": i
}
await asyncio.sleep(0.1)
return EventSourceResponse(event_generator(req))
代碼和上文的GET很像,只不過在GET方法中,是使用的默認的一句話,“七夕情人節即將來臨,我們為您準備了精美的鮮花和美味的蛋糕”,而這里是由客戶端通過參數傳過來。
我們再來使用aiohttp來使用POST方法調用一下接口。
import aiohttp
import asyncio
import json
async def test_post():
headers = {'Content-Type': 'application/json'}
data = {"message": "七夕情人節快樂!"}
sseresp = aiohttp.request("POST", r"http://127.0.0.1:8000/sse", headers=headers, data=json.dumps(data))
async with sseresp as r:
async for chunk in r.content.iter_any():
print(chunk.decode())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_post())
注意這里的headers就一定要設置了。
一般的瀏覽器是支持SSE調用的
const eventSource = new EventSource('http_api_url', { withCredentials: true })
上面也有提到SSE響應數據的標準格式也是為了兼容這里的前端瀏覽器調用,但是這里的前端調用有個比較麻煩的問題是,只能使用GET方法,有個開源項目,是Azure的,可以讓前端使用POST方式調用,有興趣的可以嘗試一下。
https://github.com/Azure/fetch-event-source
文章轉自微信公眾號@序語程言