问答 celery 无法结束 task 任务创建的 subprocess.Popen 子进程

南山老人i · 2022年09月30日 · 最后由 南山老人i 回复于 2023年02月10日 · 10471 次阅读

场景描述:最近使用 fastapi 框架搭建 web 服务,有这样一个场景,我需要某个接口先返回数据,再去处理业务,所以就使用到了 celery 来做异步处理。
task 中是封装的业务代码,在此中有一个公共方法的功能是使用 scrcpy 录制手机屏幕,在结束录屏的时候,我遇到了 bug!
话不多说上代码:
task 任务:
srd = ScreenRecording(devicename, uid) # 开启录屏,并存放在 uid 中
srd.run()
sleep(10)
srd.stop()
ScreenRecording 录屏工具类:

import subprocess
from loguru import logger

class ScreenRecording():
def init(self, devicename, uid):
self.devicename = devicename
self.uid = uid
logger.debug(f"创建录屏实例化对象!属性为:devicename={devicename}&uid={uid}")

def run(self):
cmd = f"scrcpy -s {self.devicename} -Nr ./report/{self.uid}/report/{self.uid}.mp4"
self.p = subprocess.Popen(cmd, shell=True)
logger.debug(f"设备{self.devicename}已开始录屏!")

def stop(self):
# 方案一----------------------------------------------
# self.p.send_signal(signal.CTRL_C_EVENT) # 不能使用发送 ctrl+c 信号,会导致 celery 服务停止!
# 方案二-------------------------------------------------
# os.kill(self.p.pid, signal.CTRL_C_EVENT) # 方法可行,但是会造成 celery 队列卡死
# self.p.wait() # 加了 wait 与方案一效果一致
# sleep(2) # 加了延迟也与方案一一致
# 方案三----------------------------------------------------
# self.p.send_signal(signal.SIGTERM) # 无法终止录屏
# 方案四-----------------------------------------------
# self.p.terminate() # 无法终止录屏
# 方案五---------------------------------------------
# os.kill(self.p.pid, signal.CTRL_C_EVENT) # 无效
# sleep(0.01)
# self.p.wait()
# 方案六-----------------------------------------
# p = psutil.Process(pid=self.p) # 无效
# print(self.p)
# print(p)
# p.send_signal(signal.CTRL_C_EVENT)
# 方案七-------------------------------------
# os.system(f'taskkill /t /pid {self.p.pid}') # 无效
# 方案八-------------------------------------
# os.killpg(os.getpgid(self.p.pid), 2) # 无效
# self.p.wait()
logger.debug(f"设备{self.devicename}已结束录屏!")

额外补充:在此期间我尝试了各种 kill 信号,包括-2,-9 信号。总结就是,要么结束不掉 popen 管道进程,要么连着 celery 服务一起 ctrl+c 结束了!
求助求助

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
共收到 10 条回复 时间 点赞

之前也用过 celery 做后台任务,后来发现 apscheduler 库也能满足就换到 apscheduler 库了,还不用单独启动 celery worker,楼主可以试试看

上面描述太臭太长,换个描述:windows 系统中如何终止以 shell=True 参数启动的 subprocess.Popen 方法

好的哥,我先去简单看看,基本上只要满足可以实现异步就可以满足我的使用场景

我去看了大哥说的这个框架,貌似是跟我的场景不太符合,我这边使用 celery 主要是为了他的异步,对定时任务基本没有需求,硬用定时任务上异步的话暂时没有好的思路,可能不太符合了

南山老人i 回复

apscheduler 也支持异步的,在调用 add_job 方法的时候,不传 trigger 参数可以立即触发一个异步任务

换个思路 你这个是子进程里面再开子进程 具体怎么停要看平台特性

我有个很粗暴的方案,直接杀 adbd,哈哈哈

可以试试启用线程去执行 subprocess,同时记录这个线程的 id,处理完业务需要关闭的时候杀掉线程:

t1 = threading.Thread(target=***)
t1.start()
self.tid = t1.ident  #获取线程id
#业务代码
_async_raise(self.tid,SystemExit) #强制关闭线程

def _async_raise(tid, exctype): #关闭线程方法
    """raises the exception, performs cleanup if needed"""
    try:
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            # pass
            raise ValueError("invalid thread id")
        elif res != 1:
            # """if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"""
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
    except Exception as err:
        print(err)

更新回帖!时隔这么久我终于找到了问题所在
官方文档描述:

args is required for all calls and should be a string, or a sequence of program arguments. Providing a sequence of arguments is generally preferred, as it allows the module to take care of any required escaping and quoting of arguments (e.g. to permit spaces in file names). If passing a single string, either shell must be True (see below) or else the string must simply name the program to be executed without specifying any arguments.

If shell is True, the specified command will be executed through the shell. This can be useful if you are using Python primarily for the enhanced control flow it offers over most system shells and still want convenient access to other shell features such as shell pipes, filename wildcards, environment variable expansion, and expansion of ~ to a user’s home directory. However, note that Python itself offers implementations of many shell-like features (in particular, glob, fnmatch, os.walk(), os.path.expandvars(), os.path.expanduser(), and shutil).

文档大意:如果带上 shell=True ,那么会通过 shell 来启动进程。这意味着,一次 Popen 会启动两个进程,一个 shell 进程,一个命令进程。然后 Popen 返回的 pid 是 shell 进程的 pid,这会导致 Popen.kill() 等函数不起作用,进程还在正常运行,所以一定要使用参数列表的形式启动,不要通过命令行的形式,不要使用 shell=True 。

也算是无意之间看到大佬的这篇文档才了解为什么,链接附上,大佬文章里有更详细的解决与讲解:https://blog.csdn.net/u012849539/article/details/117457490

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册