阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

高阶应用-celery

116次阅读
没有评论

共计 2917 个字符,预计需要花费 8 分钟才能阅读完成。

一、问题

用户发起 request,并且要等待 response 返回。但是在视图中有一些耗时的操作,导致用户可能会等待很长时间才能接受 response,这样用户体验很差

网站每隔一段时间要同步一次数据,但是 http 请求是需要触发的

celery 网址:http://docs.jinkan.org/docs/celery/

二、celery 模块包含

  • 任务 task

    本质是一个 python 函数,将耗时操作封装成一个函数

  • 队列 queue

    将要执行的任务放队列里

  • 工人 worker

    负责执行队列中的任务

  • 代理 broker

    负责调度,在部署环境中使用 redis

三、解决

  • 将耗时的操作放到 celery 中执行
  • 定时执行

四、安装

  • pip install celery==3.1.26
  • pip install celery-with-redis=3.0
  • pip install django-celery=3.3.1
  • pip install redis==2.10.6

五、配置 settings.py

  • INSTALLED_APPS 添加

    djcelery

  • 在 settings 下方添加如下代码

    import djcelery djcelery.setup_loader()# 初始化 BROKER_URL='redis://: 密码 @127.0.0.1:6379/0'#0 带表 16 个库中使用第 0 个库 CELERY_IMPORTS=('App.task') #myApp 是项目名

六、创建 task.py 文件

路径 App/task.py

from celery import task import time @task def tt(): print("lucky is a good man") time.sleep(5) print("lucky is a nice man")

七、迁移,生成 celery 需要的数据库表

python manage.py migrate

八、在工程 project 目录下创建 celery.py 的文件

from __future__ import absolute_import import os from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings') app = Celery('portal') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))

九、在工程目录下的 project 目录下的__init__.py 文件中添加

from .celery import app as celery_app

十、视图

from .task import tt def celery(request): tt.delay() # 添加到 celery 中执行,不会阻塞 return HttpResponse("测试 celery")

传参:
可以在 tt.delay([参数]) delay 中添加参数

十一、启动顺序

  • 启动 redis

    localhost:~ xialigang$ redis-cli

  • 启动服务

    python manange.py runserver

  • 启动 worker

    python manage.py celery worker –loglevel=info

十二、修改报错(解释器为 3.7 版本)

报错信息

File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/utils/timer2.py", line 19 from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger ^

原因:python3.7 版本中 async 是关键字,所以报错

解决方法有两种:

  • 降 python 版本,重新安装 Python 版本为 3.7 以下 如 3.6

  • 把 kombu.async 模块重命名,再把引用处修改过来

    把所有 async 的引入位置改为 async1

    将 site-packages\kombu\async -> site-packages\kombu\async1

    site-packages/celery/concurrency/asynpool.py”, line 42

    site-packages/kombu/transport/redis.py”

    line 815 862 871 876 877 914 920 925

    site-packages/celery/worker/autoscale.py”, line 21

    site-packages/celery/worker/components.py”, line 14

    site-packages/celery/worker/strategy.py”, line 13

十三、定时执行

  • 定时执行一个任务

    • 在 settings.py 文件中添加如下代码

      from datetime import timedelta # 在 settings.py 文件添加 CELERYBEAT_SCHEDULE = {'schedule-test': {'task': 'App.task.test', #App 的名称 task 为你创建任务的 py 文件名 test 为你的任务的名称 'schedule': timedelta(seconds=3), 'args': (2,) }, }
    • 启动顺序:

      • 启动 Django

        Python3 manage.py runserver

      • 启动 worker

        python manage.py celery worker –loglevel=info

      • 开启定时任务

        python manage.py celery beat –loglevel=info(或者 celery -A 你的工程名称 beat -l info)

  • 定时多个任务

    • settings.py 添加如下代码

      from datetime import timedelta # 在 settings.py 文件添加 CELERYBEAT_SCHEDULE = {# 第一个任务 'schedule-test1': {'task': 'App.mytask.test', 'schedule': timedelta(seconds=3), 'args': (2,) }, # 第二个任务 'schedule-test2': {'task': 'App.mytask.test2', 'schedule': timedelta(seconds=3), }, }
    • App/task.py

      from celery import task import time @task def test1(i): print('打印',i) time.sleep(5) print('打印',i) @task def test2(): print('test2') time.sleep(5) print('test2')

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-05-26发表,共计2917字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中