
Ollama Python 調用:本地大模型的高效交互方式
24/11/LLM異步API調用/LLM_API調用/agenerate/llm_api_example.py
https://github.com/JieShenAI/csdn/blob/main/24/11/LLM%E5%BC%82%E6%AD%A5API%E8%B0%83%E7%94%A8/LLM_API%E8%B0%83%E7%94%A8/agenerate/llm_api_example.py
py
和 jupyter.ipynb
的不同寫法;agenerate
支持異步 ,對比invoke
不支持異步新建一個.env
文件,在其中存放BASE_URL
和API_KEY
裝包
pip install aiolimiter
以下是一個簡單的異步編程 Demo,展示如何通過協程和令牌池并發處理任務:
import random
import asyncio
from uuid import uuid4
from tqdm import tqdm
from dataclasses import dataclass
from aiolimiter import AsyncLimiter
# 創建限速器,每秒最多發出 5 個請求
limiter = AsyncLimiter(10, 1)
@dataclass
class
Token:
uid: str
idx: int
cnt: int = 0
# 將 connect_web 改為異步函數
async
def
llm_api(data):
t = random.randint(0, 2)
# 使用 asyncio.sleep, 模擬API調用
await asyncio.sleep(t)
return data * 10
# 保持 call_api 異步
async
def
call_api(token, data, rate_limit_seconds=0.5):
token.cnt += 1
async
with limiter:
await asyncio.sleep(rate_limit_seconds)
return
await llm_api(data)
workders = 1
tokens = [Token(uid=str(uuid4()), idx=i) for i in range(workders)]
async
def
_run_task_with_progress(task, pbar):
"""包裝任務以更新進度條"""
result = await task
pbar.update(1)
return result
# 主函數
async
def
main():
nums = 100
data = [i for i in range(nums)]
results = [call_api(tokens[int(i % workders)], item) for i, item in enumerate(data)]
# 使用 tqdm 創建一個進度條
with tqdm(total=len(results)) as pbar:
# 使用 asyncio.gather 并行執行任務
results = await asyncio.gather(
*(_run_task_with_progress(task, pbar) for task in results)
)
return results
# 運行程序
result = asyncio.run(main())
print(result)
在使用異步協程時,一定要限速,不然會被封。
limiter = AsyncLimiter(5, 1)
, 創建限速器,每秒最多發出 5 個請求。tokens[int(i % workders)
令牌輪轉,避免同一個token訪問頻率過高被封。
假如 AsyncLimiter 限速 每秒15條請求,令牌池中有3個token,那么相當于每個token的請求速度降低到了每秒5(15 / 3)條請求。每個token的頻率降低了,但是總的頻率還是很高的。
建議:最好使用多個平臺的API接口。服務商能夠看到我們主機的IP,即便使用了多個token,但是IP是同一個,容易被封IP。目前API的服務器提供商很多,咱們用多個平臺的 API 對服務商也好,壓力散布到多個服務商,不用只霍霍一家服務商。
使用tqdm
與_run_task_with_progress
結合構建進度條
asyncio.gather
函數用于并行運行多個協程,并在所有協程完成后返回結果。利用asyncio.gather實現一個進度條工具,創建一個協程來更新進度條,同時使用asyncio.gather來跟蹤其他協程的完成情況。
使用 tqdm 創建一個進度條對象 pbar,并設置 total 為任務的數量。
使用 asyncio.gather 并行執行所有任務,同時通過 _run_task_with_progress 包裝每個任務以更新進度條。
await
是錯誤的,正確的做法是構建任務列表,然后通過asyncio.gather
并發執行任務。result = [await call_api(tokens[int(i % workers)], item) for i, item in enumerate(data)]
result = [call_api(tokens[int(i % workers)], item) for i, item in enumerate(data)]
asyncio.gather
并發運行任務可以充分利用異步特性,縮短總執行時間。下面的代碼展示了如何使用多個 API 密鑰組成的令牌池來優化 LLM API 調用。我們以.env
文件存儲 API 密鑰為例。
創建.env
文件,存放多個api key 構成令牌池
API_KEY=sk-xxx,sk-xxx,sk-xxx
utils.py
import re
import json
import random
import time
from typing import Union, Dict
def
generate_arithmetic_expression(num: int):
"""
num: 幾個操作符
"""
# 定義操作符和數字范圍,除法
operators = ['+', '-', '*']
expression = f"{random.randint(1, 100)}
{random.choice(operators)}
{random.randint(1, 100)}"
num -= 1
for _ in range(num):
expression = f"{expression}
{random.choice(operators)}
{random.randint(1, 100)}"
result = eval(expression)
expression = expression.replace('*', 'x')
return expression, result
def
re_parse_json(text) -> Union[Dict, None]:
# 提取 JSON 內容
json_match = re.search(r'\{.*?\}', text, re.DOTALL)
if json_match:
json_data = json_match.group(0)
response_data = json.loads(json_data)
return response_data
print(f"異常:\n{text}")
return
None
def
calculate_time_difference(start_time, end_time):
elapsed_time = end_time - start_time
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
milliseconds = (elapsed_time - int(elapsed_time)) * 1000
print(
f"executed in {int(hours):02}:{int(minutes):02}:{int(seconds):02}.{int(milliseconds):03} (h:m:s.ms)"
)
def
time_logger(func):
def
wrapper(*args, **kwargs):
start_time = time.time() # 記錄開始時間
result = func(*args, **kwargs) # 執行目標函數
end_time = time.time() # 記錄結束時間
elapsed_time = end_time - start_time
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
milliseconds = (elapsed_time - int(elapsed_time)) * 1000
print(
f"Function '{func.__name__}' executed in {int(hours):02}:{int(minutes):02}:{int(seconds):02}.{int(milliseconds):03} (h:m:s.ms)")
return result
return wrapper
# 測試生成
if __name__ == "__main__":
expr, res = generate_arithmetic_expression(4)
print(f"生成的運算表達式: {expr}")
print(f"計算結果: {res}")
異步協程核心代碼:
import asyncio
import os
import time
from tqdm import tqdm
from dataclasses import dataclass, field
from typing import List, Tuple, TypedDict
from aiolimiter import AsyncLimiter
# 創建限速器,每秒最多發出 5 個請求
limiter = AsyncLimiter(5, 1)
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from utils import (
generate_arithmetic_expression,
re_parse_json,
calculate_time_difference,
)
@dataclass
class
LLMAPI:
"""
大模型API的調用類
"""
base_url: str
api_key: str # 每個API的key不一樣
uid: int
cnt: int = 0
# 統計每個API被調用了多少次
llm: ChatOpenAI = field(init=False) # 自動創建的對象,不需要用戶傳入
def
__post_init__(self):
# 初始化 llm 對象
self.llm = self.create_llm()
def
create_llm(self):
# 創建 llm 對象
return ChatOpenAI(
model="gpt-4o-mini",
base_url=self.base_url,
api_key=self.api_key,
)
async
def
agenerate(self, text):
self.cnt += 1
res = await self.llm.agenerate([text])
return res
async
def
call_llm(llm: LLMAPI, text: str):
# 異步協程 限速
async
with limiter:
res = await llm.agenerate(text)
return res
async
def
_run_task_with_progress(task, pbar):
"""包裝任務以更新進度條"""
result = await task
pbar.update(1)
return result
async
def
run_api(llms: List[LLMAPI], data: List[str]) -> Tuple[List[str], List[LLMAPI]]:
results = [call_llm(llms[i % len(llms)], text) for i, text in enumerate(data)]
# 使用 tqdm 創建一個進度條
with tqdm(total=len(results)) as pbar:
# 使用 asyncio.gather 并行執行任務
results = await asyncio.gather(
*[_run_task_with_progress(task, pbar) for task in results]
)
return results, llms
if __name__ == "__main__":
load_dotenv()
# 四則運算提示詞模板
prompt_template = """
請將以下表達式的計算結果返回為 JSON 格式:
{{
"expression": "{question}",
"infer": ?
}}
"""
questions = []
labels = []
for _ in range(10000):
question, label = generate_arithmetic_expression(2)
questions.append(prompt_template.format(question=question))
labels.append(label)
start_time = time.time()
# for jupyter
# results, llms = await run_api(api_keys, questions)
api_keys = os.getenv("API_KEY").split(",")
base_url = os.getenv("BASE_URL")
# 創建LLM
llms = [LLMAPI(base_url=base_url, api_key=key, uid=i) for i, key in enumerate(api_keys)]
results, llms = asyncio.run(run_api(llms, questions))
right = 0
# 大模型回答正確
except_cnt = 0
# 大模型不按照json格式返回結果
not_equal = 0
# 大模型解答錯誤
for q, res, label in zip(questions, results, labels):
res = res.generations[0][0].text
try:
res = re_parse_json(res)
if res is
None:
except_cnt += 1
continue
res = res.get("infer", None)
if res is
None:
except_cnt += 1
continue
res = int(res)
if res == label:
right += 1
else:
not_equal += 1
except Exception as e:
print(e)
print(f"question:{q}\nresult:{res}")
print("accuracy: {}%".format(right / len(questions) * 100))
end_time = time.time()
calculate_time_difference(start_time, end_time)
print(right, except_cnt, not_equal)
上述是大模型進行四則運算實戰的代碼,雖然寫的內容有點多了,但是相信大家看完還是會有所收獲的。
如果大家想直接將其應用到自己的代碼中,建議瀏覽run_api
函數。仿照上述類似的流程完成代碼的編寫即可實現。
如下圖是API調用的網頁后臺數據,其在短時間內,發出了多個請求。如果不使用協程,則必須收到上一個請求的結果后,才能發送下一個請求。
在異步協程不限速時,在90條四則運算進行推理,對比花費的時間:
1個key | 3個key | |
---|---|---|
invoke | 5分半 | / |
agenerate | 15秒 | 15秒 |
invoke
不支持異步,agenerate
支持異步。
在異步協程不限速的情況下,發現使用1個key和多key的運行時間是一樣的。這是因為不限速的情況下,會在第一時間把所有的請求發出去,令牌池效果體現不出來。
只有在對異步協程限速的情況下,才能體現出令牌池的效果。在上文的限速部分進行了細致的舉例說明。
若只使用一個令牌,對它限速,確保不讓服務商封號,使用異步協程保持在一個恰當的速度,比較省事。注冊很多賬號,也很磨人。
上圖是運行程序輸出的結果:
如上圖的進度條所示,20秒跑完100條數據,平均每秒處理4.88條數據,大模型計算四則運算的準確率 85%(只在100條數據上實驗,會有波動)。
print(right, except_cnt, not_equal)
的輸出結果是 85 0 15,大模型計算正確85條數據,異常0條,計算錯誤15條。
Question是輸入到大模型的提示詞,LLM Infer是大模型生成的答案,label 是真實的結果。
在實驗中發現,上述提示詞讓大模型做四則運算的準確率不夠高。本文更新了一版提示詞后,準確率達到98%。
我不想花時間琢磨提示詞的編寫,故提示詞也是讓大模型自己生成的。
你是一名擅長數學運算的助手,負責逐步推理并解決四則運算問題。請按照以下步驟進行:
1. 閱讀并理解問題。
2. 分步計算,逐步解決問題。
3. 給出最終的結果。
4. 按照 JSON 格式輸出結果,包括:
- reason: 詳細的推理過程。
- infer: 最終的計算結果。
問題:{問題描述}
請給出分析和結果。
使用上述提示詞后,準確率達到98%。
通過異步編程結合令牌池的設計,可以顯著提高大模型 API 的調用效率。關鍵在于:
asyncio
管理異步任務。asyncio.gather
并發執行。這一思路可以應用于需要高并發的場景,例如自然語言處理、實時數據處理等,助力開發者構建高效的 AI 應用系統。
文章轉自微信公眾號@AI悠閑區