IMP  2.1.0
The Integrative Modeling Platform
util.py
1 """@namespace IMP.parallel.util Utilities for the IMP.parallel module."""
2 
3 import socket
4 import select
5 import sys
6 import random
7 
class _ListenSocket(socket.socket):
    """Listening TCP (AF_INET, SOCK_STREAM) socket on a random high port.

    After successful construction the socket has SO_REUSEADDR set, is bound
    to `host` on a randomly chosen port (available as the `port` attribute),
    has `timeout` applied via settimeout(), and is listening with a backlog
    of 15.
    """

    def __init__(self, host, timeout):
        """Create the socket, bind it to a random port and start listening.

        host    -- host name or address passed to bind()
        timeout -- value passed to settimeout()

        Any exception raised while configuring the socket is propagated to
        the caller after the socket has been closed.
        """
        socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.port = self._bind_to_random_port(host, timeout)
            self.settimeout(timeout)
            self.listen(15)
        except Exception:
            # The descriptor was already opened above; release it rather
            # than leaving it to the garbage collector when setup fails.
            self.close()
            raise

    def _bind_to_random_port(self, host, timeout):
        """Bind to a random high-numbered port and return the port number.

        Ports are drawn uniformly from 10000-60000 (inclusive). If bind()
        raises socket.error, another random port is tried; the 11th
        consecutive failure is re-raised to the caller. socket.gaierror is
        re-raised immediately without retrying. `timeout` is accepted but
        not used by this method.
        """
        tries = 0
        while True:
            port = random.randint(10000, 60000)
            try:
                self.bind((host, port))
            # gaierror is a subclass of error, so catch it separately
            except socket.gaierror:
                raise
            except socket.error:
                tries += 1
                if tries > 10:
                    raise
            else:
                return port
32 
33 
class _ContextWrapper(object):
    """Simple holder that keeps one object in its `obj` attribute.

    NOTE(review): presumably tags the wrapped object as a "context" for
    whoever receives it - confirm against the callers.
    """

    def __init__(self, obj):
        # Stored as-is; no copy or validation is performed.
        self.obj = obj
37 
38 
class _TaskWrapper(object):
    """Simple holder that keeps one object in its `obj` attribute.

    NOTE(review): presumably tags the wrapped object as a "task" for
    whoever receives it - confirm against the callers.
    """

    def __init__(self, obj):
        # Stored as-is; no copy or validation is performed.
        self.obj = obj
42 
43 
class _ErrorWrapper(object):
    """Holder pairing an object with a traceback.

    The two constructor arguments are stored unchanged in the `obj` and
    `traceback` attributes.

    NOTE(review): `obj` is presumably an exception instance and `traceback`
    its traceback (possibly pre-formatted) - confirm against the callers.
    """

    def __init__(self, obj, traceback):
        self.obj, self.traceback = obj, traceback
48 
49 
class _HeartBeat(object):
    """Empty marker class; instances carry no data.

    NOTE(review): presumably sent periodically as a liveness signal -
    confirm against the callers.
    """
52 
53 
class _SlaveAction(object):
    """Empty base class for action objects.

    Defines no behaviour itself; subclasses in this file provide an
    execute() method.
    """
56 
57 
class _SetPathAction(_SlaveAction):
    """Action that puts one entry at the front of the module search path."""

    def __init__(self, path):
        # Entry to add to sys.path when execute() is called.
        self.path = path

    def execute(self):
        """Prepend the stored path to sys.path, modifying the list in place."""
        sys.path[:0] = [self.path]
63 
64 
if hasattr(select, 'poll'):
    def _poll_events(listen_sock, slaves, timeout):
        """Wait for activity on the listening socket or any slave socket.

        listen_sock -- object with a fileno() method
        slaves      -- iterable of objects whose `_socket` attribute has a
                       fileno() method
        timeout     -- maximum time to wait, in seconds; None blocks until
                       at least one descriptor is ready

        Returns a list containing `listen_sock` and/or the slave objects
        whose descriptors poll() reported as ready; the list is empty if
        the timeout expired first.
        """
        fileno = listen_sock.fileno()
        slavemap = {fileno: listen_sock}

        p = select.poll()
        p.register(fileno, select.POLLIN)
        for slave in slaves:
            fileno = slave._socket.fileno()
            slavemap[fileno] = slave
            p.register(fileno, select.POLLIN)
        # poll() takes milliseconds. Pass None through untouched so that
        # "block indefinitely" behaves the same as in the select() variant
        # instead of failing on None * 1000.
        if timeout is not None:
            timeout = timeout * 1000
        return [slavemap[fd] for fd, _event in p.poll(timeout)]

else:
    # Use select on systems that don't have poll()
    def _poll_events(listen_sock, slaves, timeout):
        """Wait for activity on the listening socket or any slave socket.

        listen_sock -- object with a fileno() method
        slaves      -- iterable of objects whose `_socket` attribute has a
                       fileno() method
        timeout     -- maximum time to wait, in seconds; None blocks until
                       at least one descriptor is ready

        Returns a list containing `listen_sock` and/or the slave objects
        whose descriptors select() reported as readable; the list is empty
        if the timeout expired first.
        """
        fileno = listen_sock.fileno()
        slavemap = {fileno: listen_sock}
        waitin = [fileno]

        for slave in slaves:
            fileno = slave._socket.fileno()
            slavemap[fileno] = slave
            waitin.append(fileno)
        # Only the readable set is of interest here.
        ready = select.select(waitin, [], [], timeout)[0]
        return [slavemap[fd] for fd in ready]
91  return [slavemap[fd] for fd in ready]