# NOTE(review): this extraction is missing several lines of this class
# (original numbering jumps 14->17, 20->23, 23->28 and 29->32).
# Among them: wherever `self.timeout` (read below) is defined, the `def`
# lines of the stop method and of the method containing the wait
# (presumably `run`, since this is a threading.Thread subclass), and the
# loop/branch lines around the wait. Restore them from the original file
# before changing the code.
11 class _HeartBeatThread(threading.Thread):
13 """Periodically send a 'heartbeat' back to the manager, so that it can
14 distinguish between failed nodes and long calculations"""
# Keep the manager handle that heartbeats are sent through, plus an Event.
# The Event is checked after each wait, so setting it is presumably how the
# stop method ends the heartbeat (that method's body is not visible here).
17 def __init__(self, manager):
18 threading.Thread.__init__(self)
19 self._manager = manager
20 self._event = threading.Event()
23 """Stop the heartbeat"""
# Block for up to `self.timeout` seconds; Event.wait() returns early if the
# event gets set, so a stop request interrupts the sleep.
28 self._event.wait(self.timeout)
# NOTE(review): isSet() is a deprecated alias of Event.is_set() (deprecated
# since Python 3.10 and emits DeprecationWarning); prefer is_set().
29 if self._event.isSet():
# Presumably reached when the event was NOT set, i.e. the wait timed out
# (the branch lines between here and the check are missing). It tells the
# manager this node is still alive.
32 self._manager._send(_HeartBeat())
def __init__(self, manager_addr):
    """Record where this worker should connect back to the manager.

    manager_addr is kept as-is. It is later formatted as a 3-item sequence
    ("%s:%d with ID %s", i.e. host, port and worker ID) and handed
    unchanged to ManagerCommunicator.
    """
    self._manager_addr = manager_addr
# NOTE(review): fragment of a WorkerHandler method. Its `def` line (before
# original line 41) and original lines 46-47 (and whatever follows line 48)
# are missing from this extraction. In the visible lines `hb` is built but
# never started; presumably the missing lines start it and wrap the network
# loop in try/finally so the heartbeat is stopped. Confirm against the
# original file.
# Announce which manager address / worker ID we are connecting to.
41 print(
"Connect back to manager at %s:%d with ID %s"
42 % tuple(self._manager_addr))
# A lock is created and shared with the communicator. Presumably it
# serialises sends between this thread and the heartbeat thread (TODO confirm).
43 lock = threading.Lock()
44 manager = ManagerCommunicator(self._manager_addr, lock)
# Heartbeat thread that sends _HeartBeat messages through the same
# communicator.
45 hb = _HeartBeatThread(manager)
# Handle messages arriving from the manager.
48 self._handle_network_io(manager)
# Send the given exception to the manager, wrapped in an _ErrorWrapper
# together with the formatted traceback of the exception currently being
# handled.
# NOTE(review): original line 53 (between the def and its first statement)
# is missing from this extraction; probably a docstring — confirm.
52 def _send_exception_to_manager(self, manager, exc):
# sys.exc_info() describes the exception currently being handled. Call this
# from inside an `except` block, otherwise all three values are None and no
# real traceback is sent.
54 exc_type, exc_value, tb = sys.exc_info()
55 manager._send(_ErrorWrapper(
56 exc, traceback.format_exception(exc_type, exc_value, tb)))
# Dispatch each object received from the manager according to its type.
# NOTE(review): many lines of this method are missing from this extraction
# (original numbering jumps 61->69, 69->73, 76->80 and 80->82). The missing
# parts include where `obj` is received, the initial value of `setup_args`,
# the enclosing loop/try, and the body of the _WorkerAction branch.
61 def _handle_network_io(self, manager):
# Context object: its `obj` callable is invoked and the result kept as the
# positional arguments for later tasks. Lines 70-72 before this assignment
# are missing, so it may be conditional.
69 if isinstance(obj, _ContextWrapper):
73 setup_args = obj.obj()
# Task: call it with the current context arguments and send the result back.
74 elif isinstance(obj, _TaskWrapper):
75 manager._send(obj.obj(*setup_args))
# Worker action; how it is handled is not visible here.
76 elif isinstance(obj, _WorkerAction):
# Any Exception is reported to the manager instead of propagating,
# presumably so the worker keeps serving requests.
80 except Exception
as detail:
82 self._send_exception_to_manager(manager, detail)
# NOTE(review): fragment of the script entry point. The enclosing `def`
# line and the call that runs the handler are missing from this extraction.
# Build the handler from the last three command-line arguments: host, port
# (converted to int) and worker ID, matching the "%s:%d with ID %s" message.
87 h = WorkerHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
# Script guard. Its body (original line 92 onward) is missing from this
# extraction.
91 if __name__ ==
'__main__':
Classes for communicating from the manager to workers.
Utilities for the IMP.parallel module.
Distribute IMP tasks to multiple processors or machines.