IMP  2.2.1
The Integrative Modeling Platform
slave_handler.py
1 import sys
2 import traceback
3 import socket
4 import threading
5 from IMP.parallel import NetworkError
6 from IMP.parallel.master_communicator import MasterCommunicator
7 from IMP.parallel.util import _TaskWrapper, _ContextWrapper
8 from IMP.parallel.util import _ErrorWrapper, _HeartBeat, _SlaveAction
9 
10 
class _HeartBeatThread(threading.Thread):
    """Background thread that pings the master at a fixed interval.

    Every `timeout` seconds a `_HeartBeat` object is sent through the
    master communicator, so that the master can tell a slave that is busy
    with a long calculation apart from one that has failed.
    """

    # Seconds to wait between two consecutive heartbeats.
    timeout = 300

    def __init__(self, master):
        """Keep a reference to the communicator and create the stop flag.

        `master` must provide a `_send()` method; it is called from this
        thread each time a heartbeat is due.
        """
        super(_HeartBeatThread, self).__init__()
        self._master = master
        # Set by cancel(); checked by run() after every wait.
        self._event = threading.Event()

    def cancel(self):
        """Ask the thread to stop; run() returns the next time it wakes."""
        self._event.set()

    def run(self):
        """Send heartbeats until cancel() is called."""
        stop_requested = self._event
        while True:
            # Sleeps for at most `timeout` seconds, waking early if the
            # stop flag gets set in the meantime.
            stop_requested.wait(self.timeout)
            if stop_requested.is_set():
                return
            self._master._send(_HeartBeat())
33 
34 
class SlaveHandler(object):
    """Slave-side driver: connect back to the master and serve its requests.

    A handler is created with the master's address, and run() then
    processes objects received from the master until the connection
    fails or a task raises.
    """

    def __init__(self, master_addr):
        """Store the address of the master.

        `master_addr` is a three-element sequence. run() formats it with
        "%s:%d with ID %s" and passes it unchanged to MasterCommunicator.
        """
        self._master_addr = master_addr

    def run(self):
        """Connect to the master and process its requests.

        A heartbeat thread is started for the lifetime of the connection
        and is always cancelled on the way out, including when the
        network loop raises.
        """
        # A single parenthesised argument keeps this line valid (and its
        # output identical) under both Python 2 and Python 3.
        print("Connect back to master at %s:%d with ID %s"
              % tuple(self._master_addr))
        # NOTE(review): the lock is handed to the communicator, presumably
        # to serialise sends from this thread and the heartbeat thread --
        # confirm against MasterCommunicator.
        lock = threading.Lock()
        master = MasterCommunicator(self._master_addr, lock)
        hb = _HeartBeatThread(master)
        hb.start()
        try:
            self._handle_network_io(master)
        finally:
            hb.cancel()

    def _send_exception_to_master(self, master, exc):
        """Report `exc` and its formatted traceback to the master.

        Must be called from inside an `except` block, because the
        traceback is read from sys.exc_info(). A socket.error raised
        while sending is deliberately ignored: the caller is already
        handling a failure and re-raises the original exception.
        """
        try:
            exc_type, exc_value, tb = sys.exc_info()
            master._send(_ErrorWrapper(
                exc, traceback.format_exception(exc_type, exc_value, tb)))
        except socket.error:
            # ignore errors encountered while trying to send error to master
            pass

    def _handle_network_io(self, master):
        """Receive objects from the master and act on each in turn.

        - _ContextWrapper: call the wrapped object (if any) and keep its
          result as the positional arguments for later tasks; a wrapped
          None resets those arguments to ().
        - _TaskWrapper: call the wrapped object with the current setup
          arguments and send the result back to the master.
        - _SlaveAction: call its execute() method.
        Objects of any other type are ignored.

        The loop ends quietly when receiving raises NetworkError. A
        NetworkError raised while processing an object propagates as is.
        Any other exception is first reported to the master and then
        re-raised.
        """
        setup_args = ()
        while True:
            try:
                obj = master._recv()
            except NetworkError:
                break
            try:
                if isinstance(obj, _ContextWrapper):
                    if obj.obj is None:
                        setup_args = ()
                    else:
                        setup_args = obj.obj()
                elif isinstance(obj, _TaskWrapper):
                    master._send(obj.obj(*setup_args))
                elif isinstance(obj, _SlaveAction):
                    obj.execute()
            except NetworkError:
                # The link itself is broken, so reporting back is pointless.
                raise
            except Exception as detail:
                # Send all other exceptions back to the master and reraise
                self._send_exception_to_master(master, detail)
                raise
84 
85 
def main():
    """Script entry point: build a SlaveHandler from the command line.

    The last three command-line arguments form the master address; the
    middle one is converted to an integer before being passed on.
    """
    argv = sys.argv
    master_addr = [argv[-3], int(argv[-2]), argv[-1]]
    SlaveHandler(master_addr).run()


# Only start the handler when executed as a script, not on import.
if __name__ == '__main__':
    main()
Classes for communicating from the master to slaves.
Utilities for the IMP.parallel module.
Definition: util.py:1
See IMP.parallel for more information.