app = FastAPI()

@app.get("/async_task")
async def async_task():
# 異步任務的代碼,并等待執行完成
await some_async_function()
return {"message": "OK"}

但是有點我們是需要注意,上面的只是說我們實現了異步任務,但是由于我們在接口中進行await,所以盡管的任務是異步,但是還是需要等待,我們本節主要是學習相關異步化后臺的任務,也就是說我們某些任務放置到后臺或其他地方去執行,不影響當前主線程的運行。

BackgroundTasks異步化的后臺任務實現方式

在前面示例中,對于的任務執行了await等待,就會進行任務掛起并等待。假如some_async_function()的任務需要耗時比較久的話,且不在意它相關的處理結果,或者結果可以通過另一種方式進行通知的話,我們可以把some_async_function()的任務進行后臺話,可以理解為放到另一個線程去執行,而不而不需要await等待,如之前的代碼,我們可以使用FasAPI框架所提供的BackgroundTasks的BackgroundTasks類來處理后臺任務,如下示例代碼:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def some_async_function():
# 后臺任務的代碼
pass

@app.get("/background_task")
async def background_task(background_tasks: BackgroundTasks):
background_tasks.add_task(some_async_function)
return {"message": "后臺任務添加成功"}

當然上面這種方式也不合適長期運行耗時的任務,所以引出我們本小結主要需要了解的Celery庫,—芹菜. 使用Celery我們可以基于異步任務隊列的方式來處理異步任務。

Celery庫—芹菜.異步化的后臺任務實現方式

下面我一步一步的介紹如何基于Celery實現異步任務以及延遲任務。

首先我們從官網介紹總結一下關于Celery,它是一個基于Python的分布式任務隊列框架,用于實現異步任務的調度和執行。它的主要作用是可以幫助我們將耗時的任務從主線程中分離出來,不因任務的耗時而空等,以此來提高我們系統的并發性和響應速度。

Celery是用來處理異步任務,所以我們可以使用它來處理以下類似的一些業務場景,例如發送電子郵件、生成報表、處理大量數據等。通過將這些任務放入任務隊列中,可以讓主線程繼續處理其他請求,而不需要等待任務完成。當然Celery還提供定時任務的調度功能,可以讓我們按照設定的時間間隔或者時間點執行任務。

Celery簡單使用步驟

通常我們使用Celery需要進行以下相關以下大致的幾個步驟:

  1. 實例化Celery對象來創建一個Celery應用。
  2. 基于已實例化的任務的實例對象來定于該實例所包含的任務:通常我們是使用裝飾器將函數注冊為Celery任務,并設置任務的參數和返回值。(加入我們的是在相關的Fastapi框架進行使用,通常我們需要做就是實例化對象并聲明任務,并進行任務的發布,通常是通過調用Celery應用的apply_async()方法來提交任務到任務隊列中。)

3.當我們的任務發布之后,任務會進入一個消息隊列里面進行等待,等待消費者去消費,所以接下里我們需要啟動Celery Worker:啟動Celery Worker的消費者對象來處理任務隊列中的任務。

例如:

1 定義任務和發布任務

主要我們這里使用消息代理的中間件是redis,所以是先啟動我們的redis,

接著定義相關任務,如下示例代碼:

from fastapi import FastAPI
from celery import Celery

app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")

# 定義任務
@celery_app.task
def some_celery_task():
# Celery任務的代碼
pass

@app.get("/celery_task")
async def celery_task():
# 開始發布任務
some_celery_task.delay()
return {"message": "Celery 任務發布成功"}

if __name__ == "__main__":
# 使用os.path.basename函數獲取了當前文件的名稱,并將.py文件擴展名替換為空字符串\
# import os
# app_modeel_name = os.path.basename(__file__).replace(".py", "")
from pathlib import Path
# 使用Path函數獲取了當前文件的名稱,并將.py文件擴展名替換為空字符串\
# app_modeel_name = Path(__file__).name.replace(".py", "")
import uvicorn
import inspect
# 根據文件路徑返回模塊名
# print("app_modeel_name:",inspect.getmodulename(Path(__file__).name))
# 使用uvicorn.run函數運行了一個應用程序。它指定了應用程序的主機和端口,并且設置了reload參數為True。
uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)

啟動應用并發布任務:

FastAPI應用程序,當調用 some_celery_task.delay() 來發布任務時,Celery將把任務放入Redis隊列中。

2 啟動Celery worker進程,進行任務消費

在命令行中,切換到您的項目目錄。啟動Celery worker進程。在命令行中運行以下命令:

celery -A main.celery_app worker --loglevel=info

啟動后得到如下圖所示的結果:

Celery worker進程將從Redis隊列中獲取任務,并執行任務所定義的代碼。當任務被執行時,您將在Celery worker進程的日志中看到相關的日志消息。但是我們運行時候遇到了問題如下:

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

The full contents of the message headers:
{'lang': 'py', 'task': 'main_celely.some_celery_task', 'id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen15648@xiaozhong', 'ignore_result': False, 'stamped_headers': None, 'stamps': {}}

The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "D:\code_loacl\mm_ring_v2\venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 642, in on_task_received
strategy = strategies[type_]
~~~~~~~~~~^^^^^^^

出現上述原因問題在于我們的:

@celery_app.task
def some_celery_task():
# Celery任務的代碼
pass

沒有進行相關實例綁定,我們可以修改為:

@celery_app.task(bind=True)
def some_celery_task(self):
# Celery任務的代碼
import time
time.sleep(10)
pass

然后每次發布任務的時候,重新觀察消費者的任務輸出信息如下:

如上的輸出,表示我們的任務已經被正常進行消費了!

Celery 詳解之相關參數項

在上面實例我們已簡單完成相關異步任務的處理,通常我們一般需要使用提各種參數來配置和控制其行為。下面介紹一下Celery一些常用的配置項:

PS:配置選項的名稱和具體含義可能會因Celery的版本而有所不同。

  1. broker:
  2. backend:
  3. include:
  4. task_track_started:
  5. task_time_limit:
  6. task_soft_time_limit:
  7. task_acks_late:
  8. task_ignore_result:

除了之前提到的常用配置項外,以下是一些其他常見的Celery配置項的說明:

  1. task_serializer:
  2. result_serializer:
  3. task_default_queue:
  4. task_default_exchange:
  5. task_default_routing_key:
  6. worker_concurrency:
  7. worker_prefetch_multiplier:
  8. beat_schedule:

······其他參數說明,有需要我得話,我在去官方文檔查閱接口。接下里我,我們看看和它對于就是所謂得配置文件。配置文件其實和上面所謂得參數是沒有區別得,也就是說,我們可以使用其他的方式來定義我們的參數傳入,如這些配置項我們還可以可以在Celery的配置文件中進行設置,通常命名為celeryconfig.py或celery.py。如下的示例代碼所示:

下面是一個使用配置文件的示例:

  1. 首先我們創建一個名為celeryconfig.py的配置文件,里面包含的內容如下:
# celeryconfig.py

# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab

broker_url = 'amqp://guest:guest@localhost//' # 消息代理的URL
result_backend = 'redis://localhost:6379/0' # 結果存儲后端的URL
task_serializer = 'json' # 任務的序列化器
result_serializer = 'json' # 任務結果的序列化器
timezone = 'Asia/Shanghai' # 使用的時區
task_acks_late = True # 啟用延遲確認
task_ignore_result = False # 不忽略任務結果
task_soft_time_limit = 60 # 任務的軟時間限制為60秒
task_time_limit = 120 # 任務的硬時間限制為120秒
worker_prefetch_multiplier = 4 # 任務執行者的預取乘數
worker_concurrency = 8 # 每個任務執行者的并發處理數量
worker_max_tasks_per_child = 100 # 每個任務執行者最大處理任務數
task_default_priority = 0 # 任務的默認優先級
task_routes = {
'myapp.tasks.email_task': {'queue': 'email_queue'}, # 指定任務的隊列
'myapp.tasks.image_task': {'queue': 'image_queue'}
}
worker_hijack_root_logger = False # 不重寫根日志記錄器
worker_disable_rate_limits = False # 不禁用任務速率限制
beat_schedule = {
'task1': {
'task': 'myapp.tasks.task1',
'schedule': crontab(minute='*/15'), # 每15分鐘執行一次
},
'task2': {
'task': 'myapp.tasks.task2',
'schedule': timedelta(seconds=30), # 每30秒執行一次
}
}

在你的Celery應用程序中加載配置文件。

# celery_app.py

from celery import Celery

app = Celery('myapp')
app.config_from_object('celeryconfig')

在上述示例中,我們創建了一個名為celeryconfig.py的配置文件,并在其中設置了各種Celery的配置選項。然后,我們在Celery應用程序的入口文件celery_app.py中加載了該配置文件。

你可以根據自己的需求和應用程序的特定要求,在配置文件中進行相應的配置。注意,Celery的配置文件應該位于與你的Celery應用程序代碼相同的目錄中,或者可以通過正確的路徑進行引用。

確保在啟動Celery應用程序時,使用正確的配置文件加載配置。例如,通過以下命令啟動Celery Worker:

celery -A celery_app worker --loglevel=info

PS: 通常win系統下可能你需要使用如下的方式啟動:

  #  D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks    --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet

# 啟動celery監控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555

獲取使用代碼的方式啟動:

from src.tasks.app import celery_app

if __name__ == '__main__':
# python -m celery -A src.tasks.app worker -Q mm_ring_v2 --loglevel=info -P eventlet
celery_app.worker_main(argv=['-A', 'src.tasks.app','worker','--loglevel=info', '-Q', 'mm_ring_v2','-P', 'eventlet'])

這樣,Celery將會根據配置文件中的設置來運行任務調度和執行過程。

Celery延時任務的執行

延遲任務場景我們也通常會遇到,比如超時未支付則自當取消訂單等。下面舉例說明一下具體的實現過程:

基本任務項目結果:

1:定義Celery實例對象

from celery.signals import task_failure, task_retry, task_success
from celery.exceptions import MaxRetriesExceededError
from celery import Task

from src.tasks.service import TaskFactory

# 任務工廠,可以創建多個,目前只是創建了一個celery_app對象
factory = TaskFactory()
# 創建第一個 Celery 應用
celery_app = factory.create_celery_app(namespces='mm_sync_tasks', config_name='src.tasks.config.ProductionConfig')
celery_app.conf["worker_redirect_stdouts"] = False # 禁止重定向工作進程的標準輸出和標準錯誤流

def get_pending_tasks():
with celery_app.connection() as connection:
tasks = connection.default_channel.queue_declare(queue='celery', passive=True).message_count
return tasks

# 移除任務隊列中的所有等待執行的任務
def remove_pending_tasks():
with celery_app.connection() as connection:
connection.default_channel.queue_purge(queue='celery')

# 重新調度任務
def reschedule_tasks():
pending_tasks = get_pending_tasks()
remove_pending_tasks()

# task_prerun:任務開始運行前觸發的信號。
# task_postrun:任務運行完成后觸發的信號。
# task_success:任務成功完成時觸發的信號。
# task_failure:任務失敗時觸發的信號。
# task_retry:任務重試時觸發的信號。
# task_revoked:任務被撤銷時觸發的信號。
# task_rejected:任務被拒絕時觸發的信號。
# worker_ready:Worker準備就緒時觸發的信號。

@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯誤獲?。。?, sender, task_id)
pass

@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯誤獲取??!", sender, task_id)
pass

@task_failure.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
# # 處理任務失敗的邏輯
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯誤獲?。?!", sender, task_id)
pass

@task_success.connect
def handle_task_success(sender: Task, result, **kwargs):
# 處理任務成功完成的邏輯
print("handle_task_success!", sender, result)
pass

if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet

# 啟動celery監控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555

定義實例配置信息:

# !/usr/bin/envpython
# -*-coding:UTF-8-*-
'''
@File : __init__.py.py
@Contact : 308711822@qq.com
@License : (C) Copyright 2021-2225, Personal exclusive right.

@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2023/6/16 11:25 小鐘同學 1.0 None
'''
from kombu import Queue, Exchange

class Config:
# broker = 'redis://127.0.0.1:6379/1' # 任務儲存
# backend = 'redis://127.0.0.1:6379/2' # 結果存儲,執行完之后結果放在這
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 全局的任務過期時間
# app.conf.update(result_expires=60)

# 定義隊列名稱
CELERY_QUEUES = (
Queue('mm_ring_v2'),
)
# CELERY_QUEUES = (
# Queue('default', exchange=Exchange('default'), routing_key='default'),
# Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
# Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
# )
# 指定任務走什么對了和routing_key
# CELERY_ROUTES = {
# 'src.tasks.task.migu_order_sync_status': {'queue': 'app_task1', 'routing_key': 'app_task1'},
# 'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
# }
# 指定需要加載的任務
# 指定要導入的任務模塊或任務文件列表
# celery_app.conf.update(
# include=[
# 'src.tasks.task.migu_order_sync_status',
# ]
# )

# 配置需要執行的任務所在的目錄
CELERY_INCLUDE = [
'src.tasks.task.migu_order_sync_status',
]

class DevelopmentConfig(Config):
DEBUG = True

class ProductionConfig(Config):
DEBUG = False

2:定義任務:

import datetime
import signal

from celery import current_app
from celery.result import AsyncResult

from pydantic import BaseModel

def exponential_backoff(retries):
return 2 ** retries

# 定義延遲執行的任務
# expires=3600 將任務結果的存儲有效期設置為 1 小時。
@celery_app.task(bind=True, max_retries=5, default_retry_delay=1, retry_backoff=exponential_backoff(2), expires=3660)
def action_migu_order_sync_status(self, orderid, mobile):
# self.request.retries 表示當前重試的次數
······

需要說明一點是,@celery_app.task:這是一個裝飾器,用于將一個普通的Python函數注冊為Celery任務。celery_app 是你的 Celery 應用實例,task 是 Celery 提供的裝飾器函數。且這個的任務相關參數項說明如下

3:在API接口發布任務

@router.get("/put/code", summary="訂單提交")
def callback(*, forms: PutCodeForm = Depends()):
.....
# 發布任務
result = action_migu_order_sync_status.apply_async(args=(Orders.orderid, Orders.mobile), countdown=60, retry=5,eta=eta, expires=expires,queue='mm_ring_v2')

if result == -1:
return Fail(message='提交失敗')

return Success(message='提交成功')

各個個參數項說明如下:

  1. args=(Orders.orderid, Orders.mobile):這個參數用于指定要傳遞給 Celery 任務的位置參數,即任務函數在執行時所需的參數值。在這個例子中,任務函數需要兩個參數,分別對應 Orders.orderid 和 Orders.mobile
  2. countdown=60:指定任務在被放入隊列之后需要延遲多少秒才開始執行。在這個例子中,任務會在被加入隊列后延遲 60 秒后開始執行。
  3. retry=5:指定任務在發生錯誤時最多重試的次數。與之前提到的 max_retries 類似,但這里是針對單次任務調用的重試次數。
  4. eta=eta:這個參數用于指定任務的預計執行時間。通常用于將任務調度到未來的某個時間點執行,而不是立即執行。
  5. expires=expires:同樣的意義,指定任務的過期時間,單位為秒。如果任務在指定的時間內沒有被執行,將會被標記為過期并丟棄。
  6. queue='mm_ring_v2':指定任務被發送到的隊列名稱。Celery 支持將任務發送到不同的隊列中,以便進行任務的分類和分配。

綜合起來,這些參數的設置使得對該 Celery 任務的調用具有了一定的靈活性。你可以控制任務的延遲執行、重試次數、預計執行時間以及過期時間,并且可以指定任務發送到哪個隊列中。

4:啟動消費者

if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet

5:啟動celery監控和管理Flower

本文章轉載微信公眾號@程序員小鐘同學

上一篇:

FastAPI-Cache2:一個讓接口飛起來的緩存神器

下一篇:

Flask-RESTful:最強Python Web服務框架,輕松構建REST API
#你可能也喜歡這些API文章!

我們有何不同?

API服務商零注冊

多API并行試用

數據驅動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內容創意新穎性、情感共鳴力、商業轉化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費