1 from __future__
import print_function
12 class _HeartBeatThread(threading.Thread):
14 """Periodically send a 'heartbeat' back to the master, so that it can
15 distinguish between failed nodes and long calculations"""
18 def __init__(self, master):
19 threading.Thread.__init__(self)
21 self._event = threading.Event()
24 """Stop the heartbeat"""
29 self._event.wait(self.timeout)
30 if self._event.isSet():
33 self._master._send(_HeartBeat())
36 class SlaveHandler(object):
38 def __init__(self, master_addr):
39 self._master_addr = master_addr
42 print(
"Connect back to master at %s:%d with ID %s" \
43 % tuple(self._master_addr))
44 lock = threading.Lock()
45 master = MasterCommunicator(self._master_addr, lock)
46 hb = _HeartBeatThread(master)
49 self._handle_network_io(master)
53 def _send_exception_to_master(self, master, exc):
55 exc_type, exc_value, tb = sys.exc_info()
56 master._send(_ErrorWrapper(exc,
57 traceback.format_exception(exc_type, exc_value, tb)))
62 def _handle_network_io(self, master):
70 if isinstance(obj, _ContextWrapper):
74 setup_args = obj.obj()
75 elif isinstance(obj, _TaskWrapper):
76 master._send(obj.obj(*setup_args))
77 elif isinstance(obj, _SlaveAction):
81 except Exception
as detail:
83 self._send_exception_to_master(master, detail)
88 h = SlaveHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
91 if __name__ ==
'__main__':
Classes for communicating from the master to slaves.
Utilities for the IMP.parallel module.
Distribute IMP tasks to multiple processors or machines.