作者: 吴老板  2021-03-05 08:52:00
系统
Windows
分布式 大家好,我是吴老板。用Celery 官方的话来说,Celery 是一个非常优秀的分布式队列,可应用于分布式共享中间队列和定时任务等等。 

10多年的章贡网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。成都营销网站建设的优势是能够根据用户设备显示端的尺寸不同,自动调整章贡建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联从事“章贡网站设计”,“章贡网站推广”以来,每个客户项目都认真落实执行。
1 前言
大家好,我是吴老板。用Celery 官方的话来说,Celery 是一个非常优秀的分布式队列,可应用于分布式共享中间队列和定时任务等等。
2 版本的差异
Celery 有很多个版本,各版本之间的差异可谓不小,比如最新的 Celery6.0 版本在稳定性远不如 Celery4.0,所以在使用不同版本的时候,系统给到我们的反馈可能并不能如我们所愿。
3 服务
在 windows 下挂在 Celery 服务有时候会出现不稳定的情况(unix中暂时未发现这种情况),比如在执行定时任务的时候,过了一段时间之后,Celery 出现了假死状态,以至于不能按照我们指定的时间点去执行任务。
这些任务只是加入到待运行队列中(堆积在 Redis 中),只能人为重启 Celery 服务之后才能将堆积的任务释放出来运行。
这样一来,第一是定时任务在指定时间点没有正常运行,其二是在其他时间运行了这些任务,很可能会产生更新数据不及时,时间节点混乱的问题,不仅达不到业务需求,还会反受其害。
4 设置心跳
为了解决 Celery 在 windows 中的这种弊端,可以为 Celery 任务队列设置一个心跳时间,比如每一分钟或者每五分钟向 Redis 数据库发送一次数据以保证队列始终是活跃的状态,这样只要你的电脑不关机并保持网络畅通(如果是远程 Redis),Celery 任务队列服务就不会出现假死状态。
5 举个栗子
我总是很喜欢用示例来说话,前些时间在对某平台的商家后台进行数据采集的时候,为了使用时能自动获取该网站的 cookie ,
用Pyppeteer 写了一个自动化登陆的脚本,和往常一样仍在 Celery 队列中并迅速的启动服务。
脚本是这样的(非常接近实际的伪代码,没办法,保命要紧)
- # -*- coding: utf-8 -*-
 - from db.redisCurd import RedisQueue
 - import asyncio
 - import random
 - import tkinter
 - from pyppeteer.launcher import launch
 - from platLogin.config import USERNAME, PASSWORD, LOGIN_URL
 - class Login():
 - def __init__(self, shopId):
 - self.shopId = shopId
 - self.RedisQueue = RedisQueue("cookie")
 - def screen_size(self):
 - tk = tkinter.Tk()
 - width = tk.winfo_screenwidth()
 - height = tk.winfo_screenheight()
 - tk.quit()
 - return {'width': width, 'height': height}
 - async def login(self, username, password, url):
 - browser = await launch(
 - {
 - 'headless': False,
 - 'dumpio': True
 - },
 - args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'],
 - )
 - page = await browser.newPage() # 启动新的浏览器页面
 - try:
 - await page.setViewport(viewport=self.screen_size())
 - await page.setJavaScriptEnabled(enabled=True) # 启用js
 - await page.setUserAgent(
 - 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'
 - )
 - await self.page_evaluate(page)
 - await page.goto(url)
 - await asyncio.sleep(2)
 - # 输入用户名,密码
 - await page.evaluate(f'document.querySelector("#userName").value=""')
 - await page.type('#userName', username, {'delay': self.input_time_random() - 50}) # delay是限制输入的时间
 - await page.evaluate('document.querySelector("#passWord").value=""')
 - await page.type('#passWord', password, {'delay': self.input_time_random()})
 - await page.waitFor(6000)
 - loginImgVcode = await page.waitForSelector('#checkCode')
 - await loginImgVcode.screenshot({'path': './loginImg.png'})
 - await page.waitFor(6000)
 - res = use_cjy("./loginImg.png")
 - pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234"
 - await page.waitFor(6000)
 - await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
 - await page.waitFor(6000)
 - await page.click('#subMit')
 - await page.waitFor(6000)
 - await asyncio.sleep(2)
 - await self.get_cookie(page)
 - await page.waitFor(3000)
 - await self.page_close(browser)
 - return {'code': 200, 'msg': '登陆成功'}
 - except:
 - return {'code': -1, 'msg': '出错'}
 - finally:
 - await page.waitFor(3000)
 - await self.page_close(browser)
 - # 获取登录后cookie
 - async def get_cookie(self, page):
 - cookies_list = await page.cookies()
 - cookies = ''
 - for cookie in cookies_list:
 - str_cookie = '{0}={1}; '
 - str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
 - cookies += str_cookie
 - # 将cookie 放入 cookie 池
 - self.RedisQueue.put_hash(self.shopId, cookies)
 - return cookies
 - async def page_evaluate(self, page):
 - await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
 - await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
 - await page.evaluate(
 - '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
 - await page.evaluate(
 - '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
 - await page.waitFor(3000)
 - async def page_close(self, browser):
 - for _page in await browser.pages():
 - await _page.close()
 - await browser.close()
 - def input_time_random(self):
 - return random.randint(100, 151)
 - def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
 - loop = asyncio.get_event_loop()
 - i_future = asyncio.ensure_future(self.login(username, password, url))
 - loop.run_until_complete(i_future)
 - return i_future.result()
 - if __name__ == '__main__':
 - Z = Login(shopId="001")
 - Z.run()
 
Celery 任务文件是这样的
- # -*- coding: utf-8 -*-
 - from __future__ import absolute_import
 - import os
 - import sys
 - import time
 - from db.redisCurd import RedisQueue
 - from send_msg.weinxin import Send_msg
 - base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 - sys.path.append(base_dir)
 - from logger.logger import log_v
 - from celery import Task
 - from platLogin.login import Login # 登陆类
 - from celery import Celery
 - randomQueue = RedisQueue("cookie")
 - celery_app = Celery('task')
 - celery_app.config_from_object('celeryConfig')
 - S = Send_msg()
 - dl_dict = {
 - 'demo': {
 - 'cookie': '',
 - 'loginClass': 'Login',
 - }
 - }
 - # todo 这是三种运行的状态
 - class task_status(Task):
 - def on_success(self, retval, task_id, args, kwargs):
 - log_v.info('任务信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))
 - def on_failure(self, exc, task_id, args, kwargs, einfo):
 - log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))
 - def on_retry(self, exc, task_id, args, kwargs, einfo):
 - log_v.warning('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))
 - # todo 随便找个hash key作为轮询对象, celery在win10系统可能不太稳定,有时候会有连接断开的情况
 - @celery_app.task(base=task_status)
 - def get_cookie_status(platName="demo"):
 - try:
 - # log_v.debug(f'[+] 轮询 {platName} 定时器启动 ..... Done')
 - randomQueue.get_hash(platName).decode()
 - log_v.debug(f'[+] 轮询 {platName} 成功 ..... Done')
 - return "Erp 轮询成功"
 - except:
 - return "Erp 轮询失败"
 - @celery_app.task(base=task_status)
 - def set_plat_cookie(platName="demo", shopId=None):
 - log_v.debug(f"[+] {platName} 正在登陆")
 - core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
 - result = core.run()
 - return result
 
Celery 配置文件是这样的
- from __future__ import absolute_import
 - import datetime
 - from kombu import Exchange, Queue
 - from celery.schedules import crontab
 - from urllib import parse
 - BROKER_URL = f'redis://root:{parse.quote("你的不规则密码")}@主机:6379/15'
 - # 导入任务,如tasks.py
 - CELERY_IMPORTS = ('monitor.tasks',)
 - # 列化任务载荷的默认的序列化方式
 - CELERY_TASK_SERIALIZER = 'json'
 - # 结果序列化方式
 - CELERY_RESULT_SERIALIZER = 'json'
 - CELERY_ACCEPT_CONTENT = ['json']
 - CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
 - # CELERY_TIMEZONE='UTC'
 - CELERYBEAT_SCHEDULE = {
 - 'add-every-60-seconds': {
 - 'task': 'tasks.get_cookie_status',
 - 'schedule': datetime.timedelta(minutes=1), # 每 1 分钟执行一次
 - 'args': () # 任务函数参数
 - },
 - }
 
启动服务
- celery -A tasks beat -l INFO
 - celery -A tasks worker -l INFO -c 2
 
以 2 个线程启动消费者队列服务并启用定时任务,当发现当前平台的 cookie 不可用时,我会向 Celery 发送一个信号(就是调用了前面的set_plat_cookie 这个方法),消费者得到这个任务这个就会执行自动化脚本以获取 cookie 并储存在 Redis 中,使用时在从 Redis 中获取就能正常请求到该平台的数据。
在空闲时间,Celery中的 get_cookie_status 方法会每隔一分钟向 Redis 请求数据,这就是我们设置的 1分钟心跳。
这样不管我们的 Celery 是否是后台启动,都不会出现假死、卡死的状态,则万事大吉矣!!
6 总结
本文为了解决 Celery 在 windows 中的这种弊端,为 Celery 任务队列设置一个心跳时间,比如每一分钟或者每五分钟向 Redis 数据库发送一次数据以保证队列始终是活跃的状态,这样只要你的电脑不关机并保持网络畅通(如果是远程 Redis),Celery 任务队列服务都不会出现假死、卡死的状态。
                网站题目:手把手教你在Windows下设置分布式队列Celery的心跳轮询
                
                网页地址:http://www.csdahua.cn/qtweb/news10/199860.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网