app = FastAPI()

@app.get("/async_task")
async def async_task():
# 異步任務(wù)的代碼,并等待執(zhí)行完成
await some_async_function()
return {"message": "OK"}

但是有點(diǎn)我們是需要注意,上面的只是說(shuō)我們實(shí)現(xiàn)了異步任務(wù),但是由于我們?cè)诮涌谥羞M(jìn)行await,所以盡管的任務(wù)是異步,但是還是需要等待,我們本節(jié)主要是學(xué)習(xí)相關(guān)異步化后臺(tái)的任務(wù),也就是說(shuō)我們某些任務(wù)放置到后臺(tái)或其他地方去執(zhí)行,不影響當(dāng)前主線程的運(yùn)行。

BackgroundTasks異步化的后臺(tái)任務(wù)實(shí)現(xiàn)方式

在前面示例中,對(duì)于的任務(wù)執(zhí)行了await等待,就會(huì)進(jìn)行任務(wù)掛起并等待。假如some_async_function()的任務(wù)需要耗時(shí)比較久的話,且不在意它相關(guān)的處理結(jié)果,或者結(jié)果可以通過(guò)另一種方式進(jìn)行通知的話,我們可以把some_async_function()的任務(wù)進(jìn)行后臺(tái)話,可以理解為放到另一個(gè)線程去執(zhí)行,而不而不需要await等待,如之前的代碼,我們可以使用FasAPI框架所提供的BackgroundTasks的BackgroundTasks類(lèi)來(lái)處理后臺(tái)任務(wù),如下示例代碼:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def some_async_function():
# 后臺(tái)任務(wù)的代碼
pass

@app.get("/background_task")
async def background_task(background_tasks: BackgroundTasks):
background_tasks.add_task(some_async_function)
return {"message": "后臺(tái)任務(wù)添加成功"}

當(dāng)然上面這種方式也不合適長(zhǎng)期運(yùn)行耗時(shí)的任務(wù),所以引出我們本小結(jié)主要需要了解的Celery庫(kù),—芹菜. 使用Celery我們可以基于異步任務(wù)隊(duì)列的方式來(lái)處理異步任務(wù)。

Celery庫(kù)—芹菜.異步化的后臺(tái)任務(wù)實(shí)現(xiàn)方式

下面我一步一步的介紹如何基于Celery實(shí)現(xiàn)異步任務(wù)以及延遲任務(wù)。

首先我們從官網(wǎng)介紹總結(jié)一下關(guān)于Celery,它是一個(gè)基于Python的分布式任務(wù)隊(duì)列框架,用于實(shí)現(xiàn)異步任務(wù)的調(diào)度和執(zhí)行。它的主要作用是可以幫助我們將耗時(shí)的任務(wù)從主線程中分離出來(lái),不因任務(wù)的耗時(shí)而空等,以此來(lái)提高我們系統(tǒng)的并發(fā)性和響應(yīng)速度。

Celery是用來(lái)處理異步任務(wù),所以我們可以使用它來(lái)處理以下類(lèi)似的一些業(yè)務(wù)場(chǎng)景,例如發(fā)送電子郵件、生成報(bào)表、處理大量數(shù)據(jù)等。通過(guò)將這些任務(wù)放入任務(wù)隊(duì)列中,可以讓主線程繼續(xù)處理其他請(qǐng)求,而不需要等待任務(wù)完成。當(dāng)然Celery還提供定時(shí)任務(wù)的調(diào)度功能,可以讓我們按照設(shè)定的時(shí)間間隔或者時(shí)間點(diǎn)執(zhí)行任務(wù)。

Celery簡(jiǎn)單使用步驟

通常我們使用Celery需要進(jìn)行以下相關(guān)以下大致的幾個(gè)步驟:

  1. 實(shí)例化Celery對(duì)象來(lái)創(chuàng)建一個(gè)Celery應(yīng)用。
  2. 基于已實(shí)例化的任務(wù)的實(shí)例對(duì)象來(lái)定于該實(shí)例所包含的任務(wù):通常我們是使用裝飾器將函數(shù)注冊(cè)為Celery任務(wù),并設(shè)置任務(wù)的參數(shù)和返回值。(加入我們的是在相關(guān)的Fastapi框架進(jìn)行使用,通常我們需要做就是實(shí)例化對(duì)象并聲明任務(wù),并進(jìn)行任務(wù)的發(fā)布,通常是通過(guò)調(diào)用Celery應(yīng)用的apply_async()方法來(lái)提交任務(wù)到任務(wù)隊(duì)列中。)

3.當(dāng)我們的任務(wù)發(fā)布之后,任務(wù)會(huì)進(jìn)入一個(gè)消息隊(duì)列里面進(jìn)行等待,等待消費(fèi)者去消費(fèi),所以接下里我們需要啟動(dòng)Celery Worker:?jiǎn)?dòng)Celery Worker的消費(fèi)者對(duì)象來(lái)處理任務(wù)隊(duì)列中的任務(wù)。

例如:

1 定義任務(wù)和發(fā)布任務(wù)

主要我們這里使用消息代理的中間件是redis,所以是先啟動(dòng)我們的redis,

接著定義相關(guān)任務(wù),如下示例代碼:

from fastapi import FastAPI
from celery import Celery

app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")

# 定義任務(wù)
@celery_app.task
def some_celery_task():
# Celery任務(wù)的代碼
pass

@app.get("/celery_task")
async def celery_task():
# 開(kāi)始發(fā)布任務(wù)
some_celery_task.delay()
return {"message": "Celery 任務(wù)發(fā)布成功"}

if __name__ == "__main__":
# 使用os.path.basename函數(shù)獲取了當(dāng)前文件的名稱(chēng),并將.py文件擴(kuò)展名替換為空字符串\
# import os
# app_modeel_name = os.path.basename(__file__).replace(".py", "")
from pathlib import Path
# 使用Path函數(shù)獲取了當(dāng)前文件的名稱(chēng),并將.py文件擴(kuò)展名替換為空字符串\
# app_modeel_name = Path(__file__).name.replace(".py", "")
import uvicorn
import inspect
# 根據(jù)文件路徑返回模塊名
# print("app_modeel_name:",inspect.getmodulename(Path(__file__).name))
# 使用uvicorn.run函數(shù)運(yùn)行了一個(gè)應(yīng)用程序。它指定了應(yīng)用程序的主機(jī)和端口,并且設(shè)置了reload參數(shù)為T(mén)rue。
uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)

啟動(dòng)應(yīng)用并發(fā)布任務(wù):

FastAPI應(yīng)用程序,當(dāng)調(diào)用 some_celery_task.delay() 來(lái)發(fā)布任務(wù)時(shí),Celery將把任務(wù)放入Redis隊(duì)列中。

2 啟動(dòng)Celery worker進(jìn)程,進(jìn)行任務(wù)消費(fèi)

在命令行中,切換到您的項(xiàng)目目錄。啟動(dòng)Celery worker進(jìn)程。在命令行中運(yùn)行以下命令:

celery -A main.celery_app worker --loglevel=info

啟動(dòng)后得到如下圖所示的結(jié)果:

Celery worker進(jìn)程將從Redis隊(duì)列中獲取任務(wù),并執(zhí)行任務(wù)所定義的代碼。當(dāng)任務(wù)被執(zhí)行時(shí),您將在Celery worker進(jìn)程的日志中看到相關(guān)的日志消息。但是我們運(yùn)行時(shí)候遇到了問(wèn)題如下:

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

The full contents of the message headers:
{'lang': 'py', 'task': 'main_celely.some_celery_task', 'id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen15648@xiaozhong', 'ignore_result': False, 'stamped_headers': None, 'stamps': {}}

The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "D:\code_loacl\mm_ring_v2\venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 642, in on_task_received
strategy = strategies[type_]
~~~~~~~~~~^^^^^^^

出現(xiàn)上述原因問(wèn)題在于我們的:

@celery_app.task
def some_celery_task():
# Celery任務(wù)的代碼
pass

沒(méi)有進(jìn)行相關(guān)實(shí)例綁定,我們可以修改為:

@celery_app.task(bind=True)
def some_celery_task(self):
# Celery任務(wù)的代碼
import time
time.sleep(10)
pass

然后每次發(fā)布任務(wù)的時(shí)候,重新觀察消費(fèi)者的任務(wù)輸出信息如下:

如上的輸出,表示我們的任務(wù)已經(jīng)被正常進(jìn)行消費(fèi)了!

Celery 詳解之相關(guān)參數(shù)項(xiàng)

在上面實(shí)例我們已簡(jiǎn)單完成相關(guān)異步任務(wù)的處理,通常我們一般需要使用提各種參數(shù)來(lái)配置和控制其行為。下面介紹一下Celery一些常用的配置項(xiàng):

PS:配置選項(xiàng)的名稱(chēng)和具體含義可能會(huì)因Celery的版本而有所不同。

  1. broker:
  2. backend:
  3. include:
  4. task_track_started:
  5. task_time_limit:
  6. task_soft_time_limit:
  7. task_acks_late:
  8. task_ignore_result:

除了之前提到的常用配置項(xiàng)外,以下是一些其他常見(jiàn)的Celery配置項(xiàng)的說(shuō)明:

  1. task_serializer:
  2. result_serializer:
  3. task_default_queue:
  4. task_default_exchange:
  5. task_default_routing_key:
  6. worker_concurrency:
  7. worker_prefetch_multiplier:
  8. beat_schedule:

······其他參數(shù)說(shuō)明,有需要我得話,我在去官方文檔查閱接口。接下里我,我們看看和它對(duì)于就是所謂得配置文件。配置文件其實(shí)和上面所謂得參數(shù)是沒(méi)有區(qū)別得,也就是說(shuō),我們可以使用其他的方式來(lái)定義我們的參數(shù)傳入,如這些配置項(xiàng)我們還可以可以在Celery的配置文件中進(jìn)行設(shè)置,通常命名為celeryconfig.py或celery.py。如下的示例代碼所示:

下面是一個(gè)使用配置文件的示例:

  1. 首先我們創(chuàng)建一個(gè)名為celeryconfig.py的配置文件,里面包含的內(nèi)容如下:
# celeryconfig.py

# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab

broker_url = 'amqp://guest:guest@localhost//' # 消息代理的URL
result_backend = 'redis://localhost:6379/0' # 結(jié)果存儲(chǔ)后端的URL
task_serializer = 'json' # 任務(wù)的序列化器
result_serializer = 'json' # 任務(wù)結(jié)果的序列化器
timezone = 'Asia/Shanghai' # 使用的時(shí)區(qū)
task_acks_late = True # 啟用延遲確認(rèn)
task_ignore_result = False # 不忽略任務(wù)結(jié)果
task_soft_time_limit = 60 # 任務(wù)的軟時(shí)間限制為60秒
task_time_limit = 120 # 任務(wù)的硬時(shí)間限制為120秒
worker_prefetch_multiplier = 4 # 任務(wù)執(zhí)行者的預(yù)取乘數(shù)
worker_concurrency = 8 # 每個(gè)任務(wù)執(zhí)行者的并發(fā)處理數(shù)量
worker_max_tasks_per_child = 100 # 每個(gè)任務(wù)執(zhí)行者最大處理任務(wù)數(shù)
task_default_priority = 0 # 任務(wù)的默認(rèn)優(yōu)先級(jí)
task_routes = {
'myapp.tasks.email_task': {'queue': 'email_queue'}, # 指定任務(wù)的隊(duì)列
'myapp.tasks.image_task': {'queue': 'image_queue'}
}
worker_hijack_root_logger = False # 不重寫(xiě)根日志記錄器
worker_disable_rate_limits = False # 不禁用任務(wù)速率限制
beat_schedule = {
'task1': {
'task': 'myapp.tasks.task1',
'schedule': crontab(minute='*/15'), # 每15分鐘執(zhí)行一次
},
'task2': {
'task': 'myapp.tasks.task2',
'schedule': timedelta(seconds=30), # 每30秒執(zhí)行一次
}
}

在你的Celery應(yīng)用程序中加載配置文件。

# celery_app.py

from celery import Celery

app = Celery('myapp')
app.config_from_object('celeryconfig')

在上述示例中,我們創(chuàng)建了一個(gè)名為celeryconfig.py的配置文件,并在其中設(shè)置了各種Celery的配置選項(xiàng)。然后,我們?cè)贑elery應(yīng)用程序的入口文件celery_app.py中加載了該配置文件。

你可以根據(jù)自己的需求和應(yīng)用程序的特定要求,在配置文件中進(jìn)行相應(yīng)的配置。注意,Celery的配置文件應(yīng)該位于與你的Celery應(yīng)用程序代碼相同的目錄中,或者可以通過(guò)正確的路徑進(jìn)行引用。

確保在啟動(dòng)Celery應(yīng)用程序時(shí),使用正確的配置文件加載配置。例如,通過(guò)以下命令啟動(dòng)Celery Worker:

celery -A celery_app worker --loglevel=info

PS: 通常win系統(tǒng)下可能你需要使用如下的方式啟動(dòng):

  #  D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks    --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet

# 啟動(dòng)celery監(jiān)控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555

獲取使用代碼的方式啟動(dòng):

from src.tasks.app import celery_app

if __name__ == '__main__':
# python -m celery -A src.tasks.app worker -Q mm_ring_v2 --loglevel=info -P eventlet
celery_app.worker_main(argv=['-A', 'src.tasks.app','worker','--loglevel=info', '-Q', 'mm_ring_v2','-P', 'eventlet'])

這樣,Celery將會(huì)根據(jù)配置文件中的設(shè)置來(lái)運(yùn)行任務(wù)調(diào)度和執(zhí)行過(guò)程。

Celery延時(shí)任務(wù)的執(zhí)行

延遲任務(wù)場(chǎng)景我們也通常會(huì)遇到,比如超時(shí)未支付則自當(dāng)取消訂單等。下面舉例說(shuō)明一下具體的實(shí)現(xiàn)過(guò)程:

基本任務(wù)項(xiàng)目結(jié)果:

1:定義Celery實(shí)例對(duì)象

from celery.signals import task_failure, task_retry, task_success
from celery.exceptions import MaxRetriesExceededError
from celery import Task

from src.tasks.service import TaskFactory

# 任務(wù)工廠,可以創(chuàng)建多個(gè),目前只是創(chuàng)建了一個(gè)celery_app對(duì)象
factory = TaskFactory()
# 創(chuàng)建第一個(gè) Celery 應(yīng)用
celery_app = factory.create_celery_app(namespces='mm_sync_tasks', config_name='src.tasks.config.ProductionConfig')
celery_app.conf["worker_redirect_stdouts"] = False # 禁止重定向工作進(jìn)程的標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯(cuò)誤流

def get_pending_tasks():
with celery_app.connection() as connection:
tasks = connection.default_channel.queue_declare(queue='celery', passive=True).message_count
return tasks

# 移除任務(wù)隊(duì)列中的所有等待執(zhí)行的任務(wù)
def remove_pending_tasks():
with celery_app.connection() as connection:
connection.default_channel.queue_purge(queue='celery')

# 重新調(diào)度任務(wù)
def reschedule_tasks():
pending_tasks = get_pending_tasks()
remove_pending_tasks()

# task_prerun:任務(wù)開(kāi)始運(yùn)行前觸發(fā)的信號(hào)。
# task_postrun:任務(wù)運(yùn)行完成后觸發(fā)的信號(hào)。
# task_success:任務(wù)成功完成時(shí)觸發(fā)的信號(hào)。
# task_failure:任務(wù)失敗時(shí)觸發(fā)的信號(hào)。
# task_retry:任務(wù)重試時(shí)觸發(fā)的信號(hào)。
# task_revoked:任務(wù)被撤銷(xiāo)時(shí)觸發(fā)的信號(hào)。
# task_rejected:任務(wù)被拒絕時(shí)觸發(fā)的信號(hào)。
# worker_ready:Worker準(zhǔn)備就緒時(shí)觸發(fā)的信號(hào)。

@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯(cuò)誤獲取!!", sender, task_id)
pass

@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯(cuò)誤獲取!!", sender, task_id)
pass

@task_failure.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
# # 處理任務(wù)失敗的邏輯
if isinstance(exception, MaxRetriesExceededError):
# 處理 MaxRetriesExceededError 異常
print("全局異常錯(cuò)誤獲取!!", sender, task_id)
pass

@task_success.connect
def handle_task_success(sender: Task, result, **kwargs):
# 處理任務(wù)成功完成的邏輯
print("handle_task_success!", sender, result)
pass

if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet

# 啟動(dòng)celery監(jiān)控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555

定義實(shí)例配置信息:

# !/usr/bin/envpython
# -*-coding:UTF-8-*-
'''
@File : __init__.py.py
@Contact : 308711822@qq.com
@License : (C) Copyright 2021-2225, Personal exclusive right.

@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2023/6/16 11:25 小鐘同學(xué) 1.0 None
'''
from kombu import Queue, Exchange

class Config:
# broker = 'redis://127.0.0.1:6379/1' # 任務(wù)儲(chǔ)存
# backend = 'redis://127.0.0.1:6379/2' # 結(jié)果存儲(chǔ),執(zhí)行完之后結(jié)果放在這
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 全局的任務(wù)過(guò)期時(shí)間
# app.conf.update(result_expires=60)

# 定義隊(duì)列名稱(chēng)
CELERY_QUEUES = (
Queue('mm_ring_v2'),
)
# CELERY_QUEUES = (
# Queue('default', exchange=Exchange('default'), routing_key='default'),
# Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
# Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
# )
# 指定任務(wù)走什么對(duì)了和routing_key
# CELERY_ROUTES = {
# 'src.tasks.task.migu_order_sync_status': {'queue': 'app_task1', 'routing_key': 'app_task1'},
# 'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
# }
# 指定需要加載的任務(wù)
# 指定要導(dǎo)入的任務(wù)模塊或任務(wù)文件列表
# celery_app.conf.update(
# include=[
# 'src.tasks.task.migu_order_sync_status',
# ]
# )

# 配置需要執(zhí)行的任務(wù)所在的目錄
CELERY_INCLUDE = [
'src.tasks.task.migu_order_sync_status',
]

class DevelopmentConfig(Config):
DEBUG = True

class ProductionConfig(Config):
DEBUG = False

2:定義任務(wù):

import datetime
import signal

from celery import current_app
from celery.result import AsyncResult

from pydantic import BaseModel

def exponential_backoff(retries):
return 2 ** retries

# 定義延遲執(zhí)行的任務(wù)
# expires=3600 將任務(wù)結(jié)果的存儲(chǔ)有效期設(shè)置為 1 小時(shí)。
@celery_app.task(bind=True, max_retries=5, default_retry_delay=1, retry_backoff=exponential_backoff(2), expires=3660)
def action_migu_order_sync_status(self, orderid, mobile):
# self.request.retries 表示當(dāng)前重試的次數(shù)
······

需要說(shuō)明一點(diǎn)是,@celery_app.task:這是一個(gè)裝飾器,用于將一個(gè)普通的Python函數(shù)注冊(cè)為Celery任務(wù)。celery_app 是你的 Celery 應(yīng)用實(shí)例,task 是 Celery 提供的裝飾器函數(shù)。且這個(gè)的任務(wù)相關(guān)參數(shù)項(xiàng)說(shuō)明如下

3:在API接口發(fā)布任務(wù)

@router.get("/put/code", summary="訂單提交")
def callback(*, forms: PutCodeForm = Depends()):
.....
# 發(fā)布任務(wù)
result = action_migu_order_sync_status.apply_async(args=(Orders.orderid, Orders.mobile), countdown=60, retry=5,eta=eta, expires=expires,queue='mm_ring_v2')

if result == -1:
return Fail(message='提交失敗')

return Success(message='提交成功')

各個(gè)個(gè)參數(shù)項(xiàng)說(shuō)明如下:

  1. args=(Orders.orderid, Orders.mobile):這個(gè)參數(shù)用于指定要傳遞給 Celery 任務(wù)的位置參數(shù),即任務(wù)函數(shù)在執(zhí)行時(shí)所需的參數(shù)值。在這個(gè)例子中,任務(wù)函數(shù)需要兩個(gè)參數(shù),分別對(duì)應(yīng) Orders.orderid 和 Orders.mobile
  2. countdown=60:指定任務(wù)在被放入隊(duì)列之后需要延遲多少秒才開(kāi)始執(zhí)行。在這個(gè)例子中,任務(wù)會(huì)在被加入隊(duì)列后延遲 60 秒后開(kāi)始執(zhí)行。
  3. retry=5:指定任務(wù)在發(fā)生錯(cuò)誤時(shí)最多重試的次數(shù)。與之前提到的 max_retries 類(lèi)似,但這里是針對(duì)單次任務(wù)調(diào)用的重試次數(shù)。
  4. eta=eta:這個(gè)參數(shù)用于指定任務(wù)的預(yù)計(jì)執(zhí)行時(shí)間。通常用于將任務(wù)調(diào)度到未來(lái)的某個(gè)時(shí)間點(diǎn)執(zhí)行,而不是立即執(zhí)行。
  5. expires=expires:同樣的意義,指定任務(wù)的過(guò)期時(shí)間,單位為秒。如果任務(wù)在指定的時(shí)間內(nèi)沒(méi)有被執(zhí)行,將會(huì)被標(biāo)記為過(guò)期并丟棄。
  6. queue='mm_ring_v2':指定任務(wù)被發(fā)送到的隊(duì)列名稱(chēng)。Celery 支持將任務(wù)發(fā)送到不同的隊(duì)列中,以便進(jìn)行任務(wù)的分類(lèi)和分配。

綜合起來(lái),這些參數(shù)的設(shè)置使得對(duì)該 Celery 任務(wù)的調(diào)用具有了一定的靈活性。你可以控制任務(wù)的延遲執(zhí)行、重試次數(shù)、預(yù)計(jì)執(zhí)行時(shí)間以及過(guò)期時(shí)間,并且可以指定任務(wù)發(fā)送到哪個(gè)隊(duì)列中。

4:?jiǎn)?dòng)消費(fèi)者

if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet

5:?jiǎn)?dòng)celery監(jiān)控和管理Flower

本文章轉(zhuǎn)載微信公眾號(hào)@程序員小鐘同學(xué)

上一篇:

FastAPI-Cache2:一個(gè)讓接口飛起來(lái)的緩存神器

下一篇:

Flask-RESTful:最強(qiáng)Python Web服務(wù)框架,輕松構(gòu)建REST API
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊(cè)

多API并行試用

數(shù)據(jù)驅(qū)動(dòng)選型,提升決策效率

查看全部API→
??

熱門(mén)場(chǎng)景實(shí)測(cè),選對(duì)API

#AI文本生成大模型API

對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)

#AI深度推理大模型API

對(duì)比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個(gè)渠道
一鍵對(duì)比試用API 限時(shí)免費(fèi)