1 """@namespace IMP.parallel.util Utilities for the IMP.parallel module."""
9 class _ListenSocket(socket.socket):
11 def __init__(self, host, timeout):
12 socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
13 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
14 self.port = self._bind_to_random_port(host, timeout)
15 self.settimeout(timeout)
18 def _bind_to_random_port(self, host, timeout):
19 """Bind to a random high-numbered port"""
22 port = random.randint(10000, 60000)
24 self.bind((host, port))
26 except socket.gaierror:
37 class _ContextWrapper(object):
39 def __init__(self, obj):
43 class _TaskWrapper(object):
45 def __init__(self, obj):
49 class _ErrorWrapper(object):
51 def __init__(self, obj, traceback):
53 self.traceback = traceback
56 class _HeartBeat(object):
60 class _WorkerAction(object):
64 class _SetPathAction(_WorkerAction):
66 def __init__(self, path):
70 sys.path.insert(0, self.path)
73 if hasattr(select,
'poll'):
74 def _poll_events(listen_sock, workers, timeout):
75 fileno = listen_sock.fileno()
76 workermap = {fileno: listen_sock}
79 p.register(fileno, select.POLLIN)
80 for worker
in workers:
81 fileno = worker._socket.fileno()
82 workermap[fileno] = worker
83 p.register(fileno, select.POLLIN)
84 ready = p.poll(timeout * 1000)
85 return [workermap[fd[0]]
for fd
in ready]
89 def _poll_events(listen_sock, workers, timeout):
90 fileno = listen_sock.fileno()
91 workermap = {fileno: listen_sock}
94 for worker
in workers:
95 fileno = worker._socket.fileno()
96 workermap[fileno] = worker
98 (ready, rout, rerr) = select.select(waitin, [], [], timeout)
99 return [workermap[fd]
for fd
in ready]