
使用NestJS和Prisma構建REST API:身份驗證
但是有點我們是需要注意,上面的只是說我們實現了異步任務,但是由于我們在接口中進行await,所以盡管的任務是異步,但是還是需要等待,我們本節主要是學習相關異步化后臺的任務,也就是說我們某些任務放置到后臺或其他地方去執行,不影響當前主線程的運行。
在前面示例中,對于的任務執行了await等待,就會進行任務掛起并等待。假如some_async_function()的任務需要耗時比較久的話,且不在意它相關的處理結果,或者結果可以通過另一種方式進行通知的話,我們可以把some_async_function()的任務進行后臺話,可以理解為放到另一個線程去執行,而不而不需要await等待,如之前的代碼,我們可以使用FasAPI框架所提供的BackgroundTasks的BackgroundTasks類來處理后臺任務,如下示例代碼:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def some_async_function():
# 后臺任務的代碼
pass
@app.get("/background_task")
async def background_task(background_tasks: BackgroundTasks):
background_tasks.add_task(some_async_function)
return {"message": "后臺任務添加成功"}
當然上面這種方式也不合適長期運行耗時的任務,所以引出我們本小結主要需要了解的Celery庫,—芹菜. 使用Celery我們可以基于異步任務隊列的方式來處理異步任務。
下面我一步一步的介紹如何基于Celery實現異步任務以及延遲任務。
首先我們從官網介紹總結一下關于Celery,它是一個基于Python的分布式任務隊列框架,用于實現異步任務的調度和執行。它的主要作用是可以幫助我們將耗時的任務從主線程中分離出來,不因任務的耗時而空等,以此來提高我們系統的并發性和響應速度。
Celery是用來處理異步任務,所以我們可以使用它來處理以下類似的一些業務場景,例如發送電子郵件、生成報表、處理大量數據等。通過將這些任務放入任務隊列中,可以讓主線程繼續處理其他請求,而不需要等待任務完成。當然Celery還提供定時任務的調度功能,可以讓我們按照設定的時間間隔或者時間點執行任務。
通常我們使用Celery需要進行以下相關以下大致的幾個步驟:
3.當我們的任務發布之后,任務會進入一個消息隊列里面進行等待,等待消費者去消費,所以接下里我們需要啟動Celery Worker:啟動Celery Worker的消費者對象來處理任務隊列中的任務。
例如:
主要我們這里使用消息代理的中間件是redis,所以是先啟動我們的redis,
接著定義相關任務,如下示例代碼:
from fastapi import FastAPI
from celery import Celery
app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
# 定義任務
@celery_app.task
def some_celery_task():
# Celery任務的代碼
pass
@app.get("/celery_task")
async def celery_task():
# 開始發布任務
some_celery_task.delay()
return {"message": "Celery 任務發布成功"}
if __name__ == "__main__":
# 使用os.path.basename函數獲取了當前文件的名稱,并將.py文件擴展名替換為空字符串\
# import os
# app_modeel_name = os.path.basename(__file__).replace(".py", "")
from pathlib import Path
# 使用Path函數獲取了當前文件的名稱,并將.py文件擴展名替換為空字符串\
# app_modeel_name = Path(__file__).name.replace(".py", "")
import uvicorn
import inspect
# 根據文件路徑返回模塊名
# print("app_modeel_name:",inspect.getmodulename(Path(__file__).name))
# 使用uvicorn.run函數運行了一個應用程序。它指定了應用程序的主機和端口,并且設置了reload參數為True。
uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)
啟動應用并發布任務:
在FastAPI應用程序,當調用 some_celery_task.delay() 來發布任務時,Celery將把任務放入Redis隊列中。
在命令行中,切換到您的項目目錄。啟動Celery worker進程。在命令行中運行以下命令:
celery -A main.celery_app worker --loglevel=info
啟動后得到如下圖所示的結果:
Celery worker進程將從Redis隊列中獲取任務,并執行任務所定義的代碼。當任務被執行時,您將在Celery worker進程的日志中看到相關的日志消息。但是我們運行時候遇到了問題如下:
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
The full contents of the message headers:
{'lang': 'py', 'task': 'main_celely.some_celery_task', 'id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen15648@xiaozhong', 'ignore_result': False, 'stamped_headers': None, 'stamps': {}}
The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "D:\code_loacl\mm_ring_v2\venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 642, in on_task_received
strategy = strategies[type_]
~~~~~~~~~~^^^^^^^
出現上述原因問題在于我們的:
@celery_app.task
def some_celery_task():
# Celery任務的代碼
pass
沒有進行相關實例綁定,我們可以修改為:
@celery_app.task(bind=True)
def some_celery_task(self):
# Celery任務的代碼
import time
time.sleep(10)
pass
然后每次發布任務的時候,重新觀察消費者的任務輸出信息如下:
如上的輸出,表示我們的任務已經被正常進行消費了!
在上面實例我們已簡單完成相關異步任務的處理,通常我們一般需要使用提各種參數來配置和控制其行為。下面介紹一下Celery一些常用的配置項:
PS:配置選項的名稱和具體含義可能會因Celery的版本而有所不同。
'amqp://guest:guest@localhost//'
None
[]
False
True
時,Celery將跟蹤任務的開始狀態,并在任務開始時發送任務狀態更新。None
None
False
True
時,Celery將在任務執行完成后再發送確認消息。這可以確保即使在任務執行期間發生錯誤,任務也不會丟失。False
True
時,Celery將不會存儲任務的執行結果。這可以節省存儲資源,適用于不關心任務結果的情況。除了之前提到的常用配置項外,以下是一些其他常見的Celery配置項的說明:
'json'
task_serializer
相同'celery'
'celery'
task_default_queue
相同4
{}
······其他參數說明,有需要我得話,我在去官方文檔查閱接口。接下里我,我們看看和它對于就是所謂得配置文件。配置文件其實和上面所謂得參數是沒有區別得,也就是說,我們可以使用其他的方式來定義我們的參數傳入,如這些配置項我們還可以可以在Celery的配置文件中進行設置,通常命名為celeryconfig.py或celery.py。如下的示例代碼所示:
下面是一個使用配置文件的示例:
celeryconfig.py
的配置文件,里面包含的內容如下:# celeryconfig.py
# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab
broker_url = 'amqp://guest:guest@localhost//' # 消息代理的URL
result_backend = 'redis://localhost:6379/0' # 結果存儲后端的URL
task_serializer = 'json' # 任務的序列化器
result_serializer = 'json' # 任務結果的序列化器
timezone = 'Asia/Shanghai' # 使用的時區
task_acks_late = True # 啟用延遲確認
task_ignore_result = False # 不忽略任務結果
task_soft_time_limit = 60 # 任務的軟時間限制為60秒
task_time_limit = 120 # 任務的硬時間限制為120秒
worker_prefetch_multiplier = 4 # 任務執行者的預取乘數
worker_concurrency = 8 # 每個任務執行者的并發處理數量
worker_max_tasks_per_child = 100 # 每個任務執行者最大處理任務數
task_default_priority = 0 # 任務的默認優先級
task_routes = {
'myapp.tasks.email_task': {'queue': 'email_queue'}, # 指定任務的隊列
'myapp.tasks.image_task': {'queue': 'image_queue'}
}
worker_hijack_root_logger = False # 不重寫根日志記錄器
worker_disable_rate_limits = False # 不禁用任務速率限制
beat_schedule = {
'task1': {
'task': 'myapp.tasks.task1',
'schedule': crontab(minute='*/15'), # 每15分鐘執行一次
},
'task2': {
'task': 'myapp.tasks.task2',
'schedule': timedelta(seconds=30), # 每30秒執行一次
}
}
在你的Celery應用程序中加載配置文件。
# celery_app.py
from celery import Celery
app = Celery('myapp')
app.config_from_object('celeryconfig')
在上述示例中,我們創建了一個名為celeryconfig.py
的配置文件,并在其中設置了各種Celery的配置選項。然后,我們在Celery應用程序的入口文件celery_app.py
中加載了該配置文件。
你可以根據自己的需求和應用程序的特定要求,在配置文件中進行相應的配置。注意,Celery的配置文件應該位于與你的Celery應用程序代碼相同的目錄中,或者可以通過正確的路徑進行引用。
確保在啟動Celery應用程序時,使用正確的配置文件加載配置。例如,通過以下命令啟動Celery Worker:
celery -A celery_app worker --loglevel=info
PS: 通常win系統下可能你需要使用如下的方式啟動:
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
# 啟動celery監控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555
獲取使用代碼的方式啟動:
from src.tasks.app import celery_app
if __name__ == '__main__':
# python -m celery -A src.tasks.app worker -Q mm_ring_v2 --loglevel=info -P eventlet
celery_app.worker_main(argv=['-A', 'src.tasks.app','worker','--loglevel=info', '-Q', 'mm_ring_v2','-P', 'eventlet'])
這樣,Celery將會根據配置文件中的設置來運行任務調度和執行過程。
延遲任務場景我們也通常會遇到,比如超時未支付則自當取消訂單等。下面舉例說明一下具體的實現過程:
基本任務項目結果:
1:定義Celery實例對象
from celery.signals import task_failure, task_retry, task_success
from celery.exceptions import MaxRetriesExceededError
from celery import Task
from src.tasks.service import TaskFactory
# 任務工廠,可以創建多個,目前只是創建了一個celery_app對象
factory = TaskFactory()
# 創建第一個 Celery 應用
celery_app = factory.create_celery_app(namespces='mm_sync_tasks', config_name='src.tasks.config.ProductionConfig')
celery_app.conf["worker_redirect_stdouts"] = False # 禁止重定向工作進程的標準輸出和標準錯誤流
def get_pending_tasks():
with celery_app.connection() as connection:
tasks = connection.default_channel.queue_declare(queue='celery', passive=True).message_count
return tasks
# 移除任務隊列中的所有等待執行的任務
def remove_pending_tasks():
with celery_app.connection() as connection:
connection.default_channel.queue_purge(queue='celery')
# 重新調度任務
def reschedule_tasks():
pending_tasks = get_pending_tasks()
remove_pending_tasks()
# task_prerun:任務開始運行前觸發的信號。
# task_postrun:任務運行完成后觸發的信號。
# task_success:任務成功完成時觸發的信號。
# task_failure:任務失敗時觸發的信號。
# task_retry:任務重試時觸發的信號。
# task_revoked:任務被撤銷時觸發的信號。
# task_rejected:任務被拒絕時觸發的信號。
# worker_ready:Worker準備就緒時觸發的信號。
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯誤獲?。。?, sender, task_id)
pass
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯誤獲取??!", sender, task_id)
pass
@task_failure.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
# # 處理任務失敗的邏輯
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯誤獲?。?!", sender, task_id)
pass
@task_success.connect
def handle_task_success(sender: Task, result, **kwargs):
# 處理任務成功完成的邏輯
print("handle_task_success!", sender, result)
pass
if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
# 啟動celery監控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555
定義實例配置信息:
# !/usr/bin/envpython
# -*-coding:UTF-8-*-
'''
@File : __init__.py.py
@Contact : 308711822@qq.com
@License : (C) Copyright 2021-2225, Personal exclusive right.
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2023/6/16 11:25 小鐘同學 1.0 None
'''
from kombu import Queue, Exchange
class Config:
# broker = 'redis://127.0.0.1:6379/1' # 任務儲存
# backend = 'redis://127.0.0.1:6379/2' # 結果存儲,執行完之后結果放在這
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# 全局的任務過期時間
# app.conf.update(result_expires=60)
# 定義隊列名稱
CELERY_QUEUES = (
Queue('mm_ring_v2'),
)
# CELERY_QUEUES = (
# Queue('default', exchange=Exchange('default'), routing_key='default'),
# Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
# Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
# )
# 指定任務走什么對了和routing_key
# CELERY_ROUTES = {
# 'src.tasks.task.migu_order_sync_status': {'queue': 'app_task1', 'routing_key': 'app_task1'},
# 'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
# }
# 指定需要加載的任務
# 指定要導入的任務模塊或任務文件列表
# celery_app.conf.update(
# include=[
# 'src.tasks.task.migu_order_sync_status',
# ]
# )
# 配置需要執行的任務所在的目錄
CELERY_INCLUDE = [
'src.tasks.task.migu_order_sync_status',
]
class DevelopmentConfig(Config):
DEBUG = True
class ProductionConfig(Config):
DEBUG = False
2:定義任務:
import datetime
import signal
from celery import current_app
from celery.result import AsyncResult
from pydantic import BaseModel
def exponential_backoff(retries):
return 2 ** retries
# 定義延遲執行的任務
# expires=3600 將任務結果的存儲有效期設置為 1 小時。
@celery_app.task(bind=True, max_retries=5, default_retry_delay=1, retry_backoff=exponential_backoff(2), expires=3660)
def action_migu_order_sync_status(self, orderid, mobile):
# self.request.retries 表示當前重試的次數
······
需要說明一點是,@celery_app.task:這是一個裝飾器,用于將一個普通的Python函數注冊為Celery任務。celery_app 是你的 Celery 應用實例,task 是 Celery 提供的裝飾器函數。且這個的任務相關參數項說明如下
3:在API接口發布任務
@router.get("/put/code", summary="訂單提交")
def callback(*, forms: PutCodeForm = Depends()):
.....
# 發布任務
result = action_migu_order_sync_status.apply_async(args=(Orders.orderid, Orders.mobile), countdown=60, retry=5,eta=eta, expires=expires,queue='mm_ring_v2')
if result == -1:
return Fail(message='提交失敗')
return Success(message='提交成功')
各個個參數項說明如下:
args=(Orders.orderid, Orders.mobile)
:這個參數用于指定要傳遞給 Celery 任務的位置參數,即任務函數在執行時所需的參數值。在這個例子中,任務函數需要兩個參數,分別對應 Orders.orderid
和 Orders.mobile
。countdown=60
:指定任務在被放入隊列之后需要延遲多少秒才開始執行。在這個例子中,任務會在被加入隊列后延遲 60 秒后開始執行。retry=5
:指定任務在發生錯誤時最多重試的次數。與之前提到的 max_retries
類似,但這里是針對單次任務調用的重試次數。eta=eta
:這個參數用于指定任務的預計執行時間。通常用于將任務調度到未來的某個時間點執行,而不是立即執行。expires=expires
:同樣的意義,指定任務的過期時間,單位為秒。如果任務在指定的時間內沒有被執行,將會被標記為過期并丟棄。queue='mm_ring_v2'
:指定任務被發送到的隊列名稱。Celery 支持將任務發送到不同的隊列中,以便進行任務的分類和分配。綜合起來,這些參數的設置使得對該 Celery 任務的調用具有了一定的靈活性。你可以控制任務的延遲執行、重試次數、預計執行時間以及過期時間,并且可以指定任務發送到哪個隊列中。
4:啟動消費者
if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
5:啟動celery監控和管理Flower
本文章轉載微信公眾號@程序員小鐘同學