IMP logo
IMP Reference Guide  2.12.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.10
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 # This wrapper is part of IMP,
8 # Copyright 2007-2019 IMP Inventors. All rights reserved.
9 
10 from __future__ import print_function, division, absolute_import
11 
12 
13 
14 
from sys import version_info as _swig_python_version_info
if _swig_python_version_info >= (2, 7, 0):
    # Python 2.7+: use importlib, preferring a package-relative
    # '_IMP_parallel' extension module and falling back to a top-level one.
    def swig_import_helper():
        import importlib
        pkg = __name__.rpartition('.')[0]
        mname = '.'.join((pkg, '_IMP_parallel')).lstrip('.')
        try:
            return importlib.import_module(mname)
        except ImportError:
            return importlib.import_module('_IMP_parallel')
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
elif _swig_python_version_info >= (2, 6, 0):
    # Python 2.6: locate the extension next to this file using the
    # (long-deprecated) 'imp' module.
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module('_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
            finally:
                # Always close the file handle opened by find_module.
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    # Very old Pythons: rely on a plain top-level import.
    import _IMP_parallel
del _swig_python_version_info
try:
    # Alias used by the generated proxy classes below.
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.

try:
    # Python 3 name for the builtins module.
    import builtins as __builtin__
except ImportError:
    # Python 2 fallback.
    import __builtin__
57 
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    """Set attribute 'name' on a SWIG proxy, consulting SWIG's setter table.

    If 'static' is true, names unknown to SWIG may not be added to the
    instance; otherwise they are stored in the instance __dict__.
    """
    if (name == "thisown"):
        # Ownership flag lives on the underlying SwigPyObject.
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)
72 
73 
def _swig_setattr(self, class_type, name, value):
    """Non-static attribute setter: unknown names are stored on the instance."""
    return _swig_setattr_nondynamic(self, class_type, name, value, static=0)
76 
77 
78 def _swig_getattr(self, class_type, name):
79  if (name == "thisown"):
80  return self.this.own()
81  method = class_type.__swig_getmethods__.get(name, None)
82  if method:
83  return method(self)
84  raise AttributeError("'%s' object has no attribute '%s'" % (class_type.__name__, name))
85 
86 
87 def _swig_repr(self):
88  try:
89  strthis = "proxy of " + self.this.__repr__()
90  except __builtin__.Exception:
91  strthis = ""
92  return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
93 
94 
def _swig_setattr_nondynamic_method(set):
    """Wrap a setattr function so new attributes cannot be added.

    Only 'this', 'thisown', and already-existing attributes may be set.
    """
    def set_attr(self, name, value):
        if (name == "thisown"):
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr
104 
105 
try:
    import weakref
    # Used to hold non-owning references to wrapped objects.
    weakref_proxy = weakref.proxy
except __builtin__.Exception:
    # No weakref support; fall back to returning the object itself.
    weakref_proxy = lambda x: x
111 
112 
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

    Generated wrapper around SWIG's C++ iterator; every method delegates
    to the _IMP_parallel extension module.
    """

    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self
# Register the proxy class with the C++ runtime.
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
216 
217 
# Type-registry lists populated by generated module code.
_value_types=[]
_object_types=[]
_raii_types=[]
_plural_types=[]

# Re-exported IMP kernel log-level and build-configuration constants.
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_NUMPY = _IMP_parallel.IMP_KERNEL_HAS_NUMPY
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
243 
244 import sys
245 class _DirectorObjects(object):
246  """@internal Simple class to keep references to director objects
247  to prevent premature deletion."""
248  def __init__(self):
249  self._objects = []
250  def register(self, obj):
251  """Take a reference to a director object; will only work for
252  refcounted C++ classes"""
253  if hasattr(obj, 'get_ref_count'):
254  self._objects.append(obj)
255  def cleanup(self):
256  """Only drop our reference and allow cleanup by Python if no other
257  Python references exist (we hold 3 references: one in self._objects,
258  one in x, and one in the argument list for getrefcount) *and* no
259  other C++ references exist (the Python object always holds one)"""
260  objs = [x for x in self._objects if sys.getrefcount(x) > 3 \
261  or x.get_ref_count() > 1]
262 # Do in two steps so the references are kept until the end of the
263 # function (deleting references may trigger a fresh call to this method)
264  self._objects = objs
265  def get_object_count(self):
266  """Get number of director objects (useful for testing only)"""
267  return len(self._objects)
268 _director_objects = _DirectorObjects()
269 
class _ostream(object):
    """Proxy of C++ std::ostream class.

    Generated wrapper; used to redirect C++ stream output through Python.
    """

    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

# Register the proxy class with the C++ runtime.
_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
285 
# Compiler-feature flags exported by the C++ extension.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
IMP_C_OPEN_BINARY = _IMP_parallel.IMP_C_OPEN_BINARY
import IMP
# Availability flags for optional dependencies of this module's C++ side.
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_NUMPY = _IMP_parallel.IMP_PARALLEL_HAS_NUMPY
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
297 
298 
import os
import random
import re
import socket
import sys
import time
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle

from IMP.parallel import slavestate
from IMP.parallel import util
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction
316 
# Save sys.path at import time, so that slaves can import using the same
# path that works for the master imports
_import_time_path = sys.path[:]
320 
class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass
324 
325 
class _NoMoreTasksError(Error):
    """Internal: raised when a context has no queued tasks left."""
328 
329 
class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run"""
    pass
333 
334 
class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass
338 
339 
class RemoteError(Error):
    """Error raised if a slave has an unhandled exception"""

    def __init__(self, exc, traceback, slave):
        # The original exception object, its formatted traceback, and the
        # slave it came from.
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.slave), self.traceback)
351 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        # The connected socket; set externally once a connection exists.
        self._socket = None
        # Bytes received from the socket but not yet unpickled.
        self._ibuffer = b''

    def _send(self, obj):
        """Pickle 'obj' and send it as one XDR-packed string."""
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        """Return True if buffered data remains to be unpickled."""
        return len(self._ibuffer) > 0

    def _recv(self):
        """Receive and return one pickled object, blocking as necessary.

        Raises RemoteError if the peer sent a wrapped exception, or
        NetworkError if the connection is lost or closed.
        """
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            # Buffer holds an incomplete message: read more from the socket.
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s" \
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        """Decode one object from 'ibuffer'; return (obj, remaining bytes)."""
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
395 
396 
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        # Lifecycle state; one of the constants in the slavestate module.
        self._state = slavestate.init
        # The Context whose startup function this slave has run, if any.
        self._context = None
        # The task currently executing on this slave, if any.
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        # Called when the slave connects back to the master's listen socket.
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        # Tell the remote process to prepend 'path' to its sys.path.
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record that the slave was heard from just now."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if nothing was heard from the slave for 'timeout' s."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        """Send 'task' (and, if needed, the context startup) to the slave."""
        if not self._ready_for_task(context) and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            # Slave must run the new context's startup function first.
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Receive the completed task from the slave.

        Returns None if only a heartbeat arrived and no more data is
        pending."""
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        """Mark the slave dead and return its interrupted task, if any."""
        task = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        # True if the slave process has not been launched yet.
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        # True if connected and idle with the given context (None matches
        # a slave that has not yet been given any context).
        return self._state == slavestate.connected \
               and self._context == context

    def _running_task(self, context):
        # True if currently executing a task from the given context.
        return self._state == slavestate.running_task \
               and self._context == context
471 
472 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        pass
488 
489 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        """Launch the slave as a background process on this machine."""
        Slave._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
500 
501 
class _SGEQsubSlave(Slave):
    """A single slave launched as one task of an SGE 'qsub' array job."""

    def __init__(self, array):
        Slave.__init__(self)
        # SGE job/task id, assigned by the owning array after submission.
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        """Register with the owning array; actual submission happens when
           the whole array is started."""
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = '(unknown)' if self._jobid is None else self._jobid
        return "<SGE qsub slave, ID %s>" % jobid
517 
518 
class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
    """

    # Options always passed to qsub, in addition to user-supplied ones.
    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numslave, options):
        """Constructor.
           @param numslave The number of slaves, which corresponds to the
                  number of tasks in the SGE job.
           @param options A string of SGE options that are passed on the
                  'qsub' command line. This is added to standard_options.
        """
        self._numslave = numslave
        self._options = options
        # (unique_id, output, slave) tuples awaiting submission.
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for x in range(self._numslave)]

    def _slave_started(self, command, output, slave):
        # Record each started slave so that _start() can submit them all
        # as one job array.
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        """Submit all recorded slaves as a single SGE job array."""
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        print(qsub)
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        # The submitted script picks its own id and output file from these
        # shell arrays, indexed by $SGE_TASK_ID (1-based; slot 0 is unused).
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
582 
class _SGEPESlave(Slave):
    """A slave started with 'qrsh -inherit' on one SGE PE node."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmd = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmd, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
595 
596 
class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""

    def _get_slaves(self):
        """Return one Slave per slot listed in $PE_HOSTFILE."""
        slaves = []

        pe = os.environ['PE_HOSTFILE']
        # Each hostfile line is "<node> <num_slots> <queue> ...".
        with open(pe, "r") as fh:
            for line in fh:
                (node, num, queue) = line.split(None, 2)
                for i in range(int(num)):
                    slaves.append(_SGEPESlave(node))
        # Replace first slave with a local slave, as this is ourself, and SGE
        # won't let us start this process with qrsh (as we are already
        # occupying the slot)
        if len(slaves) > 0:
            slaves[0] = LocalSlave()
        return slaves
623 
624 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """

    def __init__(self, manager, startup=None):
        """Constructor."""
        self._manager = manager
        self._startup = startup
        # Tasks queued but not yet dispatched to a slave.
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the __call__ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return
           value(s) from the task callable are returned from this method,
           as a Python generator. Note that the results are returned in the
           order that tasks complete, which may not be the same as the
           order they were submitted in.
           @exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a slave encountered an unhandled exception.
           @exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
660 
661 
class Manager(object):
    """Manages slaves and contexts.
    """

    # Seconds to wait for a started slave to connect back to the master.
    connect_timeout = 7200

    # Seconds without contact before a slave is presumed dead.
    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                  interpreter that can import the IMP module. Otherwise,
                  the same interpreter that the master is currently using
                  is used. This is passed to the shell, so a full command
                  line (including multiple words separated by spaces) can
                  be used if necessary.
           @param host The hostname that slaves use to connect back to the
                  master. If not specified, the master machine's primary
                  IP address is used. On multi-homed machines, such as
                  compute cluster headnodes, this may need to be changed
                  to allow all slaves to reach the master (typically the
                  name of the machine's internal network address is
                  needed). If only running local slaves, 'localhost' can
                  be used to prohibit connections across the network.
           @param output A format string used to name slave output files.
                  It is given the numeric slave id, so for example the
                  default value 'slave%d.output' will yield output files
                  called slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        # Maps unique id -> Slave, for slaves launched but not yet connected.
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        if hasattr(slave, '_get_slaves'):
            # A SlaveArray; it is expanded into individual slaves when the
            # slaves are started.
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                  that implements the __call__ method) that sets up
                  the slave to run tasks. This method is only called
                  once per slave. The return values from this method
                  are passed to the task object when it runs on
                  the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        """Launch every slave that has not been started yet."""
        # Expand any slave arrays into individual slaves first.
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        """Return an identifier for slave 'num': its index plus a random
           8-letter suffix, so a stale process cannot be misidentified."""
        uid = "%d:" % num
        for i in range(0, 8):
            uid += chr(random.randint(0, 25) + ord('A'))
        return uid

    def _send_tasks_to_slaves(self, context):
        """Dispatch queued tasks to every idle slave."""
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        """Send the next queued task to 'slave', if any remain."""
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            # Slave is unreachable; leave the task queued for another slave.
            slave._kill()

    def _get_finished_tasks(self, context):
        """Yield completed tasks as network events arrive."""
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Timed out with no events at all: give up on all slaves.
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one network event; return a finished task, if any."""
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    # Keep the slave busy with the next queued task.
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        """Kill slaves that have been silent too long; requeue their tasks."""
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        """Kill every running slave, requeue their tasks, and give up."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        """Match a new connection to a started slave by its unique id."""
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        """Send the master's Python search path to a new slave."""
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listen socket or any running slave."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
871 
872 
873 
def get_module_version():
    """get_module_version() -> std::string const"""
    # Thin wrapper around the C++ extension's version metadata.
    return _IMP_parallel.get_module_version()
877 
def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string

    Return the full path to one of this module's example files.
    """
    return _IMP_parallel.get_example_path(fname)
881 
def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string

    Return the full path to one of this module's data files.
    """
    return _IMP_parallel.get_data_path(fname)
885 
from . import _version_check
# Verify the Python wrapper and the C++ module come from the same release.
_version_check.check_version(get_module_version())
__version__ = get_module_version()
889 
890 
891 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.