# IPv4 stream (TCP) socket subclass that binds itself to a randomly chosen
# port while it is being constructed.
# NOTE(review): this listing elides several original lines (for example the
# `try:` that must precede the `except` below), so the comments here describe
# only the statements that are visible.
6 class _ListenSocket(socket.socket):
7 def __init__(self, host, timeout):
8 socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
# SO_REUSEADDR is enabled before any bind attempt is made.
9 self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Whatever _bind_to_random_port returns is stored as `self.port`
# (presumably the port number that was bound -- confirm against the full file).
10 self.port = self._bind_to_random_port(host, timeout)
# The same `timeout` value is also applied to the socket's own operations.
11 self.settimeout(timeout)
14 def _bind_to_random_port(self, host, timeout):
15 """Bind to a random high-numbered port"""
# Candidate port is drawn from 10000..60000 (random.randint includes both ends).
18 port = random.randint(10000, 60000)
20 self.bind((host, port))
# NOTE(review): the handler body for gaierror (and how `timeout` is used by
# this method) is not visible in this excerpt -- confirm in the full file.
22 except socket.gaierror:
# Wrapper class constructed around a single object `obj`.
# NOTE(review): the constructor body is not visible in this excerpt;
# presumably it just stores `obj` -- confirm against the full file.
32 class _ContextWrapper(object):
33 def __init__(self, obj):
# Wrapper class constructed around a single object `obj`.
# NOTE(review): the constructor body is not visible in this excerpt;
# presumably it just stores `obj` -- confirm against the full file.
37 class _TaskWrapper(object):
38 def __init__(self, obj):
# Wrapper class constructed from an object `obj` and a `traceback`.
# NOTE(review): how `obj` is stored is not visible in this excerpt -- confirm.
42 class _ErrorWrapper(object):
43 def __init__(self, obj, traceback):
# The traceback argument is kept on the instance unchanged.
45 self.traceback = traceback
# NOTE(review): only the class header is visible in this excerpt; presumably
# a marker type for heartbeat messages -- confirm against the full file.
48 class _HeartBeat(object):
# Base class for slave actions; _SetPathAction derives from it.
# NOTE(review): the class body is not visible in this excerpt.
52 class _SlaveAction(object):
# Slave action constructed with a `path`.
# NOTE(review): the constructor body and the header of the method containing
# the sys.path statement below are not visible in this excerpt -- confirm.
56 class _SetPathAction(_SlaveAction):
57 def __init__(self, path):
# Puts the stored path at the front of sys.path, ahead of all existing entries.
60 sys.path.insert(0, self.path)
if hasattr(select, 'poll'):
    def _poll_events(listen_sock, slaves, timeout):
        """Wait for input on the listen socket or on any slave's socket.

        This variant is used on platforms that provide select.poll().

        Args:
            listen_sock: object with a fileno() method (the listening socket).
            slaves: iterable of objects exposing a `_socket` attribute whose
                fileno() identifies the slave's connection.
            timeout: maximum time to wait, in seconds; None blocks until
                something becomes readable.

        Returns:
            A list containing `listen_sock` and/or the slave objects whose
            descriptors were reported ready; empty if the wait timed out.
        """
        sources = {listen_sock.fileno(): listen_sock}
        for slave in slaves:
            sources[slave._socket.fileno()] = slave

        poller = select.poll()
        for fileno in sources:
            poller.register(fileno, select.POLLIN)

        # poll() takes milliseconds.  None must be passed through untouched
        # (it means "block"), otherwise `None * 1000` raises TypeError and
        # this variant would disagree with the select()-based one below.
        timeout_ms = None if timeout is None else timeout * 1000

        # poll() yields (fd, eventmask) pairs; only the descriptor is needed.
        return [sources[fd] for fd, _event in poller.poll(timeout_ms)]
else:
    def _poll_events(listen_sock, slaves, timeout):
        """Wait for input on the listen socket or on any slave's socket.

        Fallback variant for platforms without select.poll(); it relies on
        select.select() instead.

        Args:
            listen_sock: object with a fileno() method (the listening socket).
            slaves: iterable of objects exposing a `_socket` attribute whose
                fileno() identifies the slave's connection.
            timeout: maximum time to wait, in seconds; None blocks until
                something becomes readable.

        Returns:
            A list containing `listen_sock` and/or the slave objects whose
            descriptors were reported ready; empty if the wait timed out.
        """
        sources = {listen_sock.fileno(): listen_sock}
        for slave in slaves:
            sources[slave._socket.fileno()] = slave

        # Only readability matters; the write/error lists stay empty.
        ready, _, _ = select.select(list(sources), [], [], timeout)
        return [sources[fd] for fd in ready]