IMP logo
IMP Reference Guide  develop.330bebda01,2025/01/21
The Integrative Modeling Platform
parallel/util.py
1 """@namespace IMP.parallel.util Utilities for the IMP.parallel module."""
2 
3 import socket
4 import select
5 import sys
6 import random
7 
8 
9 class _ListenSocket(socket.socket):
10 
11  def __init__(self, host, timeout):
12  socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
13  self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
14  self.port = self._bind_to_random_port(host, timeout)
15  self.settimeout(timeout)
16  self.listen(15)
17 
18  def _bind_to_random_port(self, host, timeout):
19  """Bind to a random high-numbered port"""
20  tries = 0
21  while True:
22  port = random.randint(10000, 60000)
23  try:
24  self.bind((host, port))
25  # gaierror is a subclass of error, so catch it separately
26  except socket.gaierror:
27  raise
28  except socket.error:
29  tries += 1
30  if tries > 10:
31  raise
32  else:
33  break
34  return port
35 
36 
37 class _ContextWrapper:
38 
39  def __init__(self, obj):
40  self.obj = obj
41 
42 
43 class _TaskWrapper:
44 
45  def __init__(self, obj):
46  self.obj = obj
47 
48 
49 class _ErrorWrapper:
50 
51  def __init__(self, obj, traceback):
52  self.obj = obj
53  self.traceback = traceback
54 
55 
56 class _HeartBeat:
57  pass
58 
59 
60 class _WorkerAction:
61  pass
62 
63 
64 class _SetPathAction(_WorkerAction):
65 
66  def __init__(self, path):
67  self.path = path
68 
69  def execute(self):
70  sys.path.insert(0, self.path)
71 
72 
73 if hasattr(select, 'poll'):
74  def _poll_events(listen_sock, workers, timeout):
75  fileno = listen_sock.fileno()
76  workermap = {fileno: listen_sock}
77 
78  p = select.poll()
79  p.register(fileno, select.POLLIN)
80  for worker in workers:
81  fileno = worker._socket.fileno()
82  workermap[fileno] = worker
83  p.register(fileno, select.POLLIN)
84  ready = p.poll(timeout * 1000)
85  return [workermap[fd[0]] for fd in ready]
86 
87 else:
88  # Use select on systems that don't have poll()
89  def _poll_events(listen_sock, workers, timeout):
90  fileno = listen_sock.fileno()
91  workermap = {fileno: listen_sock}
92  waitin = [fileno]
93 
94  for worker in workers:
95  fileno = worker._socket.fileno()
96  workermap[fileno] = worker
97  waitin.append(fileno)
98  (ready, rout, rerr) = select.select(waitin, [], [], timeout)
99  return [workermap[fd] for fd in ready]