touch .gitignore

在下一節中,我們將使用 FastAPI 和 Python 構建我們的聊天 Web 服務器。

如何使用 Python、FastAPI 和 WebSockets 構建聊天服務器

在本節中,我們將使用 FastAPI 構建聊天服務器來與用戶通信。我們將使用 WebSockets 來確保客戶端和服務器之間的雙向通信,以便我們可以實時向用戶發送響應。

如何設置 Python 環境

要啟動我們的服務器,我們需要設置我們的 Python 環境。在 VS Code 中打開項目文件夾,然后打開終端。

從項目根目錄進入服務器目錄,并運行以下命令:

python3.8 -m venv env

這將為我們的Python項目創建一個名為“env”的虛擬環境。要激活虛擬環境,請運行以下命令:

source env/bin/activate

接下來,在 Python 環境中安裝幾個庫。

pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis

接下來,在終端中運行“touch .env”命令來創建一個環境文件。我們將在.env文件中定義應用程序變量和秘密變量。

添加應用程序環境變量,并將其設置為“development”,如下所示:export APP_ENV=development。接下來,我們將使用FastAPI服務器設置一個開發服務器。

FastAPI 服務器設置

在服務器目錄的根目錄中,創建一個名為 然后粘貼以下開發服務器代碼的新文件:main.py

from fastapi import FastAPI, Requestimport uvicornimport os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True) else:
pass

首先,我們導入FastAPI并將其初始化為api。然后我們從python-dotenv庫中導入load_dotenv,并初始化它以加載.env文件中的變量,

然后,我們創建一個簡單的測試路由來測試 API。測試路由將返回一個簡單的 JSON 響應,告訴我們 API 已聯機。

最后,我們通過使用uvicorn.run并提供所需的參數來設置開發服務器。API將在端口3500上運行。

最后,在終端中使用python main.py運行服務器。一旦你在終端上看到應用程序啟動完成,在瀏覽器上導航到URL http://localhost:3500/test,你應該會得到一個這樣的網頁:

如何將路由添加到 API

在本節中,我們將為API添加路由。首先,創建一個名為“src”的新文件夾,這個文件夾將作為我們存放所有API代碼的主要目錄。

創建一個名為routes的子文件夾,cd到該文件夾中,創建一個名為chat.py的新文件,然后添加以下代碼:

import os
from fastapi import APIRouter, FastAPI, WebSocket, Request

chat = APIRouter()

# @route POST /token
# @desc Route to generate chat token
# @access Public

@chat.post("/token")
async def token_generator(request: Request):
return None

# @route POST /refresh_token
# @desc Route to refresh token
# @access Public

@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None

# @route Websocket /chat
# @desc Socket for chatbot
# @access Public

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
return None

我們創建了三個終端節點:

接下來,將聊天路由連接到我們的主API。首先,我們需要從src導入chat。在main.py文件中聊天。然后,我們將通過在初始化的FastAPI類上調用include_router方法并傳遞chat作為參數來包含路由器。

更新您的代碼,如下所示:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat

load_dotenv()

api = FastAPI()
api.include_router(chat)

@api.get("/test")
async def root():
return {"msg": "API is Online"}

if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass

如何使用 UUID 生成聊天會話令牌

我們將使用uuid4生成用戶令牌,并利用這個令牌為聊天終端節點創建動態路由。鑒于該終端節點是公開可訪問的,因此關于JWT和身份驗證的詳細內容,我們在此就不做深入探討了。

如果最初uuid未安裝,請pip install uuid .接下來,在 chat.py 中,導入 UUID,并使用以下/token代碼更新路由:

from fastapi import APIRouter, FastAPI, WebSocket,  Request, BackgroundTasks, HTTPExceptionimport uuid

# @route POST /token
# @desc Route generating chat token
# @access Public

@chat.post("/token")
async def token_generator(name: str, request: Request):

if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})

token = str(uuid.uuid4())

data = {"name": name, "token": token}

return data

在上面的代碼中,客戶端提供了他們的名稱,這是必需的。我們快速檢查以確保 name 字段不為空,然后使用 uuid4 生成一個 token。

會話數據是 name 和 token 的簡單字典。最終,我們需要持久保存此會話數據并設置超時,但現在我們只需將其返回給客戶端即可。

如何使用 Postman 測試 API

因為我們將測試 WebSocket 端點,所以我們需要使用像 Postman 這樣的工具來允許這樣做(因為 FastAPI 上的默認 swagger 文檔不支持 WebSockets)。

在Postman中,為您的開發環境創建一個集合,并向localhost:3500/token發送一個POST請求,指定名稱作為查詢參數并傳遞一個值。您應該得到如下所示的響應:

令牌生成器郵差

Websockets 和連接管理器

在src根目錄下,創建一個名為socket的新文件夾,并添加一個名為connection.py的文件。在這個文件中,我們將定義控制到WebSockets的連接的類,以及所有連接和斷開連接的助手方法。

connection.py添加以下代碼:

from fastapi import WebSocket

class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)

ConnectionManager類是用active_connections屬性初始化的,該屬性是一個活動連接列表。

然后,異步連接方法將接受WebSocket并將其添加到活動連接列表中,而disconnect方法將從活動連接列表中刪除WebSocket

最后,send_personal_message方法將接收消息和我們想要發送消息的Websocket,并異步發送消息。

WebSockets是一個相當廣泛的主題,我們這里所討論的只是冰山一角。但即便如此,所學的內容應該已經足夠您創建多個連接,并能夠異步地處理發送到這些連接的消息了。

你可以閱讀更多關于 FastAPI Websockets 和 Sockets 編程 的內容。

要使用ConnectionManager,請在src.routes.chat.py中導入并初始化它,并用下面的代碼更新/chat WebSocket路由:

from ..socket.connection import ConnectionManager

manager = ConnectionManager()

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

except WebSocketDisconnect:
manager.disconnect(websocket)

websocket_endpoint函數(它接受一個WebSocket)中,我們將新的WebSocket添加到連接管理器并運行一個while True循環,以確保套接字保持打開狀態。除非套接字斷開連接。

當連接打開時,我們使用websocket.receive_test()接收客戶端發送的任何消息,并將它們打印到終端。

然后,我們現在將硬編碼的響應發送回客戶端。最終,從客戶端收到的消息將發送到 AI 模型,而返回給客戶端的響應則將是AI模型生成的回復。

在Postman中,我們可以通過創建一個新的WebSocket請求來測試這個端點,并連接到WebSocket端點localhost:3500/chat

當您單擊 connect 時,Messages 窗格將顯示 API 客戶端已連接到 URL,并且套接字已打開。

要對此進行測試,請向聊天服務器發送消息 “Hello Bot”,您應該會立即收到測試響應 “Response: Simulating response from the GPT service” ,如下所示:

FastAPI 中的依賴注入

為了能夠區分兩個不同的客戶端會話并限制聊天會話,我們將使用定時令牌,作為查詢參數傳遞給 WebSocket 連接。

在 socket 文件夾中,創建一個名為utils.py然后添加以下代碼的文件:

from fastapi import WebSocket, status, Query
from typing import Optional

async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)

return token

get_token 函數接收 WebSocket 和令牌,然后檢查令牌是 None 還是 null。

如果滿足這種情況的話,該函數會返回策略沖突的狀態,如果可用,該函數將僅返回令牌。我們稍后將通過額外的令牌驗證來擴展此功能。

為了使用這個函數,我們將它注入到/chat路由中。FastAPI提供了一個Depends類來輕松地注入依賴項,因此我們不必修改裝飾器。

/chat路由更新為以下內容:

from ..socket.utils import get_token

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

except WebSocketDisconnect:
manager.disconnect(websocket)

現在,當您嘗試在 Postman 中連接到/chat終端節點時,您將收到 403 錯誤。目前,提供令牌作為查詢參數,并為令牌提供任何值。然后您應該能夠像以前一樣進行連接,只是現在連接需要一個令牌。

郵差聊天測試與令牌

恭喜你走到了這一步!您的chat.py文件現在應如下所示:

import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token

chat = APIRouter()

manager = ConnectionManager()

# @route POST /token
# @desc Route to generate chat token
# @access Public

@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())

if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})

data = {"name": name, "token": token}

return data

# @route POST /refresh_token
# @desc Route to refresh token
# @access Public

@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None

# @route Websocket /chat
# @desc Socket for chatbot
# @access Public

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

except WebSocketDisconnect:
manager.disconnect(websocket)

在本教程的下一部分,我們將重點介紹如何處理應用程序的狀態以及在客戶端和服務器之間傳遞數據。

如何使用 Redis 構建實時系統

我們的應用程序目前不存儲任何狀態,并且無法識別用戶或存儲和檢索聊天數據。我們還會在聊天會話期間向客戶端返回硬編碼響應。

在本教程的這一部分中,我們將介紹以下內容:

Redis 和分布式消息收發隊列

Redis 是一種開源內存數據存儲,您可以將其用作數據庫、緩存、消息代理和流式處理引擎。它支持多種數據結構,是具有實時功能的分布式應用程序的完美解決方案。

Redis Enterprise Cloud 是 Redis 提供的一項完全托管的云服務,可幫助我們無限大規模部署 Redis 集群,而無需擔心基礎設施。

在本教程中,我們將使用免費的 Redis Enterprise Cloud 實例。您可以在此處免費開始使用 Redis Cloud,并按照本教程設置 Redis 數據庫和 Redis Insight,這是一個與 Redis 交互的 GUI。

一旦你的Redis數據庫配置完成,請在項目的根目錄下(注意,這個位置是在服務器文件夾的外部)新建一個名為“worker”的文件夾。

我們將我們的 worker 環境與 Web 服務器隔離開來,這樣當客戶端向我們的 WebSocket 發送消息時,Web 服務器就不必處理對第三方服務的請求。此外,還可以為其他用戶釋放資源。

這個工作程序服務通過Redis來處理與推理API的后端通信。

來自所有已連接客戶端的請求將附加到消息隊列(生產者),而 Worker 則使用消息,將請求發送到推理 API,并將響應附加到響應隊列。

API 收到響應后,會將其發送回客戶端。

在生產者和消費者之間的傳輸過程中,客戶端可以發送多條消息,這些消息將排隊并按順序響應。

理想情況下,我們可以讓這個 worker 在完全不同的服務器上運行,在它自己的環境中,但現在,我們將在本地機器上創建自己的 Python 環境。

您可能想知道 – 為什么我們需要worker?想象一下這樣一個場景:Web 服務器還創建對第三方服務的請求。這意味著,在套接字連接期間等待第三方服務的響應時,服務器將被阻止,資源被占用,直到從 API 獲得響應。

您可以在發送硬編碼響應和發送新消息之前創建一個隨機的sleep time.sleep(10)來進行試驗。然后,您可以嘗試在新的Postman會話中使用不同的令牌來建立連接。

您會注意到,在隨機睡眠超時之前,聊天會話不會連接。

雖然我們可以在更注重生產的服務器設置中使用異步技術和工作線程池,但隨著并發用戶數量的增長,這也不夠。

最終,我們希望通過使用 Redis 來代理我們的聊天 API 和第三方 API 之間的通信,從而避免占用 Web 服務器資源。

接下來打開一個新終端,cd 進入 worker 文件夾,并創建并激活一個新的 Python 虛擬環境,類似于我們在第 1 部分中所做的。

接下來,安裝以下依賴項:

pip install aiohttp aioredis python-dotenv

如何使用 Redis 客戶端在 Python 中連接到 Redis 集群

我們將使用 aioredis 客戶端與 Redis 數據庫連接。我們還將使用 requests 庫向 Huggingface 推理 API 發送請求。

創建兩個文件.envmain.py。然后創建一個名為src的文件夾。另外,創建一個名為redis的文件夾,并添加一個名為config.py的新文件。

.env文件中,添加以下代碼,并確保使用 Redis 集群中提供的憑證更新字段。

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

在 config.py 中添加下面的 Redis 類:

import os
from dotenv import load_dotenv
import aioredis

load_dotenv()

class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"

async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)

return self.connection

我們創建一個Redis對象,并從環境變量初始化所需的參數。然后我們創建一個異步方法create_connection來創建一個Redis連接,并返回從aioredis方法from_url獲得的連接池。

接下來,我們通過運行以下代碼在 main.py 中測試 Redis 連接。這將創建一個新的 Redis 連接池,設置一個簡單的鍵 “key”,并為其分配一個字符串 “value”。

from src.redis.config import Redis
import asyncio

async def main():
redis = Redis()
redis = await redis.create_connection()
print(redis)
await redis.set("key", "value")

if __name__ == "__main__":
asyncio.run(main())

現在打開 Redis Insight(如果您按照教程下載并安裝它)您應該會看到如下內容:

圖像Redis Insight 測試

如何使用 Redis Streams

現在我們已經設置了 worker 環境,我們可以在 Web 服務器上創建一個 producer,并在 worker 上創建一個 consumer。

首先,讓我們在服務器上再次創建Redis類。在服務器上。SRC創建一個名為redis的文件夾,并添加config.pyproducer.py兩個文件。

config.py中,添加以下代碼,就像我們對 worker 環境所做的那樣:

import os
from dotenv import load_dotenv
import aioredis

load_dotenv()

class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"

async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)

return self.connection

在 .env 文件中,還要添加 Redis 憑證:

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

最后,在 server.src.redis.producer.py添加 the following code:

from .config import Redis

class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client

async def add_to_stream(self, data: dict, stream_channel):
try:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id

except Exception as e:
print(f"Error sending msg to stream => {e}")

我們創建了一個使用 Redis 客戶端初始化的 Producer 類。我們使用此客戶端通過add_to_stream該方法將數據添加到流中,該方法采用數據和 Redis 通道名稱。

用于向流通道添加數據的 Redis 命令是xadd,它在 aioredis 中同時具有高級和低級函數。

接下來,要運行我們新創建的 Producer,請更新 chat.py和WebSocket /chat終端節點,如下所示。請注意更新后的通道名稱 :message_channel

from ..redis.producer import Producer
from ..redis.config import Redis

chat = APIRouter()
manager = ConnectionManager()
redis = Redis()

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)

try:
while True:
data = await websocket.receive_text()
print(data)
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

except WebSocketDisconnect:
manager.disconnect(websocket)

接下來,在 Postman 中,創建一個連接并發送任意數量的消息,這些消息顯示 Hello.您應該將流消息打印到終端,如下所示:

終端信道消息測試

在 Redis Insight 中,您將看到一個新創建的隊列mesage_channel和一個帶時間戳的隊列,其中填充了從客戶端發送的消息。此帶時間戳的隊列對于保持消息的順序非常重要。

Redis洞察頻道

如何對聊天數據進行建模

接下來,我們將為聊天消息創建一個模型。回想一下,我們通過 WebSockets 發送文本數據,但我們的聊天數據需要包含更多的信息,而不僅僅是文本。我們需要在發送聊天消息時為其加上時間戳,并為每條消息分配一個唯一的ID,同時收集關于聊天會話的相關數據,最后以JSON格式存儲這些數據。

我們可以將這些 JSON 數據存儲在 Redis 中,這樣就不會在連接丟失后丟失聊天記錄,因為我們的 WebSocket 不存儲狀態。

server.src中創建名為schema的新文件夾。然后創建一個名為chat.py in server.src.schema中添加 the following code 的文件:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid

class Message(BaseModel):
id = uuid.uuid4()
msg: str
timestamp = str(datetime.now())

class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start = str(datetime.now())

我們使用Pydantic的BaseModel類對聊天數據建模。Chat類將保存單個Chat會話的數據。它將存儲令牌、用戶名以及使用datetime.now()為聊天會話開始時間自動生成的時間戳。

在這個聊天會話中發送和接收的消息存儲在一個Message類中,該類使用uid4動態地創建一個聊天id。初始化Message類時需要提供的唯一數據是消息文本。

如何使用 Redis JSON

為了使用 Redis JSON 的能力來存儲我們的聊天記錄,我們需要安裝 Redis labs 提供的 rejson。

在終端,cd到服務器并使用pip install rejson安裝rejson。然后更新server.src.redis.config.py中的Redis類,使其包含create_rejson_connection方法:

import os
from dotenv import load_dotenv
import aioredis
from rejson import Client

load_dotenv()

class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']

async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)

return self.connection

def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)

return self.redisJson

我們正在添加create_rejson_connection方法,通過rejson Client連接到Redis。這為我們提供了在Redis中創建和操作JSON數據的方法,這在aioredis中是不可用的。

接下來,在server.src.routes.chat.py中,我們可以更新/token端點來創建一個新的Chat實例,并將會話數據存儲在Redis JSON中,如下所示:

@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())

if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})

# Create new chat session
json_client = redis.create_rejson_connection()

chat_session = Chat(
token=token,
messages=[],
name=name
)

# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())

# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)

return chat_session.dict()

注意:因為這是一個演示應用程序,所以我不想在 Redis 中存儲聊天數據太久。因此,我使用 aioredis 客戶端在令牌上添加了 60 分鐘的超時(rejson 不實施超時)。這意味著 60 分鐘后,聊天會話數據將丟失。

這一步是必要的,因為我們沒有對用戶進行身份驗證,并且我們希望在特定的時間段后能夠清除聊天數據。不過,這個步驟是可選的,您可以根據自己的需求選擇是否包含它。

接下來,在 Postman 中,當您發送 POST 請求以創建新令牌時,您將獲得如下所示的結構化響應。您還可以檢查 Redis Insight,查看與令牌一起存儲為 JSON 鍵的聊天數據,以及以值形式存儲的數據。

令牌生成器更新

如何更新 Token 依賴項

現在我們已經生成并存儲了一個令牌,現在是更新/chat WebSocket中的get_token依賴項的好時機了。我們這樣做是為了在開始聊天會話之前檢查有效的令牌。

server.src.socket.utils.py中更新get_token函數來檢查是否在Redis實例中存在token。如果是,則返回令牌,這意味著套接字連接是有效的。如果它不存在,我們關閉連接。

/token創建的令牌將在60分鐘后停止存在。因此,如果在嘗試開始聊天時生成錯誤響應,我們可以在前端使用一些簡單的邏輯來重定向用戶以生成新的令牌。

from ..redis.config import Redis

async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):

if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)

redis_client = await redis.create_connection()
isexists = await redis_client.exists(token)

if isexists == 1:
return token
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")

要測試依賴關系,請使用我們一直在使用的隨機令牌連接到聊天會話,您應該會收到 403 錯誤。(請注意,您必須在 Redis Insight 中手動刪除令牌。)

現在復制發送post請求到/token端點時生成的令牌(或創建一個新請求),并將其作為值粘貼到/chat WebSocket所需的令牌查詢參數中。然后連接。你應該能成功連接。

帶令牌的聊天會話

綜上所述,您的 chat.py 應如下所示。

import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path

chat = APIRouter()
manager = ConnectionManager()
redis = Redis()

# @route POST /token
# @desc Route to generate chat token
# @access Public

@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())

if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})

# Create nee chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)

print(chat_session.dict())

# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())

# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)

return chat_session.dict()

# @route POST /refresh_token
# @desc Route to refresh token
# @access Public

@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None

# @route Websocket /chat
# @desc Socket for chat bot
# @access Public

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()

try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

except WebSocketDisconnect:
manager.disconnect(websocket)

干得好,走到這一步!在下一節中,我們將重點介紹與 AI 模型通信以及處理客戶端、服務器、worker 和外部 API 之間的數據傳輸。

如何使用 AI 模型為聊天機器人添加智能

在本節中,我們將重點介紹如何構建一個包裝器來與 transformer 模型通信,以對話格式將提示從用戶發送到 API,以及接收和轉換聊天應用程序的響應。

如何開始使用 Huggingface

我們不會在 Hugginface 上構建或部署任何語言模型。相反,我們的重點將放在利用Huggingface的加速推理API來連接預先訓練好的模型上。

我們將使用的模型是 EleutherAI 提供的 GPT-J-6B 模型。這是一個生成語言模型,使用 60 億個參數進行了訓練。

Huggingface 為我們提供了一個按需受限的 API 來連接這個模型,它幾乎是免費的。

要開始使用 Huggingface,請創建一個免費帳戶。在您的設置中,生成新的訪問令牌。對于最多 30k 個令牌,Huggingface 免費提供對推理 API 的訪問。

您可以在這里監控API的使用情況。請務必確保這個令牌的安全,切勿將其公開。

注意:我們將使用 HTTP 連接與 API 通信,因為我們使用的是免費帳戶。但是 PRO Huggingface 帳戶支持使用 WebSockets 進行流式處理,請參閱并行性和批處理作業。

這有助于顯著縮短模型和聊天應用程序之間的響應時間,我希望在后續文章中介紹這種方法。

如何與語言模型交互

首先,我們將 Huggingface 連接憑證添加到工作程序目錄中的 .env 文件中。

export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

接下來,在worker.src 創建一個名為model的文件夾,然后添加一個文件gptj.py。然后添加下面的GPT類:

import os
from dotenv import load_dotenv
import requests
import json

load_dotenv()

class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": True,
"max_new_tokens": 25
}

}

def query(self, input: str) -> list:
self.payload["inputs"] = input
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
print(json.loads(response.content.decode("utf-8")))
return json.loads(response.content.decode("utf-8"))

if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")

GPT類是用Huggingface模型url、身份驗證header和預定義的payload初始化的。但是有效負載輸入是一個動態字段,由查詢方法提供,并在我們向Huggingface端點發送請求之前更新

最后,我們通過直接在GPT類的實例上運行查詢方法來測試這一點。在終端中,運行python src/model/gptj.py,你應該得到這樣的響應(只是要記住,你的響應肯定與此不同):

[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]

接下來,我們向 input 添加一些調整,通過更改 input 的格式,使與模型的交互更具對話性。

GPT像這樣更新類:

class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": False,
"max_new_tokens": 25
}

}

def query(self, input: str) -> list:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
text = data[0]['generated_text']
res = str(text.split("Human:")[0]).strip("\n").strip()
return res

if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")

我們使用字符串文本f"Human: {input} Bot:"更新了輸入。人工輸入被放置在字符串中,機器人提供響應。這種輸入格式將 GPT-J6B 變成了對話模型。您可能會注意到的其他變化包括

如何模擬 AI 模型的短期記憶

對于我們發送到模型的每個新輸入,模型無法記住對話歷史記錄。如果我們想在對話中保留上下文,這一點很重要。

但請記住,隨著我們發送到模型的 Token 數量增加,處理成本會變得更高,響應時間也會更長。

因此,我們需要探索一種有效方法來檢索短期歷史記錄,并將其發送給模型進行處理我們還需要弄清楚一個最佳點 – 我們想要檢索多少歷史數據并將其發送到模型?

要處理聊天記錄,我們需要回退到我們的 JSON 數據庫。我們將使用token來獲取上次聊天數據,然后在收到響應時,將響應附加到 JSON 數據庫。

更新worker.src.redis.config.py以包含create_rejson_connection該方法。此外,使用身份驗證數據更新 .env 文件,并確保已安裝 rejson。

您的worker.src.redis.config.py應如下所示:

import os
from dotenv import load_dotenv
import aioredis
from rejson import Client

load_dotenv()

class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']

async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)

return self.connection

def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)

return self.redisJson

雖然您的 .env 文件應如下所示:

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

接下來,在worker.src.redis 創建一個新的文件命名為cache.py并添加以下代碼:

from .config import Redis
from rejson import Path

class Cache:
def __init__(self, json_client):
self.json_client = json_client

async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())

return data

緩存是用一個json客戶端初始化的,get_chat_history方法接受一個令牌,從Redis獲取該令牌的聊天歷史。確保從json中導入了Path對象。

接下來,使用以下worker.main.py更新 :

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache

redis = Redis()

async def main():
json_client = redis.create_rejson_connection()
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)

if __name__ == "__main__":
asyncio.run(main())

我在Postman中硬編碼了一個從以前的測試中創建的樣例令牌。如果沒有創建令牌,只需向/token發送一個新請求并復制令牌,然后在終端中運行python main.py。您應該看到終端中的數據如下所示:

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

接下來,我們需要在我們的緩存類中添加一個add_message_to_cache方法,該方法將特定令牌的消息添加到Redis。

 async def add_message_to_cache(self, token: str, message_data: dict):
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)

rejson 提供的jsonarrappend方法將新消息附加到 message 數組中。

請注意,要訪問 message 數組,我們需要提供 .messagesPath 的參數。如果您的消息數據具有不同/嵌套結構,只需提供要將新數據追加到的數組的路徑。

要測試此方法,請使用以下代碼更新 main.py 文件中的 main 函數:

async def main():
json_client = redis.create_rejson_connection()

await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})

data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)

我們正在向緩存發送硬編碼消息,并從緩存中獲取聊天記錄。當您在終端的 worker 目錄中運行python main.py時,您應該在終端中打印出類似這樣的東西,并將消息添加到 message 數組中。

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

最后,我們需要更新 main 函數以將消息數據發送到 GPT 模型,并使用客戶端和模型之間發送的最后 4 條消息更新輸入。

首先,讓我們使用add_message_to_cache新參數 “source” 更新我們的函數,該參數將告訴我們消息是人類還是機器人。然后,我們可以使用此 arg 將 “Human:” 或 “Bot:” 標簽添加到數據中,然后再將其存儲到緩存中。

更新 add_message_to_cacheCache 類中的方法,如下所示:

async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + (message_data['msg'])
elif source == "bot":
message_data['msg'] = "Bot: " + (message_data['msg'])

self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)

然后更新 worker 目錄中 main.py 中的 main 函數,并運行python main.py以查看 Redis 數據庫中的新結果。

async def main():
json_client = redis.create_rejson_connection()

await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})

data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)

接下來,我們需要更新 main 函數以將新消息添加到緩存中,從緩存中讀取前 4 條消息,然后使用 query 方法對模型進行 API 調用。它將具有一個有效負載,該負載由最后 4 條消息的復合字符串組成。

您可以隨時調整要提取的歷史記錄中消息的數量,但在我看來,為了演示目的,選擇4條消息是一個相當合適的數字。

worker.src中,創建一個新的文件夾架構。然后創建一個名為chat.py并將我們的消息架構粘貼到 chat.py 中的新文件,如下所示:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid

class Message(BaseModel):
id = str(uuid.uuid4())
msg: str
timestamp = str(datetime.now())

接下來,更新 main.py 文件,如下所示:

async def main():

json_client = redis.create_rejson_connection()

await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "3",
"msg": "I would like to go to the moon to, would you take me?",
"timestamp": "2022-07-16 13:20:01.092109"
})

data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")

print(data)

message_data = data['messages'][-4:]

input = ["" + i['msg'] for i in message_data]
input = " ".join(input)

res = GPT().query(input=input)

msg = Message(
msg=res
)

print(msg)
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())

在上面的代碼中,我們將新的消息數據添加到緩存中。此消息最終將來自消息隊列。接下來,我們從緩存中獲取聊天記錄,其中現在將包含我們添加的最新數據。

請注意,我們使用相同的硬編碼令牌添加到緩存并從緩存中獲取,暫時只是為了測試這一點。

接下來,我們修剪緩存數據并僅提取最后 4 項。然后我們通過提取列表中的 msg 并將其連接到空字符串來合并輸入數據。

最后,我們為機器人響應創建一個新的 Message 實例,并將響應添加到緩存中,將源指定為 “bot”

接下來,運行幾次,每次運行python main.py時根據需要更改人工消息和 id。您應該與模型進行完整的對話輸入和輸出。

打開 Redis Insight,您應該會看到類似于以下內容的內容:

從 Message Queue 流式傳輸使用者和實時數據拉取

接下來,我們要創建一個使用者并更新我們的worker.main.py 去連接消息隊列。我們希望它能夠實時提取令牌數據,因為我們目前正在對令牌和消息輸入進行硬編碼。

worker.src.redis中創建名為stream.py的新文件。使用以下代碼添加一個StreamConsumer類:

class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client

async def consume_stream(self, count: int, block: int, stream_channel):

response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)

return response

async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)

StreamConsumer類是用Redis客戶端初始化的。consume_stream方法使用aioredis提供的xread方法從消息通道的隊列中提取一條新消息。

接下來,用while循環更新worker.main.py文件,以保持與消息通道的連接處于活動狀態,如下所示:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message

redis = Redis()

async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)

print("Stream consumer started")
print("Stream waiting for new messages")

while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)

if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(token)

# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)

await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())

# Get chat history from cache
data = await cache.get_chat_history(token=token)

# Clean message input and send to query
message_data = data['messages'][-4:]

input = ["" + i['msg'] for i in message_data]
input = " ".join(input)

res = GPT().query(input=input)

msg = Message(
msg=res
)

print(msg)

await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())

# Delete messaage from queue after it has been processed

await consumer.delete_message(stream_channel="message_channel", message_id=message_id)

if __name__ == "__main__":
asyncio.run(main())

這是一個相當大的更新,所以讓我們一步一步來:

我們使用while True循環,以便 worker 可以在線偵聽隊列中的消息。

接下來,我們通過調用consume_stream方法等待來自message_channel的新消息。如果隊列中有消息,則提取message_id、令牌和消息。然后我們創建Message類的新實例,將消息添加到緩存中,然后獲取最后4條消息。我們將其設置為GPT模型查詢方法的輸入。

獲得響應后,使用add_message_to_cache方法將響應添加到緩存中,然后從隊列中刪除消息。

如何使用 AI 響應更新 Chat 客戶端

到目前為止,我們正在從客戶端向 message_channel 發送聊天消息(由查詢 AI 模型的工作程序接收)以獲取響應。

接下來,我們需要將此響應發送給客戶端。只要套接字連接仍處于打開狀態,客戶端就應該能夠接收響應。

如果連接已關閉,則客戶端始終可以使用refresh_token終端節點從聊天歷史記錄中獲取響應。

worker.src.redis創建一個名為producer.py的新文件Producer中,并添加一個類似于我們在聊天 Web 服務器上的類:

class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client

async def add_to_stream(self, data: dict, stream_channel) -> bool:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id

接下來,在main.py文件中,更新 main 函數以初始化創建者,創建流數據,并使用add_to_stream方法將響應發送到 response_channel

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer

redis = Redis()

async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)

print("Stream consumer started")
print("Stream waiting for new messages")

while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)

if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]

# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)

await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())

# Get chat history from cache
data = await cache.get_chat_history(token=token)

# Clean message input and send to query
message_data = data['messages'][-4:]

input = ["" + i['msg'] for i in message_data]
input = " ".join(input)

res = GPT().query(input=input)

msg = Message(
msg=res
)

stream_data = {}
stream_data[str(token)] = str(msg.dict())

await producer.add_to_stream(stream_data, "response_channel")

await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())

# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)

if __name__ == "__main__":
asyncio.run(main())

接下來,我們需要讓客戶端知道我們何時收到來自 socket 終端節點/chat中 worker 的響應。我們通過監聽響應流來實現這一點。在這里,我們無需包含while循環,因為只要連接保持開啟狀態,套接字就會持續進行監聽。

請注意,我們還需要通過添加邏輯來檢查連接的令牌是否等于響應中的令牌,從而檢查響應是針對哪個客戶端的。然后,我們在讀取響應隊列中的消息后將其刪除。

server.src.redis中創建一個名為 stream.py 的新文件并添加我們的StreamConsumer類,如下所示:

from .config import Redis

class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client

async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)

return response

async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)

接下來,更新/chat套接字端點,如下所示:

from ..redis.stream import StreamConsumer

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)

try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[str(token)] = str(data)
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)

print(response)
for stream, messages in response:
for message in messages:
response_token = [k.decode('utf-8')
for k, v in message[1].items()][0]

if token == response_token:
response_message = [v.decode('utf-8')
for k, v in message[1].items()][0]

print(message[0].decode('utf-8'))
print(token)
print(response_token)

await manager.send_personal_message(response_message, websocket)

await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))

except WebSocketDisconnect:
manager.disconnect(websocket)

刷新令牌

最后,我們需要更新終端節點/refresh_token,以使用我們的Cache類從 Redis 數據庫獲取聊天記錄。

server.src.redis中,添加一個cache.py文件并添加以下代碼:

from rejson import Path

class Cache:
def __init__(self, json_client):
self.json_client = json_client

async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())

return data

接下來,在server.src.routes.chat.py中導入緩存類并更新/token端點如下:

from ..redis.cache import Cache

@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)

if data == None:
raise HTTPException(
status_code=400, detail="Session expired or does not exist")
else:
return data

現在,當我們使用任何令牌向/refresh_token終端節點發送 GET 請求時,終端節點將從 Redis 數據庫獲取數據。

如果 Token 未超時,則數據將發送給用戶。或者,如果未找到令牌,它將發送 400 響應。

如何在 Postman 中測試與多個客戶端的聊天

最后,我們將通過在 Postman 中創建多個聊天會話、在 Postman 中連接多個客戶端以及在客戶端上與機器人聊天來測試聊天系統。

最后,我們將嘗試獲取客戶的聊天記錄,并希望得到適當的回應。

回顧

讓我們快速回顧一下我們使用聊天系統取得的成就。聊天客戶端為與客戶端的每個聊天會話創建一個令牌。此令牌用于標識每個客戶端,連接到或 Web 服務器的客戶端發送的每條消息都在 Redis 通道 (message_chanel) 中排隊,由令牌標識。

我們的 worker environment 從這個通道讀取數據。它不知道客戶端是誰(除了它是一個唯一的令牌),并使用隊列中的消息向 Huggingface 推理 API 發送請求。

當它收到響應時,響應將添加到響應渠道中,并更新聊天歷史記錄。偵聽 response_channel 的客戶端在收到帶有其令牌的響應后立即將響應發送到客戶端。

如果套接字仍處于打開狀態,則發送此響應。如果套接字已關閉,我們確定響應被保留,因為響應已添加到聊天歷史記錄中。即使發生了頁面刷新或連接中斷,客戶端仍然能夠獲取到歷史記錄。

恭喜你走到了這一步!您已經能夠構建一個有效的聊天系統。

在后續文章中,我將重點介紹如何為客戶端構建聊天用戶界面、創建單元和功能測試、使用 WebSockets 和異步請求微調我們的工作線程環境以加快響應時間,并最終在 AWS 上部署聊天應用程序。

原文鏈接:https://www.freecodecamp.org/news/how-to-build-an-ai-chatbot-with-redis-python-and-gpt/

上一篇:

OpenAI ChatGPT API 與 React JS 的完美結合:全面指南

下一篇:

中通快運單號查詢:輕松實現快遞狀態實時追蹤
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費