1 from __future__
import print_function
# Thread subclass that sends _HeartBeat messages to the manager.
# NOTE(review): this listing is sparse (the embedded numbers skip), so the
# method headers for the statements numbered 24 and 29-33 are not visible.
12 class _HeartBeatThread(threading.Thread):
14 """Periodically send a 'heartbeat' back to the manager, so that it can
15 distinguish between failed nodes and long calculations"""
# manager: object whose _send() is called below to deliver each heartbeat.
18 def __init__(self, manager):
19 threading.Thread.__init__(self)
20 self._manager = manager
# Event that the statements below wait on and test; presumably set by the
# stop method documented at 24 -- TODO confirm, the setter is not shown.
21 self._event = threading.Event()
24 """Stop the heartbeat"""
# Block until the event is set or self.timeout elapses, whichever is first.
# NOTE(review): `timeout` is not defined in the visible lines -- confirm
# where it is set.
29 self._event.wait(self.timeout)
30 if self._event.isSet():
# Statements 31-32 are not visible; presumably the send below runs only when
# the event was NOT set (i.e. the wait timed out) -- TODO confirm.
33 self._manager._send(_HeartBeat())
# Worker-side handler: connects back to the manager, processes the objects it
# receives, and reports exceptions to the manager.
# NOTE(review): this listing is sparse (the embedded numbers skip); several
# method headers and loop/try statements are not visible here.
36 class WorkerHandler(object):
# manager_addr: sequence that is later formatted with "%s:%d ... %s", so it
# must hold three items; presumably (host, port, worker ID) -- TODO confirm.
38 def __init__(self, manager_addr):
39 self._manager_addr = manager_addr
# Announce the connection on stdout before opening it.
42 print(
"Connect back to manager at %s:%d with ID %s"
43 % tuple(self._manager_addr))
# The lock is handed to ManagerCommunicator; how it is used is not visible
# here. The same communicator object is passed to the heartbeat thread and to
# _handle_network_io.
44 lock = threading.Lock()
45 manager = ManagerCommunicator(self._manager_addr, lock)
46 hb = _HeartBeatThread(manager)
# NOTE(review): the statements that start/stop `hb` (47-48, 50-51) are not
# shown in this listing.
49 self._handle_network_io(manager)
# Wrap `exc` together with the formatted traceback of the exception currently
# being handled (taken from sys.exc_info()) and send it to the manager.
53 def _send_exception_to_manager(self, manager, exc):
55 exc_type, exc_value, tb = sys.exc_info()
56 manager._send(_ErrorWrapper(
57 exc, traceback.format_exception(exc_type, exc_value, tb)))
# Dispatch on the type of a received object `obj`.
# NOTE(review): the code that obtains `obj` (63-69) is not visible here.
62 def _handle_network_io(self, manager):
# _ContextWrapper: call the wrapped object and keep its result as setup_args.
70 if isinstance(obj, _ContextWrapper):
74 setup_args = obj.obj()
# _TaskWrapper: call the wrapped object with the saved setup_args unpacked as
# positional arguments and send the result back to the manager.
75 elif isinstance(obj, _TaskWrapper):
76 manager._send(obj.obj(*setup_args))
# _WorkerAction: handling (78-80) is not visible in this listing.
77 elif isinstance(obj, _WorkerAction):
# Any Exception raised in the (not fully visible) try body is forwarded to the
# manager via _send_exception_to_manager.
81 except Exception
as detail:
83 self._send_exception_to_manager(manager, detail)
# Build the handler from the last three command-line arguments: argv[-3] and
# argv[-1] are passed as strings, argv[-2] is converted to int.
# Presumably these are the manager host, port and worker ID -- TODO confirm.
# NOTE(review): the enclosing function header and the call that runs `h` are
# not visible in this listing.
88 h = WorkerHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
# Script entry guard; the guarded statement (93) is not shown here.
92 if __name__ ==
'__main__':
Classes for communicating from the manager to workers.
Utilities for the IMP.parallel module.
Distribute IMP tasks to multiple processors or machines.