
全網最詳細的Spring入門教程
cd rag-fastapi-project
現在,讓我們安裝必要的軟件包。創建包含以下內容的文件:requirements.txt
langchain
langchain-openai
langchain-core
langchain_community
docx2txt
pypdf
langchain_chroma
python-multipart
fastapi
uvicorn
使用 pip 安裝這些軟件包:
pip install -r requirements.txt
設置好環境后,我們就可以開始構建生產就緒的 RAG 聊天機器人 API。在下一節中,我們將深入研究項目結構并開始實現我們的 FastAPI 應用程序。
當然!讓我們繼續下一部分,在那里我們將討論項目結構概述。本節將幫助讀者了解我們如何組織代碼以提高可維護性和可伸縮性。(當然!讓我們進入下一部分,我們將討論項目結構概述。這一節將幫助讀者了解我們如何組織代碼以實現更好的可維護性和可擴展性。)
在從原型向生產就緒型應用過渡的過程中,合理的代碼組織變得至關重要。結構良好的項目更易于維護、測試和擴展。對于我們的RAG聊天機器人API,我們將采用模塊化結構,以分離關注點并促進代碼重用。
以下是我們項目結構的概述:
rag-fastapi-project/
│
├── main.py
├── chroma_utils.py
├── db_utils.py
├── langchain_utils.py
├── pydantic_models.py
├── requirements.txt
└── chroma_db/ (directory for Chroma persistence)
讓我們分解每個文件的用途:
此結構遵循 FastAPI 應用程序的最佳實踐,并為構建我們的 RAG 聊天機器人 API 提供了堅實的基礎。在本教程中,我們將深入研究這些文件,解釋它們的內容以及它們如何協同工作以創建我們的生產就緒系統。
main.py?文件是我們 FastAPI 應用程序的核心。它定義了我們的 API 端點并編排了我們系統的不同組件之間的交互。讓我們分解此文件的關鍵元素:(main.py文件是我們FastAPI應用的核心。它定義了我們的API端點,并協調我們系統不同組件之間的交互。讓我們分解此文件的關鍵元素:)
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic_models import QueryInput, QueryResponse, DocumentInfo, DeleteFileRequest
from langchain_utils import get_rag_chain
from db_utils import insert_application_logs, get_chat_history, get_all_documents, insert_document_record, delete_document_record
from chroma_utils import index_document_to_chroma, delete_doc_from_chroma
import os
import uuid
import logging
import shutil
# Set up logging
logging.basicConfig(filename='app.log', level=logging.INFO)
# Initialize FastAPI app
app = FastAPI()
在這里,我們導入必要的模塊并初始化我們的 FastAPI 應用程序。我們還設置了基本日志記錄來跟蹤應用程序中的重要事件。
現在,讓我們看看我們的主要 API 端點:
@app.post("/chat", response_model=QueryResponse)
def chat(query_input: QueryInput):
session_id = query_input.session_id or str(uuid.uuid4())
logging.info(f"Session ID: {session_id}, User Query: {query_input.question}, Model: {query_input.model.value}")
chat_history = get_chat_history(session_id)
rag_chain = get_rag_chain(query_input.model.value)
answer = rag_chain.invoke({
"input": query_input.question,
"chat_history": chat_history
})['answer']
insert_application_logs(session_id, query_input.question, answer, query_input.model.value)
logging.info(f"Session ID: {session_id}, AI Response: {answer}")
return QueryResponse(answer=answer, session_id=session_id, model=query_input.model)
此終端節點處理聊天交互。如果未提供,它會生成會話 ID,檢索聊天記錄,調用 RAG 鏈以生成響應,記錄交互并返回響應。(此端點處理聊天交互。如果未提供會話ID,則生成一個會話ID,檢索聊天歷史,調用RAG鏈生成響應,記錄交互,并返回響應。)
@app.post("/upload-doc")
def upload_and_index_document(file: UploadFile = File(...)):
allowed_extensions = ['.pdf', '.docx', '.html']
file_extension = os.path.splitext(file.filename)[1].lower()
if file_extension not in allowed_extensions:
raise HTTPException(status_code=400, detail=f"Unsupported file type. Allowed types are: {', '.join(allowed_extensions)}")
temp_file_path = f"temp_{file.filename}"
try:
# Save the uploaded file to a temporary file
with open(temp_file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
file_id = insert_document_record(file.filename)
success = index_document_to_chroma(temp_file_path, file_id)
if success:
return {"message": f"File {file.filename} has been successfully uploaded and indexed.", "file_id": file_id}
else:
delete_document_record(file_id)
raise HTTPException(status_code=500, detail=f"Failed to index {file.filename}.")
finally:
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
此終端節點處理文檔上傳。它會檢查允許的文件類型,臨時保存文件,在 Chroma 中為其編制索引,并更新數據庫中的文檔記錄。
@app.get("/list-docs", response_model=list[DocumentInfo])
def list_documents():
return get_all_documents()
這個簡單的端點返回所有索引文檔的列表。
@app.post("/delete-doc")
def delete_document(request: DeleteFileRequest):
chroma_delete_success = delete_doc_from_chroma(request.file_id)
if chroma_delete_success:
db_delete_success = delete_document_record(request.file_id)
if db_delete_success:
return {"message": f"Successfully deleted document with file_id {request.file_id} from the system."}
else:
return {"error": f"Deleted from Chroma but failed to delete document with file_id {request.file_id} from the database."}
else:
return {"error": f"Failed to delete document with file_id {request.file_id} from Chroma."}
此端點處理文檔刪除,從 Chroma 和數據庫中刪除文檔。
Pydantic是一個數據驗證庫,它使用Python類型注解來定義數據模式。在我們的FastAPI應用中,我們使用Pydantic模型來定義請求和響應數據的結構。讓我們分解在models.py中定義的模型:
from pydantic import BaseModel, Field
from enum import Enum
from datetime import datetime
class ModelName(str, Enum):
GPT4_O = "gpt-4o"
GPT4_O_MINI = "gpt-4o-mini"
class QueryInput(BaseModel):
question: str
session_id: str = Field(default=None)
model: ModelName = Field(default=ModelName.GPT4_O_MINI)
class QueryResponse(BaseModel):
answer: str
session_id: str
model: ModelName
class DocumentInfo(BaseModel):
id: int
filename: str
upload_timestamp: datetime
class DeleteFileRequest(BaseModel):
file_id: int
讓我們看看每個模型及其用途:
ModelName
(枚舉):
QueryInput
:
question
:用戶的問題 (必填)。session_id
:可選會話 ID。如果未提供,將生成一個。model
:要使用的語言模型,默認為 GPT4_O_MINI。QueryResponse
:
answer
:生成的答案。session_id
:會話 ID(用于繼續對話)。model
:用于生成響應的模型。DocumentInfo
:
id
:文檔的唯一標識符。filename
:上傳文件的名稱。upload_timestamp
:上傳文檔并為其編制索引的時間。DeleteFileRequest
:
file_id
:要刪除的文檔的 ID。在我們的?main.py?中,我們使用這些模型來定義請求和響應數據的形狀。例如:
@app.post("/chat", response_model=QueryResponse)
def chat(query_input: QueryInput):
# Function implementation
在這里,FastAPI 用于驗證傳入的請求數據,以及驗證和序列化響應。這可確保我們的 API 行為一致,并在提供無效數據時提供清晰的錯誤消息。(在這里,FastAPI使用Pydantic模型來驗證傳入的請求數據,并驗證和序列化響應。這確保了我們的API行為一致,并在提供無效數據時提供清晰的錯誤消息,QueryInput和QueryResponse)QueryInputQueryResponse
隨著我們的API不斷發展,我們可以輕松地擴展這些模型。例如,如果我們想為我們的文檔信息添加更多元數據,只需向模型中添加字段即可:DocumentInfo
class DocumentInfo(BaseModel):
id: int
filename: str
upload_timestamp: datetime
file_size: int # New field
content_type: str # New field
FastAPI 和 Pydantic 將自動處理新字段,提供驗證和文檔,而無需對我們的端點邏輯進行任何更改。
通過使用 Pydantic 模型,我們為 API 創建了強大的基礎,確保了數據的完整性,并為我們的端點提供了清晰的契約。這種方法顯著減少了我們需要編寫的手動驗證代碼的數量,并有助于防止與錯誤數據處理相關的漏洞。
(utils.py
?文件包含用于與我們的 SQLite 數據庫交互的函數。我們使用 SQLite 是因為它的簡單性和易于設置,使其非常適合原型設計和中小型應用程序。讓我們分解此文件的關鍵組件:db_
utils.py
文件包含與SQLite數據庫交互的函數。我們選擇SQLite是因為其簡單性和易于設置的特性,使其非常適合原型設計和中小型應用程序。讓我們分解該文件的關鍵組件:)
import sqlite3
from datetime import datetime
DB_NAME = "rag_app.db"
def get_db_connection():
conn = sqlite3.connect(DB_NAME)
conn.row_factory = sqlite3.Row
return conn
我們首先導入必要的模塊并定義數據庫名稱。get_db_connection()
函數用于創建與SQLite數據庫的連接,并將行工廠設置為sqlite3.Row
,以便更容易地訪問數據。
def create_application_logs():
conn = get_db_connection()
conn.execute('''CREATE TABLE IF NOT EXISTS application_logs
(id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT,
user_query TEXT,
gpt_response TEXT,
model TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
conn.close()
def create_document_store():
conn = get_db_connection()
conn.execute('''CREATE TABLE IF NOT EXISTS document_store
(id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT,
upload_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
conn.close()
這些函數創建了我們的兩個主要表:
application_logs
:存儲聊天歷史記錄和模型響應。document_store
:跟蹤上傳的文檔。def insert_application_logs(session_id, user_query, gpt_response, model):
conn = get_db_connection()
conn.execute('INSERT INTO application_logs (session_id, user_query, gpt_response, model) VALUES (?, ?, ?, ?)',
(session_id, user_query, gpt_response, model))
conn.commit()
conn.close()
def get_chat_history(session_id):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT user_query, gpt_response FROM application_logs WHERE session_id = ? ORDER BY created_at', (session_id,))
messages = []
for row in cursor.fetchall():
messages.extend([
{"role": "human", "content": row['user_query']},
{"role": "ai", "content": row['gpt_response']}
])
conn.close()
return messages
這些函數處理插入新的聊天日志和檢索給定會話的聊天歷史記錄。聊天記錄的格式設置為我們的 RAG 系統易于使用。
def insert_document_record(filename):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('INSERT INTO document_store (filename) VALUES (?)', (filename,))
file_id = cursor.lastrowid
conn.commit()
conn.close()
return file_id
def delete_document_record(file_id):
conn = get_db_connection()
conn.execute('DELETE FROM document_store WHERE id = ?', (file_id,))
conn.commit()
conn.close()
return True
def get_all_documents():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id, filename, upload_timestamp FROM document_store ORDER BY upload_timestamp DESC')
documents = cursor.fetchall()
conn.close()
return [dict(doc) for doc in documents]
這些函數處理文檔記錄的 CRUD 操作:
在文件末尾,我們初始化我們的數據庫表:
# Initialize the database tables
create_application_logs()
create_document_store()
這可確保在應用程序啟動時創建我們的表(如果它們尚不存在)。(這確保了當應用程序啟動時(如果表尚不存在),我們的表會被創建。)
通過將數據庫操作集中在utils.py
中,我們保持了關注點的清晰分離。我們的主要應用程序邏輯無需擔心數據庫交互的細節,從而使代碼更加模塊化且易于維護。
在生產環境中,您可以考慮使用像 SQLAlchemy 這樣的 ORM(對象關系映射)庫來實現更復雜的數據庫操作和更好的可擴展性。但是,對于我們當前的需求,這種簡單的 SQLite 實現效果很好。(在生產環境中,對于更復雜的數據庫操作和更好的可擴展性,您可能會考慮使用像SQLAlchemy這樣的ORM(對象關系映射)庫。然而,對于我們當前的需求,這種直接的SQLite實現已經足夠好了。)
utils.py
文件包含與Chroma向量存儲交互的函數,這對于我們的檢索增強型生成(RAG)系統的檢索功能至關重要。讓我們分解該文件的關鍵組件:
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredHTMLLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from typing import List
from langchain_core.documents import Document
import os
# Initialize text splitter and embedding function
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, length_function=len)
embedding_function = OpenAIEmbeddings()
# Initialize Chroma vector store
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embedding_function)
在這里,我們導入了必要的模塊,并初始化了文本分割器、嵌入函數和Chroma向量存儲。RecursiveCharacterTextSplitter
用于將文檔分割成可管理的塊,而OpenAIEmbeddings
則提供了我們的文檔嵌入函數。
def load_and_split_document(file_path: str) -> List[Document]:
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
elif file_path.endswith('.docx'):
loader = Docx2txtLoader(file_path)
elif file_path.endswith('.html'):
loader = UnstructuredHTMLLoader(file_path)
else:
raise ValueError(f"Unsupported file type: {file_path}")
documents = loader.load()
return text_splitter.split_documents(documents)
此函數處理加載不同的文檔類型(PDF、DOCX、HTML)并將它們拆分為塊。它根據文件擴展名使用適當的加載器,然后應用我們的文本拆分器來創建可管理的文檔塊。
def index_document_to_chroma(file_path: str, file_id: int) -> bool:
try:
splits = load_and_split_document(file_path)
# Add metadata to each split
for split in splits:
split.metadata['file_id'] = file_id
vectorstore.add_documents(splits)
return True
except Exception as e:
print(f"Error indexing document: {e}")
return False
此函數采用文件路徑和文件 ID,加載并拆分文檔,將元數據(文件 ID)添加到每個拆分中,然后將這些文檔塊添加到我們的 Chroma 矢量存儲中。元數據允許我們將 vector store 條目鏈接回我們的數據庫記錄。(此函數接受文件路徑和文件ID,加載并分割文檔,為每個分割添加元數據(文件ID),然后將這些文檔塊添加到我們的Chroma向量存儲中。元數據使我們能夠將向量存儲條目鏈接回我們的數據庫記錄。)
def delete_doc_from_chroma(file_id: int):
try:
docs = vectorstore.get(where={"file_id": file_id})
print(f"Found {len(docs['ids'])} document chunks for file_id {file_id}")
vectorstore._collection.delete(where={"file_id": file_id})
print(f"Deleted all documents with file_id {file_id}")
return True
except Exception as e:
print(f"Error deleting document with file_id {file_id} from Chroma: {str(e)}")
return False
此函數從 Chroma 矢量存儲中刪除與給定文件 ID 關聯的所有文檔塊。它首先檢索文檔以確認其存在,然后執行刪除。
盡管在utils.py
文件中沒有明確顯示,但Chroma向量存儲對于我們的RAG系統的檢索步驟至關重要。在utils.py
中,我們使用此向量存儲來創建一個檢索器:
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
然后,在我們的 RAG 鏈中使用此檢索器,以根據用戶的查詢獲取相關的文檔塊。
通過將我們的向量存儲操作集中在utils.py
中,我們保持了關注點的清晰分離,并在未來需要時更容易替換或升級我們的向量存儲實現。
utils.py
文件是我們使用LangChain實現檢索增強型生成(RAG)系統核心的地方。此文件設置了語言模型、檢索器和RAG鏈。讓我們分解其關鍵組件:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from typing import List
from langchain_core.documents import Document
import os
from chroma_utils import vectorstore
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
output_parser = StrOutputParser()
在這里,我們導入必要的 LangChain 組件,并使用我們之前創建的 Chroma vectorstore 設置我們的檢索器。我們還初始化了一個字符串輸出解析器,用于處理語言模型的輸出。
contextualize_q_system_prompt = (
"Given a chat history and the latest user question "
"which might reference context in the chat history, "
"formulate a standalone question which can be understood "
"without the chat history. Do NOT answer the question, "
"just reformulate it if needed and otherwise return it as is."
)
contextualize_q_prompt = ChatPromptTemplate.from_messages([
("system", contextualize_q_system_prompt),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
qa_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful AI assistant. Use the following context to answer the user's question."),
("system", "Context: {context}"),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}")
])
我們定義了兩個主要提示:
contextualize_q_prompt
:用于根據聊天記錄重新構建用戶的問題。qa_prompt
:用于根據檢索到的上下文和聊天歷史記錄生成最終答案。def get_rag_chain(model="gpt-4o-mini"):
llm = ChatOpenAI(model=model)
history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_q_prompt)
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
return rag_chain
此函數創建我們的 RAG 鏈:
ChatOpenAI
在我們的main.py中,我們在聊天端點使用了這個RAG鏈:
@app.post("/chat", response_model=QueryResponse)
def chat(query_input: QueryInput):
# ... (other code)
rag_chain = get_rag_chain(query_input.model.value)
answer = rag_chain.invoke({
"input": query_input.question,
"chat_history": chat_history
})['answer']
# ... (rest of the function)
這顯示了如何使用用戶指定的模型實例化 RAG 鏈,以及如何使用用戶的問題和聊天記錄調用。
通過將我們的 LangChain 邏輯集中在?(通過將LangChain邏輯集中在utils.py中,我們保持了關注點的清晰分離,并使得未來修改或擴展RAG系統變得更加容易。這種模塊化方法使我們能夠輕松嘗試不同的模型、檢索器或鏈結構,而不會影響應用程序的其他部分。)utils.py
?中,我們保持了清晰的關注點分離,并使得將來更容易修改或擴展我們的 RAG 系統。這種模塊化方法使我們能夠輕松地試驗不同的模型、檢索器或鏈結構,而不會影響應用程序的其余部分。langchain_
在本教程中,我們逐步了解了如何使用FastAPI和LangChain構建一個生產就緒的檢索增強生成(RAG)聊天機器人。讓我們回顧一下我們所完成的工作,并討論一些關鍵收獲和可能的下一步行動。
此架構允許可在生產環境中部署可擴展、可維護和可擴展的 RAG 系統。
構建生產就緒型 RAG 聊天機器人涉及的不僅僅是將語言模型連接到文檔存儲。它需要仔細考慮數據流、錯誤處理、可擴展性和用戶體驗。我們構建的系統提供了堅實的基礎,可以進行調整和擴展以滿足特定的業務需求。(我們所構建的系統提供了一個堅實的基礎,可以根據特定的業務需求進行適應和擴展。)
隨著AI和自然語言處理技術的不斷發展,像這樣的系統對于創建智能的、支持上下文感知的應用將變得越來越重要。通過了解RAG系統的原理和組件,您將在自己的項目中構建和改進這項技術方面做好準備。
請記住,成功的RAG系統的關鍵不僅在于各個組件本身,還在于它們如何協同工作以創建無縫、智能的交互。基于實際使用的持續測試、監控和精煉對于確保您的RAG聊天機器人的長期成功和有效性至關重要。
為了幫助您進一步了解和實施這個 RAG 聊天機器人系統,我準備了一些額外的資源:
在 FutureSmart AI,我們專注于根據您的特定需求構建定制的自然語言處理 (NLP) 解決方案。我們的專業知識不僅限于 RAG 系統,還包括:
我們已經成功地為各個行業實施了這些技術,幫助企業利用 AI 的強大功能來增強其運營和用戶體驗。
無論您是希望實施像我們在本教程中構建的 RAG 系統,還是有更具體的 NLP 需求,我們 FutureSmart AI 的團隊都可以幫助您將 AI 愿望變為現實。(無論您是想實施像本教程中構建的RAG系統,還是有更具體的NLP需求,FutureSmart AI的團隊都在這里幫助您將AI愿景變為現實。)
原文鏈接:https://blog.futuresmart.ai/building-a-production-ready-rag-chatbot-with-fastapi-and-langchain