
如何快速實(shí)現(xiàn)REST API集成以?xún)?yōu)化業(yè)務(wù)流程
但是有點(diǎn)我們是需要注意,上面的只是說(shuō)我們實(shí)現(xiàn)了異步任務(wù),但是由于我們?cè)诮涌谥羞M(jìn)行await,所以盡管的任務(wù)是異步,但是還是需要等待,我們本節(jié)主要是學(xué)習(xí)相關(guān)異步化后臺(tái)的任務(wù),也就是說(shuō)我們某些任務(wù)放置到后臺(tái)或其他地方去執(zhí)行,不影響當(dāng)前主線程的運(yùn)行。
在前面示例中,對(duì)于的任務(wù)執(zhí)行了await等待,就會(huì)進(jìn)行任務(wù)掛起并等待。假如some_async_function()的任務(wù)需要耗時(shí)比較久的話,且不在意它相關(guān)的處理結(jié)果,或者結(jié)果可以通過(guò)另一種方式進(jìn)行通知的話,我們可以把some_async_function()的任務(wù)進(jìn)行后臺(tái)話,可以理解為放到另一個(gè)線程去執(zhí)行,而不而不需要await等待,如之前的代碼,我們可以使用FasAPI框架所提供的BackgroundTasks的BackgroundTasks類(lèi)來(lái)處理后臺(tái)任務(wù),如下示例代碼:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def some_async_function():
# 后臺(tái)任務(wù)的代碼
pass
@app.get("/background_task")
async def background_task(background_tasks: BackgroundTasks):
background_tasks.add_task(some_async_function)
return {"message": "后臺(tái)任務(wù)添加成功"}
當(dāng)然上面這種方式也不合適長(zhǎng)期運(yùn)行耗時(shí)的任務(wù),所以引出我們本小結(jié)主要需要了解的Celery庫(kù),—芹菜. 使用Celery我們可以基于異步任務(wù)隊(duì)列的方式來(lái)處理異步任務(wù)。
下面我一步一步的介紹如何基于Celery實(shí)現(xiàn)異步任務(wù)以及延遲任務(wù)。
首先我們從官網(wǎng)介紹總結(jié)一下關(guān)于Celery,它是一個(gè)基于Python的分布式任務(wù)隊(duì)列框架,用于實(shí)現(xiàn)異步任務(wù)的調(diào)度和執(zhí)行。它的主要作用是可以幫助我們將耗時(shí)的任務(wù)從主線程中分離出來(lái),不因任務(wù)的耗時(shí)而空等,以此來(lái)提高我們系統(tǒng)的并發(fā)性和響應(yīng)速度。
Celery是用來(lái)處理異步任務(wù),所以我們可以使用它來(lái)處理以下類(lèi)似的一些業(yè)務(wù)場(chǎng)景,例如發(fā)送電子郵件、生成報(bào)表、處理大量數(shù)據(jù)等。通過(guò)將這些任務(wù)放入任務(wù)隊(duì)列中,可以讓主線程繼續(xù)處理其他請(qǐng)求,而不需要等待任務(wù)完成。當(dāng)然Celery還提供定時(shí)任務(wù)的調(diào)度功能,可以讓我們按照設(shè)定的時(shí)間間隔或者時(shí)間點(diǎn)執(zhí)行任務(wù)。
通常我們使用Celery需要進(jìn)行以下相關(guān)以下大致的幾個(gè)步驟:
3.當(dāng)我們的任務(wù)發(fā)布之后,任務(wù)會(huì)進(jìn)入一個(gè)消息隊(duì)列里面進(jìn)行等待,等待消費(fèi)者去消費(fèi),所以接下里我們需要啟動(dòng)Celery Worker:?jiǎn)?dòng)Celery Worker的消費(fèi)者對(duì)象來(lái)處理任務(wù)隊(duì)列中的任務(wù)。
例如:
主要我們這里使用消息代理的中間件是redis,所以是先啟動(dòng)我們的redis,
接著定義相關(guān)任務(wù),如下示例代碼:
from fastapi import FastAPI
from celery import Celery
app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
# 定義任務(wù)
@celery_app.task
def some_celery_task():
# Celery任務(wù)的代碼
pass
@app.get("/celery_task")
async def celery_task():
# 開(kāi)始發(fā)布任務(wù)
some_celery_task.delay()
return {"message": "Celery 任務(wù)發(fā)布成功"}
if __name__ == "__main__":
# 使用os.path.basename函數(shù)獲取了當(dāng)前文件的名稱(chēng),并將.py文件擴(kuò)展名替換為空字符串\
# import os
# app_modeel_name = os.path.basename(__file__).replace(".py", "")
from pathlib import Path
# 使用Path函數(shù)獲取了當(dāng)前文件的名稱(chēng),并將.py文件擴(kuò)展名替換為空字符串\
# app_modeel_name = Path(__file__).name.replace(".py", "")
import uvicorn
import inspect
# 根據(jù)文件路徑返回模塊名
# print("app_modeel_name:",inspect.getmodulename(Path(__file__).name))
# 使用uvicorn.run函數(shù)運(yùn)行了一個(gè)應(yīng)用程序。它指定了應(yīng)用程序的主機(jī)和端口,并且設(shè)置了reload參數(shù)為T(mén)rue。
uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)
啟動(dòng)應(yīng)用并發(fā)布任務(wù):
在FastAPI應(yīng)用程序,當(dāng)調(diào)用 some_celery_task.delay() 來(lái)發(fā)布任務(wù)時(shí),Celery將把任務(wù)放入Redis隊(duì)列中。
在命令行中,切換到您的項(xiàng)目目錄。啟動(dòng)Celery worker進(jìn)程。在命令行中運(yùn)行以下命令:
celery -A main.celery_app worker --loglevel=info
啟動(dòng)后得到如下圖所示的結(jié)果:
Celery worker進(jìn)程將從Redis隊(duì)列中獲取任務(wù),并執(zhí)行任務(wù)所定義的代碼。當(dāng)任務(wù)被執(zhí)行時(shí),您將在Celery worker進(jìn)程的日志中看到相關(guān)的日志消息。但是我們運(yùn)行時(shí)候遇到了問(wèn)題如下:
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
The full contents of the message headers:
{'lang': 'py', 'task': 'main_celely.some_celery_task', 'id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen15648@xiaozhong', 'ignore_result': False, 'stamped_headers': None, 'stamps': {}}
The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "D:\code_loacl\mm_ring_v2\venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 642, in on_task_received
strategy = strategies[type_]
~~~~~~~~~~^^^^^^^
出現(xiàn)上述原因問(wèn)題在于我們的:
@celery_app.task
def some_celery_task():
# Celery任務(wù)的代碼
pass
沒(méi)有進(jìn)行相關(guān)實(shí)例綁定,我們可以修改為:
@celery_app.task(bind=True)
def some_celery_task(self):
# Celery任務(wù)的代碼
import time
time.sleep(10)
pass
然后每次發(fā)布任務(wù)的時(shí)候,重新觀察消費(fèi)者的任務(wù)輸出信息如下:
如上的輸出,表示我們的任務(wù)已經(jīng)被正常進(jìn)行消費(fèi)了!
在上面實(shí)例我們已簡(jiǎn)單完成相關(guān)異步任務(wù)的處理,通常我們一般需要使用提各種參數(shù)來(lái)配置和控制其行為。下面介紹一下Celery一些常用的配置項(xiàng):
PS:配置選項(xiàng)的名稱(chēng)和具體含義可能會(huì)因Celery的版本而有所不同。
'amqp://guest:guest@localhost//'
None
[]
False
True
時(shí),Celery將跟蹤任務(wù)的開(kāi)始狀態(tài),并在任務(wù)開(kāi)始時(shí)發(fā)送任務(wù)狀態(tài)更新。None
None
False
True
時(shí),Celery將在任務(wù)執(zhí)行完成后再發(fā)送確認(rèn)消息。這可以確保即使在任務(wù)執(zhí)行期間發(fā)生錯(cuò)誤,任務(wù)也不會(huì)丟失。False
True
時(shí),Celery將不會(huì)存儲(chǔ)任務(wù)的執(zhí)行結(jié)果。這可以節(jié)省存儲(chǔ)資源,適用于不關(guān)心任務(wù)結(jié)果的情況。除了之前提到的常用配置項(xiàng)外,以下是一些其他常見(jiàn)的Celery配置項(xiàng)的說(shuō)明:
'json'
task_serializer
相同'celery'
'celery'
task_default_queue
相同4
{}
······其他參數(shù)說(shuō)明,有需要我得話,我在去官方文檔查閱接口。接下里我,我們看看和它對(duì)于就是所謂得配置文件。配置文件其實(shí)和上面所謂得參數(shù)是沒(méi)有區(qū)別得,也就是說(shuō),我們可以使用其他的方式來(lái)定義我們的參數(shù)傳入,如這些配置項(xiàng)我們還可以可以在Celery的配置文件中進(jìn)行設(shè)置,通常命名為celeryconfig.py或celery.py。如下的示例代碼所示:
下面是一個(gè)使用配置文件的示例:
celeryconfig.py
的配置文件,里面包含的內(nèi)容如下:# celeryconfig.py
# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab
broker_url = 'amqp://guest:guest@localhost//' # 消息代理的URL
result_backend = 'redis://localhost:6379/0' # 結(jié)果存儲(chǔ)后端的URL
task_serializer = 'json' # 任務(wù)的序列化器
result_serializer = 'json' # 任務(wù)結(jié)果的序列化器
timezone = 'Asia/Shanghai' # 使用的時(shí)區(qū)
task_acks_late = True # 啟用延遲確認(rèn)
task_ignore_result = False # 不忽略任務(wù)結(jié)果
task_soft_time_limit = 60 # 任務(wù)的軟時(shí)間限制為60秒
task_time_limit = 120 # 任務(wù)的硬時(shí)間限制為120秒
worker_prefetch_multiplier = 4 # 任務(wù)執(zhí)行者的預(yù)取乘數(shù)
worker_concurrency = 8 # 每個(gè)任務(wù)執(zhí)行者的并發(fā)處理數(shù)量
worker_max_tasks_per_child = 100 # 每個(gè)任務(wù)執(zhí)行者最大處理任務(wù)數(shù)
task_default_priority = 0 # 任務(wù)的默認(rèn)優(yōu)先級(jí)
task_routes = {
'myapp.tasks.email_task': {'queue': 'email_queue'}, # 指定任務(wù)的隊(duì)列
'myapp.tasks.image_task': {'queue': 'image_queue'}
}
worker_hijack_root_logger = False # 不重寫(xiě)根日志記錄器
worker_disable_rate_limits = False # 不禁用任務(wù)速率限制
beat_schedule = {
'task1': {
'task': 'myapp.tasks.task1',
'schedule': crontab(minute='*/15'), # 每15分鐘執(zhí)行一次
},
'task2': {
'task': 'myapp.tasks.task2',
'schedule': timedelta(seconds=30), # 每30秒執(zhí)行一次
}
}
在你的Celery應(yīng)用程序中加載配置文件。
# celery_app.py
from celery import Celery
app = Celery('myapp')
app.config_from_object('celeryconfig')
在上述示例中,我們創(chuàng)建了一個(gè)名為celeryconfig.py
的配置文件,并在其中設(shè)置了各種Celery的配置選項(xiàng)。然后,我們?cè)贑elery應(yīng)用程序的入口文件celery_app.py
中加載了該配置文件。
你可以根據(jù)自己的需求和應(yīng)用程序的特定要求,在配置文件中進(jìn)行相應(yīng)的配置。注意,Celery的配置文件應(yīng)該位于與你的Celery應(yīng)用程序代碼相同的目錄中,或者可以通過(guò)正確的路徑進(jìn)行引用。
確保在啟動(dòng)Celery應(yīng)用程序時(shí),使用正確的配置文件加載配置。例如,通過(guò)以下命令啟動(dòng)Celery Worker:
celery -A celery_app worker --loglevel=info
PS: 通常win系統(tǒng)下可能你需要使用如下的方式啟動(dòng):
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
# 啟動(dòng)celery監(jiān)控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555
獲取使用代碼的方式啟動(dòng):
from src.tasks.app import celery_app
if __name__ == '__main__':
# python -m celery -A src.tasks.app worker -Q mm_ring_v2 --loglevel=info -P eventlet
celery_app.worker_main(argv=['-A', 'src.tasks.app','worker','--loglevel=info', '-Q', 'mm_ring_v2','-P', 'eventlet'])
這樣,Celery將會(huì)根據(jù)配置文件中的設(shè)置來(lái)運(yùn)行任務(wù)調(diào)度和執(zhí)行過(guò)程。
延遲任務(wù)場(chǎng)景我們也通常會(huì)遇到,比如超時(shí)未支付則自當(dāng)取消訂單等。下面舉例說(shuō)明一下具體的實(shí)現(xiàn)過(guò)程:
基本任務(wù)項(xiàng)目結(jié)果:
1:定義Celery實(shí)例對(duì)象
from celery.signals import task_failure, task_retry, task_success
from celery.exceptions import MaxRetriesExceededError
from celery import Task
from src.tasks.service import TaskFactory
# 任務(wù)工廠,可以創(chuàng)建多個(gè),目前只是創(chuàng)建了一個(gè)celery_app對(duì)象
factory = TaskFactory()
# 創(chuàng)建第一個(gè) Celery 應(yīng)用
celery_app = factory.create_celery_app(namespces='mm_sync_tasks', config_name='src.tasks.config.ProductionConfig')
celery_app.conf["worker_redirect_stdouts"] = False # 禁止重定向工作進(jìn)程的標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯(cuò)誤流
def get_pending_tasks():
with celery_app.connection() as connection:
tasks = connection.default_channel.queue_declare(queue='celery', passive=True).message_count
return tasks
# 移除任務(wù)隊(duì)列中的所有等待執(zhí)行的任務(wù)
def remove_pending_tasks():
with celery_app.connection() as connection:
connection.default_channel.queue_purge(queue='celery')
# 重新調(diào)度任務(wù)
def reschedule_tasks():
pending_tasks = get_pending_tasks()
remove_pending_tasks()
# task_prerun:任務(wù)開(kāi)始運(yùn)行前觸發(fā)的信號(hào)。
# task_postrun:任務(wù)運(yùn)行完成后觸發(fā)的信號(hào)。
# task_success:任務(wù)成功完成時(shí)觸發(fā)的信號(hào)。
# task_failure:任務(wù)失敗時(shí)觸發(fā)的信號(hào)。
# task_retry:任務(wù)重試時(shí)觸發(fā)的信號(hào)。
# task_revoked:任務(wù)被撤銷(xiāo)時(shí)觸發(fā)的信號(hào)。
# task_rejected:任務(wù)被拒絕時(shí)觸發(fā)的信號(hào)。
# worker_ready:Worker準(zhǔn)備就緒時(shí)觸發(fā)的信號(hào)。
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯(cuò)誤獲取!!", sender, task_id)
pass
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯(cuò)誤獲取!!", sender, task_id)
pass
@task_failure.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
# # 處理任務(wù)失敗的邏輯
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯(cuò)誤獲取!!", sender, task_id)
pass
@task_success.connect
def handle_task_success(sender: Task, result, **kwargs):
# 處理任務(wù)成功完成的邏輯
print("handle_task_success!", sender, result)
pass
if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
# 啟動(dòng)celery監(jiān)控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555
定義實(shí)例配置信息:
# !/usr/bin/envpython
# -*-coding:UTF-8-*-
'''
@File : __init__.py.py
@Contact : 308711822@qq.com
@License : (C) Copyright 2021-2225, Personal exclusive right.
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2023/6/16 11:25 小鐘同學(xué) 1.0 None
'''
from kombu import Queue, Exchange
class Config:
# broker = 'redis://127.0.0.1:6379/1' # 任務(wù)儲(chǔ)存
# backend = 'redis://127.0.0.1:6379/2' # 結(jié)果存儲(chǔ),執(zhí)行完之后結(jié)果放在這
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# 全局的任務(wù)過(guò)期時(shí)間
# app.conf.update(result_expires=60)
# 定義隊(duì)列名稱(chēng)
CELERY_QUEUES = (
Queue('mm_ring_v2'),
)
# CELERY_QUEUES = (
# Queue('default', exchange=Exchange('default'), routing_key='default'),
# Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
# Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
# )
# 指定任務(wù)走什么對(duì)了和routing_key
# CELERY_ROUTES = {
# 'src.tasks.task.migu_order_sync_status': {'queue': 'app_task1', 'routing_key': 'app_task1'},
# 'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
# }
# 指定需要加載的任務(wù)
# 指定要導(dǎo)入的任務(wù)模塊或任務(wù)文件列表
# celery_app.conf.update(
# include=[
# 'src.tasks.task.migu_order_sync_status',
# ]
# )
# 配置需要執(zhí)行的任務(wù)所在的目錄
CELERY_INCLUDE = [
'src.tasks.task.migu_order_sync_status',
]
class DevelopmentConfig(Config):
DEBUG = True
class ProductionConfig(Config):
DEBUG = False
2:定義任務(wù):
import datetime
import signal
from celery import current_app
from celery.result import AsyncResult
from pydantic import BaseModel
def exponential_backoff(retries):
return 2 ** retries
# 定義延遲執(zhí)行的任務(wù)
# expires=3600 將任務(wù)結(jié)果的存儲(chǔ)有效期設(shè)置為 1 小時(shí)。
@celery_app.task(bind=True, max_retries=5, default_retry_delay=1, retry_backoff=exponential_backoff(2), expires=3660)
def action_migu_order_sync_status(self, orderid, mobile):
# self.request.retries 表示當(dāng)前重試的次數(shù)
······
需要說(shuō)明一點(diǎn)是,@celery_app.task:這是一個(gè)裝飾器,用于將一個(gè)普通的Python函數(shù)注冊(cè)為Celery任務(wù)。celery_app 是你的 Celery 應(yīng)用實(shí)例,task 是 Celery 提供的裝飾器函數(shù)。且這個(gè)的任務(wù)相關(guān)參數(shù)項(xiàng)說(shuō)明如下
3:在API接口發(fā)布任務(wù)
@router.get("/put/code", summary="訂單提交")
def callback(*, forms: PutCodeForm = Depends()):
.....
# 發(fā)布任務(wù)
result = action_migu_order_sync_status.apply_async(args=(Orders.orderid, Orders.mobile), countdown=60, retry=5,eta=eta, expires=expires,queue='mm_ring_v2')
if result == -1:
return Fail(message='提交失敗')
return Success(message='提交成功')
各個(gè)個(gè)參數(shù)項(xiàng)說(shuō)明如下:
args=(Orders.orderid, Orders.mobile)
:這個(gè)參數(shù)用于指定要傳遞給 Celery 任務(wù)的位置參數(shù),即任務(wù)函數(shù)在執(zhí)行時(shí)所需的參數(shù)值。在這個(gè)例子中,任務(wù)函數(shù)需要兩個(gè)參數(shù),分別對(duì)應(yīng) Orders.orderid
和 Orders.mobile
。countdown=60
:指定任務(wù)在被放入隊(duì)列之后需要延遲多少秒才開(kāi)始執(zhí)行。在這個(gè)例子中,任務(wù)會(huì)在被加入隊(duì)列后延遲 60 秒后開(kāi)始執(zhí)行。retry=5
:指定任務(wù)在發(fā)生錯(cuò)誤時(shí)最多重試的次數(shù)。與之前提到的 max_retries
類(lèi)似,但這里是針對(duì)單次任務(wù)調(diào)用的重試次數(shù)。eta=eta
:這個(gè)參數(shù)用于指定任務(wù)的預(yù)計(jì)執(zhí)行時(shí)間。通常用于將任務(wù)調(diào)度到未來(lái)的某個(gè)時(shí)間點(diǎn)執(zhí)行,而不是立即執(zhí)行。expires=expires
:同樣的意義,指定任務(wù)的過(guò)期時(shí)間,單位為秒。如果任務(wù)在指定的時(shí)間內(nèi)沒(méi)有被執(zhí)行,將會(huì)被標(biāo)記為過(guò)期并丟棄。queue='mm_ring_v2'
:指定任務(wù)被發(fā)送到的隊(duì)列名稱(chēng)。Celery 支持將任務(wù)發(fā)送到不同的隊(duì)列中,以便進(jìn)行任務(wù)的分類(lèi)和分配。綜合起來(lái),這些參數(shù)的設(shè)置使得對(duì)該 Celery 任務(wù)的調(diào)用具有了一定的靈活性。你可以控制任務(wù)的延遲執(zhí)行、重試次數(shù)、預(yù)計(jì)執(zhí)行時(shí)間以及過(guò)期時(shí)間,并且可以指定任務(wù)發(fā)送到哪個(gè)隊(duì)列中。
4:?jiǎn)?dòng)消費(fèi)者
if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
5:?jiǎn)?dòng)celery監(jiān)控和管理Flower
本文章轉(zhuǎn)載微信公眾號(hào)@程序員小鐘同學(xué)
對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力
一鍵對(duì)比試用API 限時(shí)免費(fèi)