IMP logo
IMP Reference Guide  2.12.0
The Integrative Modeling Platform
slave_handler.py
1 from __future__ import print_function
2 import sys
3 import traceback
4 import socket
5 import threading
6 from IMP.parallel import NetworkError
7 from IMP.parallel.master_communicator import MasterCommunicator
8 from IMP.parallel.util import _TaskWrapper, _ContextWrapper
9 from IMP.parallel.util import _ErrorWrapper, _HeartBeat, _SlaveAction
10 
11 
12 class _HeartBeatThread(threading.Thread):
13 
14  """Periodically send a 'heartbeat' back to the master, so that it can
15  distinguish between failed nodes and long calculations"""
16  timeout = 300
17 
18  def __init__(self, master):
19  threading.Thread.__init__(self)
20  self._master = master
21  self._event = threading.Event()
22 
23  def cancel(self):
24  """Stop the heartbeat"""
25  self._event.set()
26 
27  def run(self):
28  while True:
29  self._event.wait(self.timeout)
30  if self._event.isSet():
31  break
32  else:
33  self._master._send(_HeartBeat())
34 
35 
class SlaveHandler(object):
    """Connect back to a master process and service the objects it sends.

       The handler receives objects from the master in a loop and acts on
       them according to their wrapper type, while a background
       _HeartBeatThread periodically sends heartbeats to the master.
    """

    def __init__(self, master_addr):
        """Store the master's address.

           @param master_addr Sequence of three items (host, port, ID); it
                              is formatted as "%s:%d with ID %s" and handed
                              unchanged to MasterCommunicator.
        """
        self._master_addr = master_addr

    def run(self):
        """Connect to the master and process its requests until the
           connection ends or a request fails.

           The heartbeat thread is always cancelled on the way out, even
           if request handling raises.
        """
        address_fields = tuple(self._master_addr)
        print("Connect back to master at %s:%d with ID %s" % address_fields)
        send_lock = threading.Lock()
        master = MasterCommunicator(self._master_addr, send_lock)
        heartbeat = _HeartBeatThread(master)
        heartbeat.start()
        try:
            self._handle_network_io(master)
        finally:
            heartbeat.cancel()

    def _send_exception_to_master(self, master, exc):
        """Report `exc`, plus the formatted traceback of the exception
           currently being handled, to the master.

           Must be called from inside an `except` block, since the
           traceback is taken from sys.exc_info().
        """
        try:
            formatted_tb = traceback.format_exception(*sys.exc_info())
            master._send(_ErrorWrapper(exc, formatted_tb))
        except socket.error:
            # ignore errors encountered while trying to send error to master
            pass

    def _handle_network_io(self, master):
        """Receive objects from the master and act on each in turn.

           - _ContextWrapper: its `obj` is either None (reset the setup
             arguments to empty) or a callable whose return value becomes
             the positional arguments for subsequent tasks.
           - _TaskWrapper: its `obj` is called with the current setup
             arguments and the result is sent back to the master.
           - _SlaveAction: its execute() method is called.
           Objects of any other type are ignored.

           Returns when receiving raises NetworkError. Any other failure
           while handling an object is reported to the master and then
           re-raised; a NetworkError during handling is re-raised without
           being reported.
        """
        setup_args = ()
        while True:
            try:
                message = master._recv()
            except NetworkError:
                return
            try:
                if isinstance(message, _ContextWrapper):
                    setup_args = (() if message.obj is None
                                  else message.obj())
                elif isinstance(message, _TaskWrapper):
                    master._send(message.obj(*setup_args))
                elif isinstance(message, _SlaveAction):
                    message.execute()
            except Exception as detail:
                # Network failures cannot usefully be reported over the
                # same connection; send everything else back to the master
                if not isinstance(detail, NetworkError):
                    self._send_exception_to_master(master, detail)
                raise
85 
86 
def main():
    """Run a SlaveHandler using the last three command line arguments:
       the master's host, its port (converted to int), and this slave's ID.
    """
    host = sys.argv[-3]
    port = int(sys.argv[-2])
    slave_id = sys.argv[-1]
    handler = SlaveHandler([host, port, slave_id])
    handler.run()


# Only start the handler when executed as a script, not when imported
if __name__ == '__main__':
    main()
Classes for communicating from the master to slaves.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
Distribute IMP tasks to multiple processors or machines.