IMP  2.0.1
The Integrative Modeling Platform
slave_handler.py
1 import sys
2 import traceback
3 import socket
4 import threading
5 from IMP.parallel import NetworkError
6 from IMP.parallel.master_communicator import MasterCommunicator
7 from IMP.parallel.util import _TaskWrapper, _ContextWrapper
8 from IMP.parallel.util import _ErrorWrapper, _HeartBeat, _SlaveAction
9 
10 class _HeartBeatThread(threading.Thread):
11  """Periodically send a 'heartbeat' back to the master, so that it can
12  distinguish between failed nodes and long calculations"""
13  timeout = 300
14 
15  def __init__(self, master):
16  threading.Thread.__init__(self)
17  self._master = master
18  self._event = threading.Event()
19 
20  def cancel(self):
21  """Stop the heartbeat"""
22  self._event.set()
23 
24  def run(self):
25  while True:
26  self._event.wait(self.timeout)
27  if self._event.isSet():
28  break
29  else:
30  self._master._send(_HeartBeat())
31 
32 
class SlaveHandler(object):
    """Slave-side handler: connect back to the master, then process the
       objects it sends until the connection is lost."""

    def __init__(self, master_addr):
        """Store the master address for later use by run().

        `master_addr` is a three-element sequence of (host, port, ID);
        it is formatted with '%s:%d with ID %s' and handed unchanged to
        MasterCommunicator.
        """
        self._master_addr = master_addr

    def run(self):
        """Connect to the master and handle its requests.

        A heartbeat thread runs for the duration of the request loop and
        is always cancelled on exit, even if the loop raises.
        """
        # Single-argument print(...) behaves the same on Python 2 and 3
        print("Connect back to master at %s:%d with ID %s"
              % tuple(self._master_addr))
        lock = threading.Lock()
        master = MasterCommunicator(self._master_addr, lock)
        hb = _HeartBeatThread(master)
        hb.start()
        try:
            self._handle_network_io(master)
        finally:
            hb.cancel()

    def _send_exception_to_master(self, master, exc):
        """Send `exc`, plus the formatted traceback of the exception
           currently being handled, to the master.

        Must be called from within an `except` block so that
        sys.exc_info() refers to `exc`. Socket errors raised while
        sending are deliberately ignored (best effort).
        """
        try:
            # Pass exc_info() straight through rather than binding the
            # traceback to a local, which would create a reference cycle
            # with this frame.
            master._send(_ErrorWrapper(
                exc, traceback.format_exception(*sys.exc_info())))
        except socket.error:
            # ignore errors encountered while trying to send error to master
            pass

    def _handle_network_io(self, master):
        """Receive objects from the master and act on them until a
           NetworkError is raised while receiving.

        - _ContextWrapper: call its `obj` (if not None) and keep the
          result as the positional arguments for subsequent tasks.
        - _TaskWrapper: call its `obj` with those arguments and send the
          return value back to the master.
        - _SlaveAction: call its execute() method.

        Objects of any other type are ignored. A NetworkError raised
        while handling an object propagates unchanged; any other
        exception is reported to the master and then reraised.
        """
        setup_args = ()
        while True:
            try:
                obj = master._recv()
            except NetworkError:
                # Receiving failed; stop processing requests
                break
            try:
                if isinstance(obj, _ContextWrapper):
                    if obj.obj is None:
                        setup_args = ()
                    else:
                        setup_args = obj.obj()
                elif isinstance(obj, _TaskWrapper):
                    master._send(obj.obj(*setup_args))
                elif isinstance(obj, _SlaveAction):
                    obj.execute()
            except NetworkError:
                # The link itself is broken, so reporting the error to
                # the master is not attempted
                raise
            except Exception as detail:
                # Send all other exceptions back to the master and reraise
                self._send_exception_to_master(master, detail)
                raise
81 
82 
def main():
    """Start a slave using the master's host, port and slave ID, taken
       from the last three command line arguments."""
    argv = sys.argv
    # Same evaluation order as a single list literal: host, port, then ID
    master_addr = [argv[-3], int(argv[-2]), argv[-1]]
    handler = SlaveHandler(master_addr)
    handler.run()
86 
# Only start the slave when this file is run as a script, not on import
if __name__ == "__main__":
    main()