python线程池实现,python线程池,python原生包中没有


python原生包中没有线程池的支持,下面是一个线程池的简单的实现:

import threadingfrom time import sleepclass ThreadPool:    """Flexible thread pool class.  Creates a pool of threads, then    accepts tasks that will be dispatched to the next available    thread."""    def __init__(self, numThreads):        """Initialize the thread pool with numThreads workers."""        self.__threads = []        self.__resizeLock = threading.Condition(threading.Lock())        self.__taskLock = threading.Condition(threading.Lock())        self.__tasks = []        self.__isJoining = False        self.setThreadCount(numThreads)    def setThreadCount(self, newNumThreads):        """ External method to set the current pool size.  Acquires        the resizing lock, then calls the internal version to do real        work."""        # Can't change the thread count if we're shutting down the pool!        if self.__isJoining:            return False        self.__resizeLock.acquire()        try:            self.__setThreadCountNolock(newNumThreads)        finally:            self.__resizeLock.release()        return True    def __setThreadCountNolock(self, newNumThreads):        """Set the current pool size, spawning or terminating threads        if necessary.  Internal use only; assumes the resizing lock is        held."""        # If we need to grow the pool, do so        while newNumThreads > len(self.__threads):            newThread = ThreadPoolThread(self)            self.__threads.append(newThread)            newThread.start()        # If we need to shrink the pool, do so        while newNumThreads < len(self.__threads):            self.__threads[0].goAway()            del self.__threads[0]    def getThreadCount(self):        """Return the number of threads in the pool."""        self.__resizeLock.acquire()        try:            return len(self.__threads)        finally:            self.__resizeLock.release()    def queueTask(self, task, args=None, taskCallback=None):        """Insert a task into the queue.  task must be callable;        args and taskCallback can be None."""        if self.__isJoining == True:            return False        if not callable(task):            return False        self.__taskLock.acquire()        try:            self.__tasks.append((task, args, taskCallback))            return True        finally:            self.__taskLock.release()    def getNextTask(self):        """ Retrieve the next task from the task queue.  For use        only by ThreadPoolThread objects contained in the pool."""        self.__taskLock.acquire()        try:            if self.__tasks == []:                return (None, None, None)            else:                return self.__tasks.pop(0)        finally:            self.__taskLock.release()    def joinAll(self, waitForTasks = True, waitForThreads = True):        """ Clear the task queue and terminate all pooled threads,        optionally allowing the tasks and threads to finish."""        # Mark the pool as joining to prevent any more task queueing        self.__isJoining = True        # Wait for tasks to finish        if waitForTasks:            while self.__tasks != []:                sleep(.1)        # Tell all the threads to quit        self.__resizeLock.acquire()        try:            self.__setThreadCountNolock(0)            self.__isJoining = True            # Wait until all threads have exited            if waitForThreads:                for t in self.__threads:                    t.join()                    del t            # Reset the pool for potential reuse            self.__isJoining = False        finally:            self.__resizeLock.release()class ThreadPoolThread(threading.Thread):    """ Pooled thread class. """    threadSleepTime = 0.1    def __init__(self, pool):        """ Initialize the thread and remember the pool. """        threading.Thread.__init__(self)        self.__pool = pool        self.__isDying = False    def run(self):        """ Until told to quit, retrieve the next task and execute        it, calling the callback if any.  """        while self.__isDying == False:            cmd, args, callback = self.__pool.getNextTask()            # If there's nothing to do, just sleep a bit            if cmd is None:                sleep(ThreadPoolThread.threadSleepTime)            elif callback is None:                cmd(args)            else:                callback(cmd(args))    def goAway(self):        """ Exit the run loop next time through."""        self.__isDying = True# Usage exampleif __name__ == "__main__":    from random import randrange    # Sample task 1: given a start and end value, shuffle integers,    # then sort them    def sortTask(data):        print "SortTask starting for ", data        numbers = range(data[0], data[1])        for a in numbers:            rnd = randrange(0, len(numbers) - 1)            a, numbers[rnd] = numbers[rnd], a        print "SortTask sorting for ", data        numbers.sort()        print "SortTask done for ", data        return "Sorter ", data    # Sample task 2: just sleep for a number of seconds.    def waitTask(data):        print "WaitTask starting for ", data        print "WaitTask sleeping for %d seconds" % data        sleep(data)        return "Waiter", data    # Both tasks use the same callback    def taskCallback(data):        print "Callback called for", data    # Create a pool with three worker threads    pool = ThreadPool(3)    # Insert tasks into the queue and let them run    pool.queueTask(sortTask, (1000, 100000), taskCallback)    pool.queueTask(waitTask, 5, taskCallback)    pool.queueTask(sortTask, (200, 200000), taskCallback)    pool.queueTask(waitTask, 2, taskCallback)    pool.queueTask(sortTask, (3, 30000), taskCallback)    pool.queueTask(waitTask, 7, taskCallback)    # When all tasks are finished, allow the threads to terminate    pool.joinAll()

评论关闭