$ loading_
帮助你在 Django 应用中设计、配置并测试 Celery 异步任务与定时作业。
复制安装指令,让 AI 自动完成配置 · 推荐新手
请帮我安装 askskill 上的 "django-celery" 技能: 1. 下载 https://raw.githubusercontent.com/affaan-m/ECC/main/skills/django-celery/SKILL.md 2. 保存为 ~/.claude/skills/django-celery/SKILL.md 3. 装好后重载技能,告诉我可以用了
请为 Django 项目设计一个 Celery 异步邮件发送方案,包含 Celery 与 Django 的基础配置、任务定义、调用方式、失败重试策略,以及本地开发与生产环境的建议。
一套可落地的 Django + Celery 邮件异步处理方案,含配置示例、任务代码结构与重试建议。
请说明如何在 Django 中使用 Celery Beat 配置定时任务,例如每天凌晨同步订单数据;请给出配置方法、任务示例、时区注意事项,以及避免任务重复执行的建议。
关于 Celery Beat 周期任务的完整实现说明,包括调度配置、示例代码与运行注意事项。
请为 Django + Celery 项目制定后台任务的测试与监控方案,涵盖单元测试、集成测试、任务状态追踪、失败告警,以及 Flower 或其他监控工具的使用建议。
一份面向 Django + Celery 的测试监控实践清单,帮助提升任务可靠性与可观测性。
Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
pip install 'celery[redis]' django-celery-results django-celery-beat
celery.py — App Entrypoint# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# config/settings/base.py
# Broker (Redis recommended for production)
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
# Serialization
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# Task behavior
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # Hard limit: 30 min
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # Soft limit: sends SoftTimeLimitExceeded
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Prevent worker hoarding long tasks
CELERY_TASK_ACKS_LATE = True # Re-queue on worker crash
# Result persistence
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # Keep results 24 hours
# Beat scheduler (for periodic tasks)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# Installed apps
INSTALLED_APPS += [
'django_celery_results',
'django_celery_beat',
]
# Start worker (development)
celery -A config worker --loglevel=info
# Start beat scheduler (periodic tasks)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Combined worker + beat (dev only, never production)
celery -A config worker --beat --loglevel=info
# Production: multiple workers with concurrency
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
# apps/notifications/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
"""Send welcome email to newly registered user."""
from apps.users.models import User
from apps.notifications.services import EmailService
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.warning('send_welcome_email: user %s not found', user_id)
return # Idempotent — do not raise, task already impossible to complete
EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)
@shared_task(
bind=True,
name='integrations.sync_to_crm',
max_retries=5,
default_retry_delay=60, # seconds before first retry
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True, # exponential backoff
retry_backoff_max=600, # cap at 10 minutes
retry_jitter=True, # randomise to avoid thundering herd
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
"""Sync contact to external CRM with retry on transient failures."""
from apps.crm.services import CRMClient
try:
…
为 Quarkus 项目执行发布前验证闭环,涵盖构建、测试、扫描与差异审查。
为 Django 项目执行发布前校验流程,涵盖迁移、测试、安全与部署检查。