IMP logo
IMP Reference Guide  2.10.1
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.10
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 # This wrapper is part of IMP,
8 # Copyright 2007-2018 IMP Inventors. All rights reserved.
9 
10 from __future__ import print_function, division, absolute_import
11 
12 
13 
14 
# Locate and load the compiled SWIG extension module (_IMP_parallel) that
# backs this wrapper. The lookup strategy depends on the running Python
# version; the temporary helper and version tuple are deleted afterwards so
# they do not leak into the module namespace.
from sys import version_info as _swig_python_version_info
if _swig_python_version_info >= (2, 7, 0):
    def swig_import_helper():
        """Import _IMP_parallel from this package, falling back to a
           top-level module of the same name."""
        import importlib
        pkg = __name__.rpartition('.')[0]
        mname = '.'.join((pkg, '_IMP_parallel')).lstrip('.')
        try:
            return importlib.import_module(mname)
        except ImportError:
            return importlib.import_module('_IMP_parallel')
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
elif _swig_python_version_info >= (2, 6, 0):
    def swig_import_helper():
        """Load _IMP_parallel from the directory containing this file,
           falling back to a plain import if it is not found there."""
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module('_IMP_parallel', fp, pathname,
                                       description)
            finally:
                # Always release the file handle returned by find_module
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del _swig_python_version_info
# Alias the builtin 'property' under a private name for use by the proxy
# classes below.
try:
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.

# Make the builtins module available as __builtin__ on both Python 2 and 3.
try:
    import builtins as __builtin__
except ImportError:
    import __builtin__
57 
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    """Set attribute `name` on a SWIG proxy object.

       "thisown" is forwarded to self.this.own(); "this" is stored directly
       when the value is a SwigPyObject. Otherwise a setter registered in
       class_type.__swig_setmethods__ is used if one exists. If none exists,
       the attribute is set normally when `static` is false, and
       AttributeError is raised when `static` is true.
    """
    if name == "thisown":
        return self.this.own(value)
    if name == "this":
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if not static:
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)
72 
73 
def _swig_setattr(self, class_type, name, value):
    """Set an attribute on a SWIG proxy, allowing new attributes to be
       added (calls _swig_setattr_nondynamic with static=0)."""
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)
76 
77 
def _swig_getattr(self, class_type, name):
    """Get attribute `name` from a SWIG proxy object.

       "thisown" is read from self.this.own(); any other name is looked up
       in class_type.__swig_getmethods__. AttributeError is raised if no
       getter is registered.
    """
    if name == "thisown":
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    raise AttributeError("'%s' object has no attribute '%s'"
                         % (class_type.__name__, name))
85 
86 
def _swig_repr(self):
    """Return a repr string naming the proxy's module and class, plus the
       repr of the wrapped object (self.this) when that is available."""
    try:
        strthis = "proxy of " + self.this.__repr__()
    except __builtin__.Exception:
        # No usable 'this' attribute; leave that part of the repr empty
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__,
                             self.__class__.__name__, strthis,)
93 
94 
def _swig_setattr_nondynamic_method(set):
    """Wrap the setter `set` so that only existing attributes (or "this")
       can be assigned; other names raise AttributeError. Assignments to
       "thisown" are forwarded to self.this.own()."""
    def set_attr(self, name, value):
        if name == "thisown":
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr
104 
105 
# Use weakref.proxy where available; otherwise fall back to an identity
# function so callers can use weakref_proxy unconditionally.
try:
    import weakref
    weakref_proxy = weakref.proxy
except __builtin__.Exception:
    weakref_proxy = lambda x: x
111 
112 
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
216 
217 
# Module-level type lists, initially empty.
_value_types = []
_object_types = []
_raii_types = []
_plural_types = []

# Constants re-exported from the compiled extension module.
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
242 
243 import sys
class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""

    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        # Objects without get_ref_count() are silently ignored
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3
                or x.get_ref_count() > 1]
        # Do in two steps so the references are kept until the end of the
        # function (deleting references may trigger a fresh call to this
        # method)
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)
_director_objects = _DirectorObjects()
268 
class _ostream(object):
    """Proxy of C++ std::ostream class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
284 
# Further constants re-exported from the compiled extension module.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
IMP_C_OPEN_BINARY = _IMP_parallel.IMP_C_OPEN_BINARY
import IMP
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
295 
296 
297 import socket
298 import time
299 import sys
300 import os
301 import re
302 import random
303 import socket
304 import xdrlib
305 try:
306  import cPickle as pickle
307 except ImportError:
308  import pickle
309 from IMP.parallel import slavestate
310 from IMP.parallel.subproc import _run_background, _Popen4
311 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
312 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
313 from IMP.parallel.util import _SetPathAction
314 
# Save sys.path at import time, so that slaves can import using the same
# path that works for the master imports. A copy is taken so that later
# changes to sys.path do not alter the saved value.
_import_time_path = sys.path[:]
318 
# NOTE(review): the 'class' header lines for Error, NoMoreSlavesError,
# NetworkError and RemoteError were missing from this listing. The names are
# taken from their uses elsewhere in this file; the Exception base of Error
# is assumed - confirm against the original source.
class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass


class _NoMoreTasksError(Error):
    """Raised internally when no tasks are queued or running; caught by
       Manager._get_results_unordered() to end iteration."""
    pass


class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run"""
    pass


class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass


class RemoteError(Error):
    """Error raised if a slave has an unhandled exception"""

    def __init__(self, exc, traceback, slave):
        """Constructor.
           @param exc The exception object raised on the slave.
           @param traceback The remote traceback, used as text in __str__.
           @param slave The object that reported the error.
        """
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        # Drop any "exceptions." module prefix from the exception class name
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.slave), self.traceback)
349 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network.

       Each object is pickled and wrapped as a single XDR string. Received
       bytes accumulate in self._ibuffer until a complete object can be
       decoded from them."""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        """Pickle `obj`, pack it as an XDR string and send it over
           self._socket."""
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        """Return True if received but not yet decoded bytes are buffered."""
        return len(self._ibuffer) > 0

    def _recv(self):
        """Return the next object received, reading from the socket until
           a complete object is buffered.
           @exception RemoteError the peer sent a wrapped exception.
           @exception NetworkError the socket failed or was closed.
        """
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                # Decoding failed for lack of data, so read more and retry
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        """Decode one object from the start of `ibuffer`; return a tuple of
           (object, remaining bytes)."""
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        # NOTE(review): pickle.loads() on data read from the network can
        # execute arbitrary code; this is only safe if every peer is trusted.
        return (pickle.loads(obj), ibuffer[p.get_position():])
393 
394 
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        """Adopt `sock` as the connection to this slave and mark it
           connected."""
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        """Send the slave a _SetPathAction for `path`."""
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record the current time as the last contact with this slave."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if more than `timeout` seconds have passed since
           the last recorded contact."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        """Send `task` to the slave, first sending the context's startup
           callable if the slave is not already set up for `context`.
           @exception TypeError the slave is not ready for a task.
        """
        if not self._ready_for_task(context) \
           and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Read from the slave; return the finished task with its results
           attached, or None if only heartbeats were received."""
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                # Keep reading only while more data is already buffered
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        """Mark the slave dead and return the task it held (if any) so the
           caller can reschedule it."""
        task = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        return self._state == slavestate.connected \
            and self._context == context

    def _running_task(self, context):
        return self._state == slavestate.running_task \
            and self._context == context
469 
470 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       SlaveArray is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self, command=None):
        """Do any necessary startup after all contained Slaves have started.
           @param command The slave command line. Manager passes this when
                  it starts each array, so the hook must accept it; the
                  base implementation ignores it.
        """
        pass
486 
487 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        """Run `command` with `unique_id` appended as a background
           process, passing `output` to _run_background."""
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"
498 
499 
class _SGEQsubSlave(Slave):
    """A single slave belonging to an array object, which is notified
       when the slave is started."""

    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        """Register with the owning array rather than launching a process
           here."""
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
515 
516 
# NOTE(review): the 'class' header line was missing from this listing. The
# name matches the references in the Slave and SlaveArray docstrings; the
# SlaveArray base class is assumed - confirm against the original source.
class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
    """

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numslave, options):
        """Constructor.
           @param numslave The number of slaves, which corresponds to the
                           number of tasks in the SGE job.
           @param options  A string of SGE options that are passed on the 'qsub'
                           command line. This is added to standard_options.
        """
        self._numslave = numslave
        self._options = options
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for x in range(self._numslave)]

    def _slave_started(self, command, output, slave):
        """Record a started slave for the next _start() call. The first
           argument is named `command`, but _SGEQsubSlave._start passes the
           slave's unique ID here."""
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        """Submit one array job covering every recorded slave, feeding the
           job script to qsub on its standard input."""
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        print(qsub)
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        inp.write("#!/bin/sh\n")
        # SGE task IDs start at 1, so pad index 0 of each shell array
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            # The first run of digits on the first line is taken as the ID
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
579 
580 
class _SGEPESlave(Slave):
    """A slave started on a given host using 'qrsh -inherit'."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        """Run `command` on self._host via qrsh, as a background process."""
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
593 
594 
# NOTE(review): the 'class' header line was missing from this listing. The
# class name and SlaveArray base are assumed by analogy with
# _SGEPESlave/SGEQsubSlaveArray - confirm against the original source.
class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""

    def _get_slaves(self):
        """Return one slave per slot listed in the file named by the
           PE_HOSTFILE environment variable.
           @exception KeyError PE_HOSTFILE is not set.
        """
        slaves = []

        pe = os.environ['PE_HOSTFILE']
        # Use a context manager so the hostfile is always closed
        with open(pe, "r") as fh:
            for line in fh:
                # Skip blank lines rather than failing to unpack them
                if not line.strip():
                    continue
                (node, num, queue) = line.split(None, 2)
                for i in range(int(num)):
                    slaves.append(_SGEPESlave(node))
        # Replace first slave with a local slave, as this is ourself, and SGE
        # won't let us start this process with qrsh (as we are already
        # occupying the slot)
        if len(slaves) > 0:
            slaves[0] = LocalSlave()
        return slaves
621 
622 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor.
           @param manager The Manager that will run this context's tasks.
           @param startup If not None, the startup callable that is sent to
                  a slave before it runs tasks from this context.
        """
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        # Raw docstring: the \_ sequences are Doxygen escapes, not Python ones
        r"""Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           @exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a slave encountered an unhandled exception.
           @exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
658 
659 
class Manager(object):
    """Manages slaves and contexts.
    """

    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        # Raw docstring: \% is a Doxygen escape, not a Python one
        r"""Constructor.
           @param python If not None, the command to run to start a Python
                         interpreter that can import the IMP module. Otherwise,
                         the same interpreter that the master is currently using
                         is used. This is passed to the shell, so a full command
                         line (including multiple words separated by spaces) can
                         be used if necessary.
           @param host   The hostname that slaves use to connect back to the
                         master. If not specified, the master machine's primary
                         IP address is used. On multi-homed machines, such as
                         compute cluster headnodes, this may need to be changed
                         to allow all slaves to reach the master (typically the
                         name of the machine's internal network address is
                         needed). If only running local slaves, 'localhost' can
                         be used to prohibit connections across the network.
           @param output A format string used to name slave output files. It is
                         given the numeric slave id, so for example the default
                         value 'slave\%d.output' will yield output files called
                         slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        # Anything providing _get_slaves() is treated as an array of slaves
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        # Raw docstring: the \_ sequences are Doxygen escapes
        r"""Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        """Start every slave that has not yet been started, then start
           each pending slave array."""
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host,
                               self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        # Arrays are started only once; their slaves are now in _all_slaves
        self._slave_arrays = []

    def _get_unique_id(self, num):
        """Return "<num>:" followed by 8 random uppercase letters."""
        suffix = "".join(chr(random.randint(0, 25) + ord('A'))
                         for _ in range(8))
        return "%d:%s" % (num, suffix)

    def _send_tasks_to_slaves(self, context):
        """Start any unstarted slaves, then give queued tasks to slaves
           that are ready for them."""
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        """Send the first queued task to `slave`. The task is dequeued only
           once it has been sent; on a socket error the slave is killed and
           the task stays queued."""
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            slave._kill()

    def _get_finished_tasks(self, context):
        """Yield tasks as they finish. Ends only by an exception raised
           from the calls it makes."""
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # No events at all; this call raises NetworkError
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one network event; return the finished task if the event
           produced one, otherwise None."""
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        """Kill running slaves not heard from within heartbeat_timeout and
           reschedule their tasks."""
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        """Kill every slave running a task in `context`, requeue the tasks,
           then raise NetworkError."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        """Identify a newly connected slave by the unique ID it sends.
           Return the Slave if the ID is one we are waiting for; otherwise
           ignore the connection and return None."""
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            try:
                identifier = identifier.decode('ascii')
            except UnicodeDecodeError:
                # Non-ASCII data cannot be one of our IDs; treat the peer
                # as unknown rather than letting the error propagate
                identifier = None
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        """Send the slave the first entry of the saved and current
           sys.path, skipping empty and duplicate entries."""
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listening socket or any running slave.
           @exception _NoMoreTasksError nothing is running or queued.
           @exception NoMoreSlavesError tasks are queued but no slave is
                      running or starting.
        """
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get
            # tasks

        # 'util' is not imported by name in this file; it is bound in this
        # package's namespace as a side effect of the
        # 'from IMP.parallel.util import ...' statements above.
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
869 
870 
871 
def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()
875 
def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(fname)
879 
def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(fname)
883 
# Pass the extension's reported version to the package's version check,
# then publish it as __version__.
from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()
887 
888 
889 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.