歡迎您光臨本站 註冊首頁

Python celery原理及運行流程解析

←手機掃碼閱讀     zmcjlove @ 2020-06-14 , reply:0

celery簡介

celery是一個基於分佈式消息傳輸的異步任務隊列,它專注於實時處理,同時也支持任務調度。它的執行單元為任務(task),利用多線程,如Eventlet,gevent等,它們能被併發地執行在單個或多個職程服務器(worker servers)上。任務能異步執行(後臺運行)或同步執行(等待任務完成)。

在生產系統中,celery能夠一天處理上百萬的任務。它的完整架構圖如下:

組件介紹:

  • Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。

  • Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,週期性地將配置中到期需要執行的任務發送給任務隊列。

  • Broker:消息代理,又稱消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為消息代理,但適用於生產環境的只有RabbitMQ和Redis, 官方推薦 RabbitMQ。

  • Celery Worker:執行任務的消費者,通常會在多臺服務器運行多個消費者來提高執行效率。

  • Result Backend:任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

工作原理
 

它的基本工作就是管理分配任務到不同的服務器,並且取得結果。至於說服務器之間是如何進行通信的?這個Celery本身不能解決。所以,RabbitMQ作為一個消息隊列管理工具被引入到和Celery集成,負責處理服務器之間的通信任務。和rabbitmq的關係只是在於,celery沒有消息存儲功能,他需要介質,比如rabbitmq、redis、mysql、mongodb 都是可以的。推薦使用rabbitmq,他的速度和可用性都很高。

Celery安裝及使用
 

1、安裝celery

pip install celery
 

2、查看完整可用命令選項

celery worker --help
 

3、創建一個工程項目project,然後再項目內創建一個celery_tasks異步任務列表。如圖:

4、首先是celery_tasks異步任務主程序main.py,代碼如下:

  from celery import Celery  # 生成celery應用  celery_app = Celery("caicai")  # 加載配置文件  celery_app.config_from_object('celery_tasks.config')  # 註冊任務  celery_app.autodiscover_tasks(['celery_tasks.email']) # 注意:傳遞的參數是任務列表

 

分析一下這個程序:

  • "from celery import Celery"是導入celery中的Celery類。celery_app

  • celery_app是Celery類的實例。

  • 把Celery配置存放進project/config.py文件,使用celery_app.config_from_object加載配置。

  • 將任務註冊到應用中

5、接著是配置文件config.py,代碼如下:

  BROKER_URL = 'redis://localhost:6379/1' # 使用Redis作為消息代理    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務結果存在了Redis    # CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案    CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSON    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # celery任務結果有效期    CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內容類型    CELERY_TIMEZONE = 'Asia/Shanghai'       # celery使用的時區  CELERY_ENABLE_UTC = True            # 啟動時區設置  CELERYD_LOG_FILE = "/var/log/celery/celery.log"   # celery日誌存儲位置

 

6、創建email目錄,目錄下創建tesks.py文件用來編寫發送郵件的代碼,代碼如下:

  import time  from celery_tasks.main import celery_app  @celery_app.task(name='seed_email')   # 添加celery_app.task這個裝飾器,指定該任務的任務名name='seed_email'  def seed():    time.sleep(1)    return "我將發送郵件"

 

7、在項目app.py中,採用delay()用來調用任務。

  from celery_tasks.email.tasks import seed  seed.delay()  seed.delay()  seed.delay()  seed.delay()  seed.delay()

 

8、項目運行

  首先,我們需要啟動redis。接著,切換至proj項目所在目錄,並運行命令:

celery -A celery_tasks.main worker -l info

  界面如下:

然後,我們運行app.py,app.py調用添加異步任務,輸出的結果如下:



[zmcjlove ] Python celery原理及運行流程解析已經有272次圍觀

http://coctec.com/docs/python/shhow-post-238537.html