歡迎您光臨本站 註冊首頁

如何使用Celery和Docker處理Django中的定期任務

←手機掃碼閱讀     qp18502452 @ 2020-05-03 , reply:0

在構建和擴展Django應用程序時,不可避免地需要定期在後臺自動運行某些任務。
一些例子:
生成定期報告
清除緩存
發送批量電子郵件通知
執行每晚維護工作
這是構建和擴展不屬於Django核心的Web應用程序所需的少數功能之一。幸運的是,Celery提供了一個強大的解決方案,該解決方案非常容易實現,稱為Celery Beat。
在下面的文章中,我們將向您展示如何使用Docker設置Django,Celery和Redis,以便通過Celery Beat定期運行自定義Django Admin命令。
依存關係:
Django v3.0.5
Docker v19.03.8
Python v3.8.2
芹菜v4.4.1
Redis v5.0.8
Django + Celery系列:
Django和Celery的異步任務
使用Celery和Docker處理Django中的定期任務(本文!)
目標
在本教程結束時,您應該能夠:
使用Docker容器化Django,Celery和Redis
將Celery集成到Django應用中並創建任務
編寫自定義Django Admin命令
安排自定義Django Admin命令以通過Celery Beat定期運行
項目設置
從 django-celery-beat 存儲庫中克隆基礎項目,然後簽出基礎分支:
$ git clone
https://github.com/testdrivenio/django-celery-beat
--branch base --single-branch
$ cd django-celery-beat
由於我們總共需要管理四個流程(Django,Redis,worker和Scheduler),因此我們將使用Docker通過連接起來簡化它們的工作流程,從而使它們都可以通過一個命令從一個終端窗口運行 。
從項目根目錄創建映像,並啟動Docker容器:
$ docker-compose up -d --build $ docker-compose exec web python manage.py migrate
構建完成後,導航至http:// localhost:1337以確保該應用程序能夠按預期運行。 您應該看到以下文本:
Orders
No orders found!
項目結構:
├── .gitignore
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── orders
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ │ ├── 0001_initial.py
│ │ └── __init__.py
│ ├── models.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── requirements.txt
└── templates
└── orders
└── order_list.html
Celery和Redis
現在,我們需要為Celery,Celery Beat和Redis添加容器。
首先,將依賴項添加到requirements.txt文件中:
Django==3.0.5 celery==4.4.1 redis==3.4.1
docker-compose.yml 文件內容:
redis: image: redis:alpine celery: build: ./project command: celery -A core worker -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis celery-beat: build: ./project command: celery -A core beat -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis
我們還需要更新Web服務的depends_on部分:
web: build: ./project command: python manage.py runserver 0.0.0.0:8000 volumes: - ./project/:/usr/src/app/ ports: - 1337:8000 environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis # NEW
完整的docker-compose文件如下:
version: '3.7' services: web: build: ./project command: python manage.py runserver 0.0.0.0:8000 volumes: - ./project/:/usr/src/app/ ports: - 1337:8000 environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis redis: image: redis:alpine celery: build: ./project command: celery -A core worker -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis celery-beat: build: ./project command: celery -A core beat -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis
在構建新容器之前,我們需要在Django應用中配置Celery。
芹菜配置
設定
在“核心”目錄中,創建一個celery.py文件並添加以下代碼:
import os from celery import Celery os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings") app = Celery("core") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks()
這裡發生了什麼事?
首先,我們為DJANGO_SETTINGS_MODULE環境變量設置一個默認值,以便Celery知道如何找到Django項目。
接下來,我們創建了一個名稱為core的新Celery實例,並將該值分配給名為app的變量。
然後,我們從django.conf的settings對象中加載了celery配置值。 我們使用namespace =“ CELERY”來防止與其他Django設置發生衝突。 換句話說,Celery的所有配置設置必須以CELERY_為前綴。
最後, app.autodiscover_tasks() 告訴Celery從settings.INSTALLED_APPS中定義的應用程序中查找Celery任務。
將以下代碼添加到core / __ init__.py:
from .celery import app as celery_app __all__ = ("celery_app",)
最後,使用以下Celery設置更新core / settings.py文件,使其可以連接到Redis:
CELERY_BROKER_URL = "redis://redis:6379" CELERY_RESULT_BACKEND = "redis://redis:6379"
build:
$ docker-compose up -d --build
查看日誌:
$ docker-compose logs 'web' $ docker-compose logs 'celery' $ docker-compose logs 'celery-beat' $ docker-compose logs 'redis'
如果一切順利,我們現在有四個容器,每個容器提供不同的服務。
現在,我們準備創建一個示例任務,以查看其是否可以正常工作。
創建一個任務
創建一個新文件core / tasks.py併為僅打印到控制檯的示例任務添加以下代碼:
from celery import shared_task @shared_task def sample_task(): print("The sample task just ran.")
安排任務
在settings.py文件的末尾,添加以下代碼,以使用Celery Beat將sample_task安排為每分鐘運行一次:
CELERY_BEAT_SCHEDULE = { "sample_task": { "task": "core.tasks.sample_task", "schedule": crontab(minute="*/1"), }, }
在這裡,我們使用CELERY_BEAT_SCHEDULE設置定義了定期任務。 我們給任務命名了sample_task,然後聲明瞭兩個設置:
任務聲明要運行的任務。
時間表設置任務應運行的時間間隔。 這可以是整數,時間增量或crontab。 我們在任務中使用了crontab模式,告訴它每分鐘運行一次。 您可以在此處找到有關Celery日程安排的更多信息。
確保添加導入:
from celery.schedules import crontab import core.tasks
重啟容器,應用變更:
$ docker-compose up -d --build
查看日誌:
$ docker-compose logs -f 'celery' celery_1 | -------------- [queues] celery_1 | .> celery exchange=celery(direct) key=celery celery_1 | celery_1 | celery_1 | [tasks] celery_1 | . core.tasks.sample_task
我們可以看到Celery獲得了示例任務core.tasks.sample_task。
每分鐘,您應該在日誌中看到一行以“示例任務剛剛運行”結尾的行:
celery_1 | [2020-04-15 22:49:00,003: INFO/MainProcess]
Received task: core.tasks.sample_task[8ee5a84f-c54b-4e41-945b-645765e7b20a]
celery_1 | [2020-04-15 22:49:00,007: WARNING/ForkPoolWorker-1] The sample task just ran.
自定義Django Admin命令
Django提供了許多內置的django-admin命令,例如:
遷移
啟動項目
startapp
轉儲數據
移民
除了內置命令,Django還為我們提供了創建自己的自定義命令的選項:
自定義管理命令對於運行獨立腳本或從UNIX crontab或Windows計劃任務控制面板定期執行的腳本特別有用。
因此,我們將首先配置一個新命令,然後使用Celery Beat自動運行它。
首先創建一個名為orders / management / commands / my_custom_command.py的新文件。 然後,添加運行它所需的最少代碼:
from django.core.management.base import BaseCommand, CommandError class Command(BaseCommand): help = "A description of the command" def handle(self, *args, **options): pass
BaseCommand有一些可以被覆蓋的方法,但是唯一需要的方法是handle。 handle是自定義命令的入口點。 換句話說,當我們運行命令時,將調用此方法。
為了進行測試,我們通常只添加一個快速打印語句。 但是,建議根據Django文檔使用stdout.write代替:
當您使用管理命令並希望提供控制檯輸出時,應該寫入self.stdout和self.stderr,而不是直接打印到stdout和stderr。 通過使用這些代理,測試自定義命令變得更加容易。 另請注意,您無需以換行符結束消息,除非您指定結束參數,否則它將自動添加。
因此,添加一個self.stdout.write命令:
from django.core.management.base import BaseCommand, CommandError class Command(BaseCommand): help = "A description of the command" def handle(self, *args, **options): self.stdout.write("My sample command just ran.") # NEW
測試:
$ docker-compose exec web python manage.py my_custom_command My sample command just ran.
這樣,讓我們將所有內容捆綁在一起!
使用Celery Beat安排自定義命令
現在我們已經啟動並運行了容器,已經過測試,可以安排任務定期運行,並編寫了自定義的Django Admin示例命令,現在該進行設置以定期運行自定義命令了。
設定
在項目中,我們有一個非常基本的應用程序,稱為訂單。 它包含兩個模型,產品和訂單。 讓我們創建一個自定義命令,該命令從當天發送確認訂單的電子郵件報告。
首先,我們將通過此項目中包含的夾具將一些產品和訂單添加到數據庫中:
$ docker-compose exec web python manage.py loaddata products.json
創建超級用戶:
$ docker-compose exec web python manage.py createsuperuser
出現提示時,請填寫用戶名,電子郵件和密碼。 然後在您的Web瀏覽器中導航到http://127.0.0.1:1337/admin。 使用您剛創建的超級用戶登錄,並創建幾個訂單。 確保至少有一個日期為今天。
讓我們為我們的電子郵件報告創建一個新的自定義命令。
創建一個名為orders / management / commands / email_report.py的文件:
from datetime import timedelta, time, datetime from django.core.mail import mail_admins from django.core.management import BaseCommand from django.utils import timezone from django.utils.timezone import make_aware from orders.models import Order today = timezone.now() tomorrow = today + timedelta(1) today_start = make_aware(datetime.combine(today, time())) today_end = make_aware(datetime.combine(tomorrow, time())) class Command(BaseCommand): help = "Send Today's Orders Report to Admins" def handle(self, *args, **options): orders = Order.objects.filter(confirmed_date__range=(today_start, today_end)) if orders: message = "" for order in orders: message += f"{order}
" subject = ( f"Order Report for {today_start.strftime('%Y-%m-%d')} " f"to {today_end.strftime('%Y-%m-%d')}" ) mail_admins(subject=subject, message=message, html_message=None) self.stdout.write("E-mail Report was sent.") else: self.stdout.write("No orders confirmed today.")
在代碼中,我們向數據庫查詢了日期為Confirmed_date的訂單,將訂單合併為電子郵件正文的單個消息,然後使用Django內置的mail_admins命令將電子郵件發送給管理員。
添加一個虛擬管理員電子郵件,並將EMAIL_BACKEND設置為使用控制檯後端,以便將該電子郵件發送到設置文件中的stdout:
EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend" DEFAULT_FROM_EMAIL = "noreply@email.com" ADMINS = [("testuser", "test.user@email.com"), ]
運行:
$ docker-compose exec web python manage.py email_report
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
From: root@localhost
To: test.user@email.com
Date: Wed, 15 Apr 2020 23:10:45 -0000
Message-ID:<158699224565.85.8278261495663971825@5ce6313185d3>

Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice

-------------------------------------------------------------------------------
E-mail Report was sent.
Celery Beat
現在,我們需要創建一個定期任務來每天運行此命令。
向core / tasks.py添加一個新任務:
from celery import shared_task from django.core.management import call_command # NEW @shared_task def sample_task(): print("The sample task just ran.") # NEW @shared_task def send_email_report(): call_command("email_report", )
因此,首先我們添加了一個call_command導入,該導入用於以編程方式調用django-admin命令。 在新任務中,然後將call_command與自定義命令的名稱一起用作參數。
要安排此任務,請打開core / settings.py文件,並更新CELERY_BEAT_SCHEDULE設置以包括新任務。
CELERY_BEAT_SCHEDULE = { "sample_task": { "task": "core.tasks.sample_task", "schedule": crontab(minute="*/1"), }, "send_email_report": { "task": "core.tasks.send_email_report", "schedule": crontab(hour="*/1"), }, }
在這裡,我們向CELERY_BEAT_SCHEDULE添加了一個名為send_email_report的新條目。 正如我們對上一個任務所做的那樣,我們聲明瞭該任務應運行的任務-例如core.tasks.send_email_report-並使用crontab模式設置重複性。
重新啟動容器,以確保新設置處於活動狀態:
$ docker-compose up -d --build 看日誌: $ docker-compose logs -f 'celery' celery_1 | -------------- [queues] celery_1 | .> celery exchange=celery(direct) key=celery celery_1 | celery_1 | celery_1 | [tasks] celery_1 | . core.tasks.sample_task celery_1 | . core.tasks.send_email_report
一分鐘後郵件發出:
celery_1 | [2020-04-15 23:20:00,309: WARNING/ForkPoolWorker-1] Content-Type: text/plain; charset="utf-8"
celery_1 | MIME-Version: 1.0
celery_1 | Content-Transfer-Encoding: 7bit
celery_1 | Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
celery_1 | From: root@localhost
celery_1 | To: test.user@email.com
celery_1 | Date: Wed, 15 Apr 2020 23:20:00 -0000
celery_1 | Message-ID:<158699280030.12.8934112422500683251@42481c198b77>
celery_1 |
celery_1 | Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
celery_1 | [2020-04-15 23:20:00,310: WARNING/ForkPoolWorker-1] -------------------------------------------------------------------------------
celery_1 | [2020-04-15 23:20:00,312: WARNING/ForkPoolWorker-1] E-mail Report was sent.
結論
在本文中,我們指導您為Celery,Celery Beat和Redis設置Docker容器。 然後,我們展示瞭如何使用Celery Beat創建自定義Django Admin命令和定期任務以自動運行該命令。
原文:https://testdriven.io/blog/django-celery-periodic-tasks/


[qp18502452 ] 如何使用Celery和Docker處理Django中的定期任務已經有495次圍觀

http://coctec.com/docs/docker/show-post-232704.html