python事件信号调度,python事件调度,#idle_queue.
文章由Byrx.net分享于2019-03-23 10:03:17
python事件信号调度,python事件调度,#idle_queue.
#idle_queue.pyimport Queue#Global queue, import it from anywhere, you get the same object instance.idle_loop = Queue.Queue()def idle_add(func, *args, **kwargs): def idle(): func(*args, **kwargs) return False idle_loop.put(idle)#weak_ref.pyimport weakrefclass _BoundMethodWeakref: def __init__(self, func): self.func_name = func.__name__ self.wref = weakref.ref(func.__self__) #__self__ returns the class def __call__(self): func_cls = self.wref() if func_cls is None: #lost reference return None else: func = getattr(func_cls, self.func_name) return funcdef weak_ref(callback): if hasattr(callback, '__self__') and callback.__self__ is not None: #is a bound method? return _BoundMethodWeakref(callback) else: return weakref.ref(callback)#event.pyimport threading#import logging#logger = logging.getLogger(__name__)import idle_queuefrom weak_ref import weak_refclass Event: def __init__(self, name): self.name = name self.callbacks = [] self.lock = threading.Lock() def connect(self, callback): with self.lock: callback = weak_ref(callback) self.callbacks.append(callback) def disconnect(self, callback): with self.lock: for index, weakref_callback in enumerate(self.callbacks): if callback == weakref_callback(): del self.callbacks[index] break def emit(self, *args, **kwargs): with self.lock: #logger.debug("Event emitted: {}".format(self.name)) for weakref_callback in self.callbacks[:]: callback = weakref_callback() if callback is not None: idle_queue.idle_add(callback, *args, **kwargs) else: #lost reference self.callbacks.remove(weakref_callback) #if not self.callbacks: #logger.debug("No signals assosiated to: {}".format(self.name))#events.pyimport Eventclass _Events: #add some signals, example: #do_something = Event('do something') #args: my_arg1, my_arg_list2, my_arg_str3 quit_app = Event('quit app') #args: some_argevents = _Events()if __name__ == "__main__": def quit(some_arg): print some_arg sys.exit(0) events.quit_app.connect(quit) #connect the callback/slot #events.quit_app.disconnect(quit) #disconnect something = "goodbye" events.quit_app.emit(something) #emit the signal #this should go in your main thread loop if your are using a gui. #example: http://code.activestate.com/recipes/578299-pyqt-pyside-thread-safe-global-queue-main-loop-int/ callback = idle_loop.get() callback() #dispatch the event
评论关闭