1 """@namespace IMP.parallel.util Utilities for the IMP.parallel module."""
# IPv4 TCP socket that, on construction, binds itself to a randomly chosen
# high-numbered port on `host` and records that port in `self.port`.
8 class _ListenSocket(socket.socket):
# host:    address passed to bind() together with the chosen port.
# timeout: forwarded to _bind_to_random_port() and then installed as the
#          socket timeout via settimeout().
9 def __init__(self, host, timeout):
# Initialise the base class as an AF_INET / SOCK_STREAM socket.
10 socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
# SO_REUSEADDR must be set before bind() for it to have any effect.
11 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Keep the port that was actually bound so it can be read back later.
12 self.port = self._bind_to_random_port(host, timeout)
13 self.settimeout(timeout)
# NOTE(review): no listen() call is visible in this listing -- confirm
# whether it is made on a line not shown here or by the caller.
16 def _bind_to_random_port(self, host, timeout):
17 """Bind to a random high-numbered port"""
# randint() is inclusive at both ends, so candidates are 10000..60000.
20 port = random.randint(10000, 60000)
22 self.bind((host, port))
# NOTE(review): the `try:` that pairs with this handler, the handler body,
# and the way `timeout` and the return value are used are not visible in
# this listing. socket.gaierror is the address-resolution error class, so
# this handler presumably treats a bad `host` separately from other bind
# failures -- confirm.
24 except socket.gaierror:
# Wrapper class constructed from a single object `obj`.
# NOTE(review): the constructor body is not visible in this listing;
# presumably it stores `obj` on the instance -- confirm.
34 class _ContextWrapper(object):
35 def __init__(self, obj):
# Wrapper class constructed from a single object `obj`; same constructor
# signature as _ContextWrapper.
# NOTE(review): the constructor body is not visible in this listing;
# presumably it stores `obj` on the instance -- confirm.
39 class _TaskWrapper(object):
40 def __init__(self, obj):
# Wrapper class that pairs an object `obj` with a `traceback`.
44 class _ErrorWrapper(object):
45 def __init__(self, obj, traceback):
# NOTE(review): only the `traceback` assignment is visible in this listing;
# presumably `obj` is stored on the instance as well -- confirm.
47 self.traceback = traceback
# NOTE(review): the class body is not visible in this listing. The name
# suggests a keep-alive marker object -- confirm against its users.
50 class _HeartBeat(object):
# Base class of _SetPathAction.
# NOTE(review): the class body is not visible in this listing; presumably
# this is the common base for actions run on a slave -- confirm.
54 class _SlaveAction(object):
# _SlaveAction subclass, constructed from a `path`, that puts that path at
# the front of the module search path.
58 class _SetPathAction(_SlaveAction):
59 def __init__(self, path):
# NOTE(review): the assignment of `self.path` and the header of the method
# that contains the next statement are not visible in this listing.
# Inserting at index 0 gives this path priority over every existing
# sys.path entry.
62 sys.path.insert(0, self.path)
# Variant of _poll_events used when the platform's select module provides
# poll().
65 if hasattr(select,
'poll'):
# Return the objects (listen_sock and/or slaves) whose sockets have become
# readable.
#   listen_sock: object with a fileno() method.
#   slaves:      objects that each expose a `_socket` with a fileno() method.
#   timeout:     multiplied by 1000 before being passed to poll(), which
#                takes milliseconds -- so this value is in seconds.
66 def _poll_events(listen_sock, slaves, timeout):
67 fileno = listen_sock.fileno()
# Map each file descriptor back to the object that owns it.
68 slavemap = { fileno: listen_sock }
# NOTE(review): the creation of `p` is not visible in this listing;
# presumably `p = select.poll()` -- confirm.
71 p.register(fileno, select.POLLIN)
# NOTE(review): the loop header that binds `slave` is not visible in this
# listing; presumably the next three lines run once per entry of `slaves`.
73 fileno = slave._socket.fileno()
74 slavemap[fileno] = slave
75 p.register(fileno, select.POLLIN)
76 ready = p.poll(timeout * 1000)
# poll() yields (fd, event) pairs; fd[0] is the descriptor used as the
# slavemap key.
77 return [slavemap[fd[0]]
for fd
in ready]
# select()-based variant of _poll_events, with the same signature and the
# same kind of return value as the poll()-based one.
# NOTE(review): the `else:` that pairs this definition with the
# `hasattr(select, 'poll')` check is not visible in this listing.
#   listen_sock: object with a fileno() method.
#   slaves:      objects that each expose a `_socket` with a fileno() method.
#   timeout:     passed to select.select() unchanged, i.e. in seconds.
81 def _poll_events(listen_sock, slaves, timeout):
82 fileno = listen_sock.fileno()
# Map each file descriptor back to the object that owns it.
83 slavemap = { fileno: listen_sock }
# NOTE(review): the creation of `waitin`, the loop header that binds
# `slave`, and the line that adds each slave's descriptor to `waitin` are
# not visible in this listing.
87 fileno = slave._socket.fileno()
88 slavemap[fileno] = slave
# Only readability is waited for; the write and error lists are empty, so
# `rout` and `rerr` carry nothing and are not used.
90 (ready,rout,rerr) = select.select(waitin, [], [], timeout)
# `ready` holds the readable descriptors; translate them back to objects.
91 return [slavemap[fd]
for fd
in ready]