# Background thread used on the slave side; its purpose is given by the class
# docstring below.
# NOTE(review): the number at the start of each line is the original file's
# line number. Several of those numbers are absent from this listing, so parts
# of the class (e.g. where `timeout` and `self._master` are defined, and the
# `def` lines of the later methods) are not visible here.
11 class _HeartBeatThread(threading.Thread):
13 """Periodically send a 'heartbeat' back to the master, so that it can
14 distinguish between failed nodes and long calculations"""
# Constructor: initialise the threading.Thread base class and create the
# Event that the wait()/isSet() calls further down operate on.
# `master` is presumably stored as self._master on a line not shown (the send
# call below uses self._master) -- verify.
17 def __init__(self, master):
18 threading.Thread.__init__(self)
20 self._event = threading.Event()
# Docstring of the method that stops the heartbeat. Its `def` line and body
# are not part of this listing; presumably it sets self._event -- verify.
23 """Stop the heartbeat"""
# Block until the event is set or self.timeout elapses (threading.Event.wait
# takes its timeout in seconds). The value of `timeout` is defined elsewhere.
28 self._event.wait(self.timeout)
# True once the event has been set. The statement executed in that case is
# not visible here; presumably it ends the heartbeat loop -- verify.
29 if self._event.isSet():
# Send a new _HeartBeat object to the master via its _send method.
# Presumably only reached when the event was not set -- verify.
32 self._master._send(_HeartBeat())
# Slave-side handler: connects back to a master, starts a heartbeat helper and
# processes objects coming from the master.
# NOTE(review): the number at the start of each line is the original file's
# line number. Several of those numbers are absent from this listing, so some
# statements (method headers, loop/try lines, branch bodies) are not visible.
35 class SlaveHandler(object):
# Store the master's address for later use. The print below formats it with
# "%s:%d with ID %s", so it must hold exactly three items.
37 def __init__(self, master_addr):
38 self._master_addr = master_addr
# The following statements belong to a method whose `def` line is not shown.
# Python 2 print statement announcing the connection back to the master.
41 print "Connect back to master at %s:%d with ID %s" \
42 % tuple(self._master_addr)
# Create a lock and hand it, together with the address, to MasterCommunicator.
43 lock = threading.Lock()
44 master = MasterCommunicator(self._master_addr, lock)
# Create the heartbeat thread for this master connection. Where it is started
# and cancelled is not visible in this listing.
45 hb = _HeartBeatThread(master)
# Process objects from the master (see _handle_network_io below).
48 self._handle_network_io(master)
# Report an exception to the master.
#   master: object providing the _send method used below.
#   exc: the exception object to wrap.
# sys.exc_info() describes the exception currently being handled, so this is
# meant to be called from inside an `except` block (as the caller below does).
52 def _send_exception_to_master(self, master, exc):
54 exc_type, exc_value, tb = sys.exc_info()
# Send an _ErrorWrapper holding the exception and its formatted traceback
# (traceback.format_exception returns a list of strings).
55 master._send(_ErrorWrapper(exc,
56 traceback.format_exception(exc_type, exc_value, tb)))
# Dispatch on the type of `obj`. How `obj` is obtained is not visible here;
# presumably it is received from `master` inside a loop -- verify.
61 def _handle_network_io(self, master):
# _ContextWrapper: call the wrapped object and keep its result as setup_args.
69 if isinstance(obj, _ContextWrapper):
73 setup_args = obj.obj()
# _TaskWrapper: call the wrapped object with setup_args unpacked as positional
# arguments and send the result back to the master.
74 elif isinstance(obj, _TaskWrapper):
75 master._send(obj.obj(*setup_args))
# _SlaveAction: the handling for this case is not visible in this listing.
76 elif isinstance(obj, _SlaveAction):
# Any Exception raised by the dispatch above is forwarded to the master.
# (This `except ... as detail:` clause is split across two listing lines.)
80 except Exception
as detail:
82 self._send_exception_to_master(master, detail)
# Build a SlaveHandler from the last three command-line arguments, passed as
# its master_addr list; the middle one is converted to int.
# NOTE(review): judging by the "%s:%d with ID %s" message printed by
# SlaveHandler these are presumably host, port and ID -- verify.
# The enclosing `def` line and the call that runs the handler are not visible
# in this listing.
87 h = SlaveHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
# Script entry guard (split across two listing lines); the guarded body is not
# part of this listing.
90 if __name__ ==
'__main__':
Classes for communicating from the master to slaves.
Utilities for the IMP.parallel module.
Distribute IMP tasks to multiple processors or machines.