IMP logo
IMP Reference Guide  develop.63b38c487d,2024/12/22
The Integrative Modeling Platform
worker_handler.py
1 import sys
2 import traceback
3 import socket
4 import threading
5 from IMP.parallel import NetworkError
6 from IMP.parallel.manager_communicator import ManagerCommunicator
7 from IMP.parallel.util import _TaskWrapper, _ContextWrapper
8 from IMP.parallel.util import _ErrorWrapper, _HeartBeat, _WorkerAction
9 
10 
11 class _HeartBeatThread(threading.Thread):
12 
13  """Periodically send a 'heartbeat' back to the manager, so that it can
14  distinguish between failed nodes and long calculations"""
15  timeout = 300
16 
17  def __init__(self, manager):
18  threading.Thread.__init__(self)
19  self._manager = manager
20  self._event = threading.Event()
21 
22  def cancel(self):
23  """Stop the heartbeat"""
24  self._event.set()
25 
26  def run(self):
27  while True:
28  self._event.wait(self.timeout)
29  if self._event.isSet():
30  break
31  else:
32  self._manager._send(_HeartBeat())
33 
34 
35 class WorkerHandler:
36 
37  def __init__(self, manager_addr):
38  self._manager_addr = manager_addr
39 
40  def run(self):
41  print("Connect back to manager at %s:%d with ID %s"
42  % tuple(self._manager_addr))
43  lock = threading.Lock()
44  manager = ManagerCommunicator(self._manager_addr, lock)
45  hb = _HeartBeatThread(manager)
46  hb.start()
47  try:
48  self._handle_network_io(manager)
49  finally:
50  hb.cancel()
51 
52  def _send_exception_to_manager(self, manager, exc):
53  try:
54  exc_type, exc_value, tb = sys.exc_info()
55  manager._send(_ErrorWrapper(
56  exc, traceback.format_exception(exc_type, exc_value, tb)))
57  except socket.error:
58  # ignore errors encountered while trying to send error to manager
59  pass
60 
61  def _handle_network_io(self, manager):
62  setup_args = ()
63  while True:
64  try:
65  obj = manager._recv()
66  except NetworkError:
67  break
68  try:
69  if isinstance(obj, _ContextWrapper):
70  if obj.obj is None:
71  setup_args = ()
72  else:
73  setup_args = obj.obj()
74  elif isinstance(obj, _TaskWrapper):
75  manager._send(obj.obj(*setup_args))
76  elif isinstance(obj, _WorkerAction):
77  obj.execute()
78  except NetworkError:
79  raise
80  except Exception as detail:
81  # Send all other exceptions back to the manager and reraise
82  self._send_exception_to_manager(manager, detail)
83  raise
84 
85 
86 def main():
87  h = WorkerHandler([sys.argv[-3], int(sys.argv[-2]), sys.argv[-1]])
88  h.run()
89 
90 
91 if __name__ == '__main__':
92  main()
Classes for communicating from the manager to workers.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
Distribute IMP tasks to multiple processors or machines.