10 class _HeartBeatThread(threading.Thread):
11 """Periodically send a 'heartbeat' back to the master, so that it can
12 distinguish between failed nodes and long calculations"""
15 def __init__(self, master):
16 threading.Thread.__init__(self)
18 self._event = threading.Event()
21 """Stop the heartbeat"""
26 self._event.wait(self.timeout)
27 if self._event.isSet():
30 self._master._send(_HeartBeat())
33 class SlaveHandler(object):
34 def __init__(self, master_addr):
35 self._master_addr = master_addr
38 print "Connect back to master at %s:%d with ID %s" \
39 % tuple(self._master_addr)
40 lock = threading.Lock()
41 master = MasterCommunicator(self._master_addr, lock)
42 hb = _HeartBeatThread(master)
45 self._handle_network_io(master)
49 def _send_exception_to_master(self, master, exc):
51 exc_type, exc_value, tb = sys.exc_info()
52 master._send(_ErrorWrapper(exc,
53 traceback.format_exception(exc_type, exc_value, tb)))
58 def _handle_network_io(self, master):
66 if isinstance(obj, _ContextWrapper):
70 setup_args = obj.obj()
71 elif isinstance(obj, _TaskWrapper):
72 master._send(obj.obj(*setup_args))
73 elif isinstance(obj, _SlaveAction):
77 except Exception, detail:
79 self._send_exception_to_master(master, detail)
84 h = SlaveHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
87 if __name__ ==
'__main__':