python django+celery+ansibleApi无返回,djangoansibleapi,1.python调用An
python django+celery+ansibleApi无返回,djangoansibleapi,1.python调用An
1.用python调用AnsibleApi远程执行任务:不使用celery时能正确运行,使用celery时返回为空。用pdb调试发现是调用Ansible时返回异常,但具体原因查了几天仍然无法查出。
2.复现代码如下:
tasks.py
from celery import shared_task

from .deploy_tomcat2 import django_process


@shared_task
def deploy(jira_num):
    """Celery task that forwards ``jira_num`` to ``django_process``.

    Returns whatever ``django_process(jira_num)`` returns.
    """
    # Debugging aids from the original investigation, kept disabled:
    # return 'hello world {0}'.format(jira_num)
    # rdb.set_trace()
    outcome = django_process(jira_num)
    return outcome
deploy_tomcat2.py
from .AnsibleApi import CallApi


def django_process(jira_num):
    """Run the Tomcat deployment via ``CallApi`` for an all-digit ticket id.

    ``jira_num`` is checked with ``str.isdigit``. When it consists only of
    digits, a ``CallApi`` is built from the hard-coded sample parameters
    below and the value of its ``run_task()`` is returned. Otherwise the
    function returns ``None``.
    """
    if not str.isdigit(jira_num):
        return None

    # Fixed deployment parameters used for this reproduction.
    server = '10.10.10.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    # Disabled local debugger hook from the original investigation:
    # import pdb
    # pdb.set_trace()
    return CallApi(server, name, port, code, jdk, jvm).run_task()
AnsibleApi.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from collections import namedtuple

from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory import Inventory
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars import VariableManager
from django.conf import settings

from .Logger import Logger

Log = Logger('/tmp/auto_deploy_tomcat.log', logging.INFO)


class ResultCallback(CallbackBase):
    """Callback that stores task results per host in three dictionaries.

    ``host_ok``, ``host_failed`` and ``host_unreachable`` map a host name to
    the most recent result object reported for that host in that category.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Store ``result`` in ``host_unreachable`` under its host name."""
        host_name = result._host.get_name()
        self.host_unreachable[host_name] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store ``result`` in ``host_ok`` under its host name."""
        host_name = result._host.get_name()
        self.host_ok[host_name] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store ``result`` in ``host_failed`` under its host name."""
        host_name = result._host.get_name()
        self.host_failed[host_name] = result


class CallApi(object):
    """Build and run a one-host Ansible play that copies the deploy script.

    SSH user and private key come from Django settings. ``run_task`` returns
    a dict with ``success`` / ``failed`` / ``unreachable`` sub-dicts keyed by
    host name.
    """

    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    # Class-level default; every instance replaces it in __init__.
    results_callback = ResultCallback()
    Options = namedtuple(
        'Options',
        [
            'connection',
            'module_path',
            'private_key_file',
            'forks',
            'become',
            'become_method',
            'become_user',
            'check',
        ],
    )

    def __init__(self, ip, name, port, code, jdk, jvm):
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Populate ``self.tasks`` with the copy task and a debug task."""
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        cargs = {
            'src': deploy_script,
            'dest': dst_script,
            'owner': self.user,
            'group': self.user,
            'mode': '0755',
        }
        # Command line for the disabled 'command' tasks below; still built
        # here exactly as before even though nothing currently consumes it.
        args = "%s %s %d %s %s '%s'" % (
            dst_script, self.name, self.port, self.code, self.jdk, self.jvm)

        copy_task = {
            'action': {'module': 'copy', 'args': cargs},
            'register': 'shell_out',
        }
        show_copy_output = {
            'action': {'module': 'debug', 'args': {'msg': '{{shell_out}}'}},
        }
        # Disabled tasks, kept for reference:
        # tasks.append(dict(action=dict(module='command', args=args)))
        # tasks.append(dict(action=dict(module='command', args=args), register='result'))
        # tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
        self.tasks = [copy_task, show_copy_output]

    def _set_option(self):
        """Create the loader, inventory, options and play for ``self.ip``."""
        self._gen_user_task()
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(
            connection='smart',
            module_path=None,
            private_key_file=self.ssh_private_key_file,
            forks=None,
            become=True,
            become_method='sudo',
            become_user='root',
            check=False,
        )
        self.inventory = Inventory(
            loader=self.loader,
            variable_manager=self.variable_manager,
            host_list=[self.ip],
        )
        self.variable_manager.set_inventory(self.inventory)

        play_source = {
            'name': "auto deploy tomcat",
            'hosts': self.ip,
            'remote_user': self.user,
            'gather_facts': 'no',
            'tasks': self.tasks,
        }
        self.play = Play().load(
            play_source,
            variable_manager=self.variable_manager,
            loader=self.loader,
        )

    def run_task(self):
        """Run the play and return the raw per-host results.

        Returns a dict with keys ``success``, ``failed`` and ``unreachable``;
        each maps host name to the ``_result`` of the stored result object.
        The same dict is written to the log before returning.
        """
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        tqm = None
        # Celery remote-debugger breakpoint left in place for the
        # investigation described in the post.
        from celery.contrib import rdb
        rdb.set_trace()
        # import pdb;pdb.set_trace()
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            tqm.run(self.play)
        finally:
            # Always release the manager, even when the run raised.
            if tqm is not None:
                tqm.cleanup()

        collected = self.results_callback
        buckets = (
            ('success', collected.host_ok),
            ('failed', collected.host_failed),
            ('unreachable', collected.host_unreachable),
        )
        for label, per_host in buckets:
            for host, task_result in per_host.items():
                self.results_raw[label][host] = task_result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw
复现方法
启动celery worker:
celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info
另一窗口生产消息:
deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')
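有两种方法可以解决这个问题,本质上都是关闭assert(celery的worker子进程是daemon进程,而multiprocessing在启动子进程时用assert禁止daemon进程再创建子进程,Ansible执行任务时恰好需要创建子进程):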
1.在启动celery worker的窗口中先执行 export PYTHONOPTIMIZE=1,或者在启动celery时加上参数 -O OPTIMIZATION
2.注释掉python包multiprocessing下面process.py中102行,关闭assert
既然都用django,CRUD看来是标配了,那么你不如试试 post_save 这个 signal
直接 deploy.delay(**params)
请问解决了没,我应该是遇到同样的问题,delay执行有输出,可实际上没执行到ansible的操作
编橙之家文章,
相关内容
- Python处理二进制流应该怎么写,Python处理二进制流,我现
- Python判读变量是否定义用什么方法,python判读变量定义
- Python webpy模板变量可以遍历两次吗?为什么我的方法不行
- python处理小文件读写并行的一些疑惑,python读写,我有大
- python threading线程再次开启多线程AttributeError错误应对方
- WEB应用Python比PHP更适合吗,,我记得以前看过一篇文章,
- 贴个python源码求分析,微信第三方接口无法为客户端返回
- 提升Python编程能力有什么方法,提升python编程能力,我是
- 一个页面中的这两个信息能不用python无头浏览器爬取到
- Python菜鸟对类应用的问题求助,python菜鸟问题求助,cl
评论关闭