IMP logo
IMP Reference Guide  develop.78018a392b,2024/05/07
The Integrative Modeling Platform
worker_handler.py
1 from __future__ import print_function
2 import sys
3 import traceback
4 import socket
5 import threading
6 from IMP.parallel import NetworkError
7 from IMP.parallel.manager_communicator import ManagerCommunicator
8 from IMP.parallel.util import _TaskWrapper, _ContextWrapper
9 from IMP.parallel.util import _ErrorWrapper, _HeartBeat, _WorkerAction
10 
11 
12 class _HeartBeatThread(threading.Thread):
13 
14  """Periodically send a 'heartbeat' back to the manager, so that it can
15  distinguish between failed nodes and long calculations"""
16  timeout = 300
17 
18  def __init__(self, manager):
19  threading.Thread.__init__(self)
20  self._manager = manager
21  self._event = threading.Event()
22 
23  def cancel(self):
24  """Stop the heartbeat"""
25  self._event.set()
26 
27  def run(self):
28  while True:
29  self._event.wait(self.timeout)
30  if self._event.isSet():
31  break
32  else:
33  self._manager._send(_HeartBeat())
34 
35 
class WorkerHandler(object):
    """Worker-side event loop.

    Connects back to the manager, then executes the context, task and action
    messages it receives until the connection is lost.
    """

    def __init__(self, manager_addr):
        """@param manager_addr 3-element sequence formatted as
               (host, port, ID); the port is printed with %d so must be
               an integer."""
        self._manager_addr = manager_addr

    def run(self):
        """Connect to the manager and serve its requests.

        A heartbeat thread runs for the duration of the loop and is always
        cancelled on exit, whether the loop ends normally or by exception.
        """
        addr_fields = tuple(self._manager_addr)
        print("Connect back to manager at %s:%d with ID %s" % addr_fields)
        manager = ManagerCommunicator(self._manager_addr, threading.Lock())
        heartbeat = _HeartBeatThread(manager)
        heartbeat.start()
        try:
            self._handle_network_io(manager)
        finally:
            heartbeat.cancel()

    def _send_exception_to_manager(self, manager, exc):
        """Best-effort report of `exc` (plus formatted traceback) to the
        manager; must be called while that exception is being handled."""
        try:
            formatted_tb = traceback.format_exception(*sys.exc_info())
            manager._send(_ErrorWrapper(exc, formatted_tb))
        except socket.error:
            # The link may already be broken; reporting is best-effort.
            pass

    def _handle_network_io(self, manager):
        """Receive and dispatch manager messages until a NetworkError.

        - _ContextWrapper: replaces the current context arguments with the
          result of calling `.obj` (or an empty tuple if `.obj` is None).
        - _TaskWrapper: calls `.obj` with the context arguments and sends
          the result back.
        - _WorkerAction: calls `.execute()`.
        Other message types are ignored. Non-network exceptions are
        reported to the manager and then re-raised.
        """
        context_args = ()
        while True:
            try:
                message = manager._recv()
            except NetworkError:
                return
            try:
                if isinstance(message, _ContextWrapper):
                    factory = message.obj
                    context_args = () if factory is None else factory()
                elif isinstance(message, _TaskWrapper):
                    manager._send(message.obj(*context_args))
                elif isinstance(message, _WorkerAction):
                    message.execute()
            except NetworkError:
                raise
            except Exception as err:
                self._send_exception_to_manager(manager, err)
                raise
81  except Exception as detail:
82  # Send all other exceptions back to the manager and reraise
83  self._send_exception_to_manager(manager, detail)
84  raise
85 
86 
87 def main():
88  h = WorkerHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
89  h.run()
90 
91 
92 if __name__ == '__main__':
93  main()
Classes for communicating from the manager to workers.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
Distribute IMP tasks to multiple processors or machines.