IMP logo
IMP Reference Guide  2.13.0
The Integrative Modeling Platform
parallel/__init__.py
1 import socket
2 import time
3 import sys
4 import os
5 import re
6 import random
7 import socket
8 import xdrlib
9 try:
10  import cPickle as pickle
11 except ImportError:
12  import pickle
13 from IMP.parallel import slavestate
14 from IMP.parallel.subproc import _run_background, _Popen4
15 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
16 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
17 from IMP.parallel.util import _SetPathAction
18 
# Snapshot the module search path as it was when this module was imported;
# slaves are later given these entries so that they can import modules the
# same way the master did.
_import_time_path = list(sys.path)
22 
class Error(Exception):
    """Base class for all errors specific to the parallel module.

    The other exception classes in this module derive from it, so catching
    Error catches any of them.
    """
26 
27 
class _NoMoreTasksError(Error):
    """Internal signal that a context has nothing left to run.

    Raised by Manager._get_network_events when no slave is running a task
    for the context and no tasks remain queued; Manager._get_results_unordered
    catches it to end its results generator.
    """
30 
31 
class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run.

    The Manager raises this when tasks are still queued but no slave is
    running a task and none is still starting up.
    """
35 
36 
class NetworkError(Error):
    """Error raised if a problem occurs with the network.

    Examples include a lost or closed connection to a slave, or no running
    slave being heard from within Manager.heartbeat_timeout seconds.
    """
40 
41 
class RemoteError(Error):
    """Error raised if a slave has an unhandled exception"""

    def __init__(self, exc, traceback, slave):
        """Constructor.
           @param exc       The exception object raised on the slave.
           @param traceback The remote traceback, as sent back by the slave.
           @param slave     The communicator (normally a Slave) that received
                            the error.
        """
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def _exc_type_name(self):
        """Return a readable name for the class of the remote exception.
           Built-in exceptions are shown by bare name (e.g. 'ValueError');
           other classes are qualified by their module (e.g. 'mymod.MyError').
           str(cls) is deliberately not used: on Python 3 it gives
           "<class 'ValueError'>" rather than a dotted name."""
        cls = self.exc.__class__
        name = getattr(cls, '__name__', None)
        if name is None:
            return str(cls)
        module = getattr(cls, '__module__', None)
        # 'builtins' (Python 3) and 'exceptions'/'__builtin__' (Python 2)
        # hold the standard exception classes
        if module in (None, 'builtins', 'exceptions', '__builtin__'):
            return name
        return "%s.%s" % (module, name)

    def __str__(self):
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (self._exc_type_name(), str(self.exc), str(self.slave),
                  self.traceback)
53 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network.

    Each message is a pickle packed as an XDR string with xdrlib. Bytes that
    have been received but not yet decoded are kept in ``_ibuffer``.

    NOTE(review): _recv() unpickles whatever arrives on the socket;
    pickle.loads on data from an untrusted peer can execute arbitrary code.
    Only use on trusted networks. Also note that xdrlib was removed from the
    standard library in Python 3.13.
    """

    def __init__(self):
        self._ibuffer = b''
        self._socket = None

    def _send(self, obj):
        """Pickle *obj* and send it, XDR-framed, over the socket."""
        packer = xdrlib.Packer()
        try:
            packer.pack_string(pickle.dumps(obj, -1))
        except SystemError:
            # Python < 2.5 can fail to pickle Inf/NaN floats with a binary
            # protocol; retry with the old text protocol 0.
            packer.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(packer.get_buffer())

    def get_data_pending(self):
        """Return True if undecoded received bytes are buffered."""
        return bool(self._ibuffer)

    def _recv(self):
        """Return the next object received, reading the socket as needed.
           Raises RemoteError if the peer sent back an _ErrorWrapper, and
           NetworkError if the connection fails or is closed."""
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if not isinstance(obj, _ErrorWrapper):
                    return obj
                raise RemoteError(obj.obj, obj.traceback, self)
            except (IndexError, EOFError):
                # The buffer does not yet hold a complete message; pull
                # more bytes from the socket and try again.
                try:
                    chunk = self._socket.recv(4096)
                except socket.error as err:
                    raise NetworkError("Connection lost to %s: %s" \
                                       % (str(self), str(err)))
                if not chunk:
                    raise NetworkError("%s closed connection" % str(self))
                self._ibuffer += chunk

    def _unpickle(self, ibuffer):
        """Decode one message from *ibuffer*.
           Returns (object, remaining bytes)."""
        unpacker = xdrlib.Unpacker(ibuffer)
        payload = unpacker.unpack_string()
        return (pickle.loads(payload), ibuffer[unpacker.get_position():])
97 
98 
class Slave(_Communicator):
    """Representation of a single slave.
       A slave runs its tasks one after another on a single thread of
       execution (i.e. a single CPU core).
       Slave is an abstract class; do not use it directly, but instead use a
       subclass such as LocalSlave, or a slave array such as
       SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._task = None
        self._context = None
        self._state = slavestate.init
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host.
           Subclasses override this to launch the slave process, calling
           this base method to mark the slave as started."""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        """Take over *sock*, the connection the slave made to the master."""
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        """Send a _SetPathAction for *path* to the slave."""
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record the current time as the last contact with this slave."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if more than *timeout* seconds have passed since the
           last recorded contact with this slave."""
        elapsed = time.time() - self.last_contact_time
        return elapsed > timeout

    def _start_task(self, task, context):
        """Send *task* to the slave to run within *context*.
           If the slave does not already hold *context*, the context's
           startup callable is sent first.
           @exception TypeError the slave is not connected and idle, or is
                      bound to a different context."""
        if not (self._ready_for_task(context) or self._ready_for_task(None)):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Read the next reply from the slave.
           Heartbeats only refresh the contact time; if a heartbeat is read
           and no further data is already buffered, None is returned.
           Otherwise the reply is stored as the running task's ``_results``,
           the slave is marked idle again, and that task is returned."""
        reply = self._recv()
        self.update_contact_time()
        while isinstance(reply, _HeartBeat):
            if not self.get_data_pending():
                return None
            reply = self._recv()
            self.update_contact_time()
        finished = self._task
        finished._results = reply
        self._task = None
        self._state = slavestate.connected
        return finished

    def _kill(self):
        """Mark the slave dead; return the task it was running, if any."""
        orphan, self._task = self._task, None
        self._context = None
        self._state = slavestate.dead
        return orphan

    def _ready_to_start(self):
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        return (self._state == slavestate.connected
                and self._context == context)

    def _running_task(self, context):
        return (self._state == slavestate.running_task
                and self._context == context)
173 
174 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       SlaveArray is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array.
           Subclasses must override this."""
        # Fail clearly here rather than returning None, which the Manager
        # would otherwise try to extend its slave list with.
        raise NotImplementedError("%s must implement _get_slaves()"
                                  % type(self).__name__)

    def _start(self, command=None):
        """Do any necessary startup after all contained Slaves have started.
           @param command The command line used to start each slave. The
                          Manager passes this to every array, so the
                          parameter must be accepted even by arrays that do
                          not need it. The default implementation does
                          nothing."""
        pass
190 
191 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        """Launch the slave in the background on this machine.
           *unique_id* is appended to *command*; *output* is passed on to
           _run_background (it is the slave's output file name)."""
        Slave._start(self, command, unique_id, output)
        full_command = "%s %s" % (command, unique_id)
        _run_background(full_command, output)

    def __repr__(self):
        return "<LocalSlave>"
202 
203 
class _SGEQsubSlave(Slave):
    """One slave (one SGE array task) within an SGEQsubSlaveArray."""

    def __init__(self, array):
        Slave.__init__(self)
        self._array = array
        # Set to "<job id>.<task number>" by the array once qsub reports it
        self._jobid = None

    def _start(self, command, unique_id, output):
        """Mark this slave as started and register it with its array.
           The array submits all registered slaves together in its own
           _start()."""
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        shown = '(unknown)' if self._jobid is None else self._jobid
        return "<SGE qsub slave, ID %s>" % shown
219 
220 
class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
    """

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numslave, options):
        """Constructor.
           @param numslave The number of slaves, which corresponds to the
                           number of tasks in the SGE job.
           @param options  A string of SGE options that are passed on the
                           'qsub' command line. This is added to
                           standard_options.
        """
        self._numslave = numslave
        self._options = options
        # (unique ID, output file, slave) for slaves awaiting submission
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for _ in range(self._numslave)]

    def _slave_started(self, command, output, slave):
        """Queue a slave for submission by the next call to _start().
           @param command Despite its name, the slave's unique ID (this is
                          the unique_id passed by _SGEQsubSlave._start).
           @param output  Name of the file the slave's output goes to.
           @param slave   The _SGEQsubSlave object itself.
        """
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        """Submit all queued slaves to SGE as a single job array.
           @param command The command line that starts a slave; each array
                          task appends its own unique ID to it.
        """
        if not self._starting_slaves:
            # Nothing to submit; an empty '-t 1-0' task range is not a
            # meaningful job array, so do not call qsub at all.
            return
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        print(qsub)
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        # repr() yields single-quoted strings, used here as shell words.
        # NOTE(review): IDs/paths containing quotes would not be shell-safe.
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        # The job script picks its entries by $SGE_TASK_ID, which starts at
        # 1, hence the leading '' placeholder in each list.
        # NOTE(review): 'name=( ... )' and '${name[i]}' are bash arrays, not
        # POSIX sh; this relies on /bin/sh on the execution hosts being
        # bash-compatible - confirm for the target cluster.
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output.
           The first number on the first output line is taken as the job
           ID; each queued slave is then labelled '<job ID>.<task number>'.
        """
        if not outlines:
            return
        m = re.search(r"\d+", outlines[0])
        if m:
            self._jobid = int(m.group())
            for (num, slave) in enumerate(self._starting_slaves):
                slave[2]._jobid = "%d.%d" % (self._jobid, num + 1)
283 
284 
class _SGEPESlave(Slave):
    """A slave started with 'qrsh -inherit' on one node of an SGE parallel
       environment."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        """Launch the slave on this slave's node via qrsh, in the background."""
        Slave._start(self, command, unique_id, output)
        remote = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(remote, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
297 
298 
class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""

    def _get_slaves(self):
        """Return one slave per slot listed in the SGE PE host file.
           The file named by the PE_HOSTFILE environment variable is read.
           Each non-blank line gives a node name, a slot count and further
           fields. One _SGEPESlave is created per slot, except that the
           first slave is replaced by a LocalSlave.
           @exception KeyError PE_HOSTFILE is not set (typically because the
                      master was not started in an SGE parallel environment).
        """
        slaves = []
        pe = os.environ['PE_HOSTFILE']
        # Use a context manager so the host file is always closed
        with open(pe, "r") as fh:
            for line in fh:
                if not line.strip():
                    # Tolerate blank lines instead of failing to unpack them
                    continue
                (node, num, _rest) = line.split(None, 2)
                slaves.extend(_SGEPESlave(node) for _ in range(int(num)))
        # Replace first slave with a local slave, as this is ourself, and SGE
        # won't let us start this process with qrsh (as we are already
        # occupying the slot)
        if slaves:
            slaves[0] = LocalSlave()
        return slaves

    def _start(self, command):
        """No array-level startup is needed, since each _SGEPESlave launches
           itself with qrsh in its own _start(). Defined so that the
           Manager's array._start(command) call is accepted."""
        pass
325 
326 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """

    def __init__(self, manager, startup=None):
        """Constructor.
           @param manager The Manager that will run this context's tasks.
           @param startup Optional callable sent to a slave when it is first
                          given a task from this context (see
                          Manager::get_context()).
        """
        self._tasks = []
        self._startup = startup
        self._manager = manager

    def add_task(self, task):
        r"""Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks += [task]

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           @exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a slave encountered an unhandled exception.
           @exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        manager = self._manager
        return manager._get_results_unordered(self)
362 
363 
class Manager(object):
    """Manages slaves and contexts.
    """

    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        r"""Constructor.
           @param python If not None, the command to run to start a Python
                         interpreter that can import the IMP module. Otherwise,
                         the same interpreter that the master is currently using
                         is used. This is passed to the shell, so a full command
                         line (including multiple words separated by spaces) can
                         be used if necessary.
           @param host   The hostname that slaves use to connect back to the
                         master. If not specified, the master machine's primary
                         IP address is used. On multi-homed machines, such as
                         compute cluster headnodes, this may need to be changed
                         to allow all slaves to reach the master (typically the
                         name of the machine's internal network address is
                         needed). If only running local slaves, 'localhost' can
                         be used to prohibit connections across the network.
           @param output A format string used to name slave output files. It is
                         given the numeric slave id, so for example the default
                         value 'slave\%d.output' will yield output files called
                         slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        # Slaves that have been launched but have not yet connected back,
        # keyed by the unique ID they were started with
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object.
           Objects providing _get_slaves() (slave arrays) are expanded into
           their individual slaves when slaves are next started."""
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        r"""Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        """Launch every slave that has not been started yet.
           Pending slave arrays are first expanded into their slaves; each
           array then gets its own _start() call."""
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        """Return an ID for slave *num*: '<num>:' plus 8 random capital
           letters."""
        letters = "".join(chr(random.randint(0, 25) + ord('A'))
                          for _ in range(8))
        return "%d:%s" % (num, letters)

    def _send_tasks_to_slaves(self, context):
        """Start any new slaves, then hand queued tasks to idle slaves."""
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        """Send the first queued task of *context* to *slave*, if any.
           On a socket error the slave is killed and the task stays queued."""
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            slave._kill()

    def _get_finished_tasks(self, context):
        """Yield tasks as they finish, handling new connections meanwhile.
           Raises NetworkError if no network event arrives within
           heartbeat_timeout."""
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one readable socket; return a finished task, if any."""
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s" \
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        """Kill running slaves not heard from within heartbeat_timeout and
           requeue their tasks."""
        timed_out = [a for a in self._all_slaves if a._running_task(context) \
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        """Kill all running slaves, requeue their tasks, then raise
           NetworkError."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        """Identify a newly connected slave and give it work.
           The slave first sends the unique ID it was started with. If that
           ID is recognized, the matching Slave takes over *sock*, is
           initialized, is sent a task, and is returned. Otherwise the
           connection is closed and None is returned."""
        sock.setblocking(True)
        identifier = sock.recv(1024)
        try:
            identifier = identifier.decode('ascii') if identifier else None
        except UnicodeDecodeError:
            # Not one of our IDs (they are ASCII); treat it as unknown
            # rather than letting a stray connection crash the master.
            identifier = None
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        print("Ignoring request from unknown slave")
        # No Slave object owns this socket, so close it now instead of
        # leaking the descriptor until garbage collection.
        sock.close()
        return None

    def _init_slave(self, slave):
        """Send the slave the master's Python search path entries."""
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listening socket or running slaves.
           @exception _NoMoreTasksError nothing is running and nothing is
                      queued.
           @exception NoMoreSlavesError tasks are queued but no slave is
                      running or starting up."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        # 'util' is never imported by name in this module. It is bound
        # because importing the IMP.parallel.util submodule sets it as an
        # attribute of this package.
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
573 
574 
def get_module_version():
    """Return the version of this module, as a string"""
    version = "2.13.0"
    return version
578 
def get_module_name():
    """Return the fully-qualified name of this module"""
    name = "IMP::parallel"
    return name
582 
def get_data_path(fname):
    """Return the full path to one of this module's data files.
       @param fname Name of the data file.
       The lookup is delegated to IMP._get_module_data_path."""
    import IMP
    module = "parallel"
    return IMP._get_module_data_path(module, fname)
587 
def get_example_path(fname):
    """Return the full path to one of this module's example files.
       @param fname Name of the example file.
       The lookup is delegated to IMP._get_module_example_path."""
    import IMP
    module = "parallel"
    return IMP._get_module_example_path(module, fname)
Representation of an array of slaves.
Error raised if a slave has an unhandled exception.
def get_module_name
Return the fully-qualified name of this module.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def get_module_version
Return the version of this module, as a string.
def get_example_path
Return the full path to one of this module's example files.
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
def get_data_path
Return the full path to one of this module's data files.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.