Python 多进程实现分析,python进程实现,Python Stand


Python Standard Library 从 2.6 起增加了子进程级别的并行开发支持 —— multiprocessing。

#!/usr/bin/env python# -*- coding:utf-8 -*-import os, timefrom multiprocessing import *def test(x):    print current_process().pid, x    time.sleep(1)if __name__ == "__main__":    print "main:", os.getpid()    p = Pool(5)    p.map(test, range(13))

输出:

$ ./test.pymain: 12081209 01210 11211 21212 31213 41210 51209 61212 71211 81213 91209 101210 111212 12

从输出结果,我们可以看出多个子进程 “并行” 完成了 “计算” 任务。

Process

我们先从最根本的 Process 入手,看看是如何启动子进程完成并行计算的。上面的 Pool 不过是创建多个 Process,然后将数据(args)提交给多个子进程完成而已。

#!/usr/bin/env python# -*- coding:utf-8 -*-import os, timefrom multiprocessing import *def test(x):    print current_process().pid, x    time.sleep(1)if __name__ == "__main__":    print "main:", os.getpid()    p = Process(target = test, args = [100])    p.start()    p.join()

输出:

$ ./test.py

main: 12291230 100

为了更好掌握这个 “并行” 利器,需要 “深入” 探究子进程的执行流程和生命周期。multiprocessing 在接口设计上参考了 threading,而我们分析的起点就是 Process.start()。

class Process(object):    '''    Process objects represent activity that is run in a separate process    The class is analagous to `threading.Thread`    '''    _Popen = None    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):        ...        self._parent_pid = os.getpid()        self._popen = None        self._target = target        self._args = tuple(args)        self._kwargs = dict(kwargs)        ...    def start(self):        ...        if self._Popen is not None:            Popen = self._Popen        else:            from .forking import Popen        self._popen = Popen(self)        _current_process._children.add(self)

继续,这个 Popen 是关键,打开 forking.py。

class Popen(object):    def __init__(self, process_obj):        ...        self.pid = os.fork()        if self.pid == 0:            ...            code = process_obj._bootstrap()            sys.stdout.flush()            sys.stderr.flush()            os._exit(code)

这里调用了关键的 fork(),子进程由此而生。接下来子进程调用了 Process._bootstrap()。注意,这里还调用了 “os._exit(code)”,这表明子进程与 Process.Start() “self._popen = Popen(self)” 之后的代码分道扬镳。

class Process(object):    def _bootstrap(self):        from . import util        global _current_process        try:            self._children = set()            self._counter = itertools.count(1)            try:                sys.stdin.close()                sys.stdin = open(os.devnull)            except (OSError, ValueError):                pass            _current_process = self # 重置了 _current_process !            util._finalizer_registry.clear()            util._run_after_forkers()            util.info('child process calling self.run()')            try:                self.run()                exitcode = 0            finally:                util._exit_function()        except SystemExit, e:            if not e.args:                exitcode = 1            elif type(e.args[0]) is int:                exitcode = e.args[0]            else:                sys.stderr.write(e.args[0] + '\n')                sys.stderr.flush()                exitcode = 1        except:            exitcode = 1            import traceback            sys.stderr.write('Process %s:\n' % self.name)            sys.stderr.flush()            traceback.print_exc()        util.info('process exiting with exitcode %d' % exitcode)        return exitcode

这个方法看着复杂,初始化子进程环境,并对异常等做出处理,但关键的就是 “self.run()” 这一行。

class Process(object):    def run(self):        if self._target:            self._target(*self._args, **self._kwargs)

子进程在这里调用了我们创建 Process 对象时传递的 target 参数,也就是上例中的 test()。如此,我们就大概明白了子进程创建和调用的整个过程,似乎并不复杂。不过事情还没完,我们需要继续 “了解” 父进程在创建子进程之后做了什么。

回到 Process.start(),Popen 创建并调用子进程,而父进程则继续执行下一步。

class Process(object):    def start(self):        ...        if self._Popen is not None:            Popen = self._Popen        else:            from .forking import Popen        self._popen = Popen(self)        _current_process._children.add(self)

这个 _current_process 默认应该代表父进程,只是从何而来?在 process.py 最后面有这些代码。

class _MainProcess(Process):    def __init__(self):        self._identity = ()        self._daemonic = False        self._name = 'MainProcess'        self._parent_pid = None        self._popen = None        self._counter = itertools.count(1)        self._children = set()        self._authkey = AuthenticationString(os.urandom(32))        self._tempdir = None_current_process = _MainProcess()del _MainProcess

只是对当前进程(父进程)的简单包装。父进程通过 Process.start() 创建子进程,并添加到一个子进程列表(_children)中。那么父进程如何处理子进程的退出呢?要知道不小心会搞出很多僵尸进程(参见 《fork: 杀僵尸练级》)。

在 multiprocessing.init.py 中引入了 util。

from multiprocessing.util import SUBDEBUG, SUBWARNING

在 util.py 尾部有这些代码。

from multiprocessing.process import current_process, active_childrendef _exit_function():    global _exiting    info('process shutting down')    debug('running all "atexit" finalizers with priority >= 0')    _run_finalizers(0)    for p in active_children():        if p._daemonic:            info('calling terminate() for daemon %s', p.name)            p._popen.terminate()    for p in active_children():        info('calling join() for process %s', p.name)        p.join()    debug('running the remaining "atexit" finalizers')    _run_finalizers()atexit.register(_exit_function)

active_children 由 process 导入。

def current_process():    '''Return process object representing the current process'''    return _current_processdef active_children():    '''Return list of process objects corresponding to live child processes'''    _cleanup()    return list(_current_process._children)

_exit_function 会在父进程(或子进程,子进程可以创建自己的子进程)退出时检查其子进程列表,并调用 join 等待子进程退出。如果子进程是 daemon,则直接调用 terminate (发送 SIGTERM 信号) 终止。

class Process(object):    def join(self, timeout=None):        ...        res = self._popen.wait(timeout)        if res is not None:            _current_process._children.discard(self)class Popen(object):    def wait(self, timeout=None):        if timeout is None:            return self.poll(0)        ...        while 1:            res = self.poll()            ...        return res    def poll(self, flag=os.WNOHANG):        if self.returncode is None:            pid, sts = os.waitpid(self.pid, flag)            if pid == self.pid:                if os.WIFSIGNALED(sts):                    self.returncode = -os.WTERMSIG(sts)            else:                assert os.WIFEXITED(sts)                self.returncode = os.WEXITSTATUS(sts)        return self.returncode    def terminate(self):        if self.returncode is None:        try:            os.kill(self.pid, signal.SIGTERM)        except OSError, e:            if self.wait(timeout=0.1) is None:            raise

Process.join() 调用 Popen.wait(),而 wait() 内部则通过 poll() 调用 os.waitpid() 等待子进程终止,如此这般才算完成了一个完整的流程。

附注: 子进程同样也会在其退出时调用 uitl._exit_function。

评论关闭