IMP logo
IMP Reference Guide  2.7.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.10
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 # This wrapper is part of IMP,
8 # Copyright 2007-2017 IMP Inventors. All rights reserved.
9 
10 from __future__ import print_function, division, absolute_import
11 
12 
13 
14 
15 from sys import version_info as _swig_python_version_info
# Locate and load the SWIG-generated C extension (_IMP_parallel) using
# whichever import machinery the running Python supports.
if _swig_python_version_info >= (2, 7, 0):
    # Python 2.7+/3.x: use importlib, trying a package-relative import
    # first and falling back to a top-level one.
    def swig_import_helper():
        import importlib
        pkg = __name__.rpartition('.')[0]
        mname = '.'.join((pkg, '_IMP_parallel')).lstrip('.')
        try:
            return importlib.import_module(mname)
        except ImportError:
            return importlib.import_module('_IMP_parallel')
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
elif _swig_python_version_info >= (2, 6, 0):
    # Python 2.6: importlib.import_module is not available; fall back to
    # the (long-deprecated) 'imp' module, looking next to this file first.
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module('_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
            finally:
                # imp.find_module opened the file; always close it.
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    # Very old Pythons: plain top-level import.
    import _IMP_parallel
del _swig_python_version_info
# Module-local alias for 'property', used by the generated proxy classes.
try:
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.

# Make the builtins module available under its Python 2 name, so the rest
# of this wrapper can reference __builtin__ on both Python 2 and 3.
try:
    import builtins as __builtin__
except ImportError:
    import __builtin__
57 
58 def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
59  if (name == "thisown"):
60  return self.this.own(value)
61  if (name == "this"):
62  if type(value).__name__ == 'SwigPyObject':
63  self.__dict__[name] = value
64  return
65  method = class_type.__swig_setmethods__.get(name, None)
66  if method:
67  return method(self, value)
68  if (not static):
69  object.__setattr__(self, name, value)
70  else:
71  raise AttributeError("You cannot add attributes to %s" % self)
72 
73 
def _swig_setattr(self, class_type, name, value):
    """Non-static attribute setter used by SWIG proxy classes."""
    return _swig_setattr_nondynamic(self, class_type, name, value, static=0)
76 
77 
78 def _swig_getattr(self, class_type, name):
79  if (name == "thisown"):
80  return self.this.own()
81  method = class_type.__swig_getmethods__.get(name, None)
82  if method:
83  return method(self)
84  raise AttributeError("'%s' object has no attribute '%s'" % (class_type.__name__, name))
85 
86 
def _swig_repr(self):
    """repr() for SWIG proxies, including the wrapped C++ pointer if any."""
    try:
        pointer = "proxy of " + repr(self.this)
    except __builtin__.Exception:
        pointer = ""
    return "<%s.%s; %s >" % (
        self.__class__.__module__, self.__class__.__name__, pointer,)
93 
94 
95 def _swig_setattr_nondynamic_method(set):
96  def set_attr(self, name, value):
97  if (name == "thisown"):
98  return self.this.own(value)
99  if hasattr(self, name) or (name == "this"):
100  set(self, name, value)
101  else:
102  raise AttributeError("You cannot add attributes to %s" % self)
103  return set_attr
104 
105 
# Use weak proxies where available so director objects do not keep their
# C++ counterparts alive via reference cycles; degrade to the identity
# function if the weakref module is missing.
try:
    import weakref
    weakref_proxy = weakref.proxy
except __builtin__.Exception:
    weakref_proxy = lambda x: x
111 
112 
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

    SWIG-generated iterator over C++ containers; supports the Python
    iterator protocol plus pointer-style arithmetic. Instances are only
    created from C++ -- the constructor is deliberately disabled.
    """

    # Whether Python owns (and may delete) the underlying C++ object.
    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        # Abstract: only constructed from the C++ side.
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        # Python 2 iterator protocol.
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        # Python 3 iterator protocol.
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self

# Register the proxy class with the C extension module.
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
216 
217 
# Registries used by IMP's SWIG layer to record the kinds of types this
# module exports (populated elsewhere in the wrapper).
_value_types=[]
_object_types=[]
_raii_types=[]
_plural_types=[]

# Re-export kernel configuration/logging constants from the C extension so
# they are available as IMP.parallel.* attributes.
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
242 
243 import sys
244 class _DirectorObjects(object):
245  """@internal Simple class to keep references to director objects
246  to prevent premature deletion."""
247  def __init__(self):
248  self._objects = []
249  def register(self, obj):
250  """Take a reference to a director object; will only work for
251  refcounted C++ classes"""
252  if hasattr(obj, 'get_ref_count'):
253  self._objects.append(obj)
254  def cleanup(self):
255  """Only drop our reference and allow cleanup by Python if no other
256  Python references exist (we hold 3 references: one in self._objects,
257  one in x, and one in the argument list for getrefcount) *and* no
258  other C++ references exist (the Python object always holds one)"""
259  objs = [x for x in self._objects if sys.getrefcount(x) > 3 \
260  or x.get_ref_count() > 1]
261 # Do in two steps so the references are kept until the end of the
262 # function (deleting references may trigger a fresh call to this method)
263  self._objects = objs
264  def get_object_count(self):
265  """Get number of director objects (useful for testing only)"""
266  return len(self._objects)
267 _director_objects = _DirectorObjects()
268 
class _ostream(object):
    """Proxy of C++ std::ostream class."""

    # Whether Python owns the underlying C++ object.
    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        # Abstract: proxies wrap existing C++ streams only.
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

# Register the proxy class with the C extension module.
_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
284 
# Compiler capabilities and module configuration flags re-exported from
# the C extension.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
import IMP
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
294 
295 
296 import socket
297 import time
298 import sys
299 import os
300 import re
301 import random
302 import socket
303 import xdrlib
304 try:
305  import cPickle as pickle
306 except ImportError:
307  import pickle
308 from IMP.parallel import slavestate
309 from IMP.parallel.subproc import _run_background, _Popen4
310 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
311 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
312 from IMP.parallel.util import _SetPathAction
313 
314 # Save sys.path at import time, so that slaves can import using the same
315 # path that works for the master imports
316 _import_time_path = sys.path[:]
317 
319  """Base class for all errors specific to the parallel module"""
320  pass
321 
322 
323 class _NoMoreTasksError(Error):
324  pass
325 
326 
328  """Error raised if all slaves failed, so tasks cannot be run"""
329  pass
330 
331 
333  """Error raised if a problem occurs with the network"""
334  pass
335 
336 
338  """Error raised if a slave has an unhandled exception"""
    def __init__(self, exc, traceback, slave):
        # exc: the exception object raised on the remote slave
        # traceback: string rendering of the remote traceback
        # slave: the Slave object on which the exception occurred
        self.exc = exc
        self.traceback = traceback
        self.slave = slave
343 
    def __str__(self):
        # Strip the Python 2 "exceptions." module prefix from the class
        # name for a cleaner message.
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
            % (errstr, str(self.exc), str(self.slave), self.traceback)
348 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""
    # NOTE(review): pickle over a socket is only safe between mutually
    # trusting master/slave processes; never expose these sockets to
    # untrusted peers.

    def __init__(self):
        self._socket = None
        self._ibuffer = b''  # raw received bytes not yet unpickled

    def _send(self, obj):
        # Frame the pickle with an XDR string so the receiver can find
        # message boundaries in the byte stream.
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
# Python < 2.5 can fail trying to send Inf or NaN floats in binary
# mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        # True if buffered bytes from a previous recv remain unconsumed.
        return len(self._ibuffer) > 0

    def _recv(self):
        """Receive and return the next unpickled object, reading more data
        from the socket as needed; raises RemoteError for wrapped remote
        exceptions and NetworkError on connection failure."""
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    # An exception was raised on the remote side; re-raise
                    # it locally with the remote traceback attached.
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                # Buffer does not yet hold a complete message; read more.
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s" \
                        % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        # Decode one XDR-framed pickle from ibuffer; return the object and
        # the remaining (unconsumed) bytes.
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
392 
393 
class Slave(_Communicator):
    """Representation of a single slave.
    Each slave uses a single thread of execution (i.e. a single CPU core)
    to run tasks sequentially.
    Slave is an abstract class; instead of using this class directly, use
    a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init  # lifecycle state (see slavestate)
        self._context = None           # Context whose startup has been sent
        self._task = None              # task currently in flight, if any
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        # Called by the Manager when this slave connects back to the
        # master's listening socket.
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        # Ask the remote process to prepend 'path' to its sys.path.
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record that the slave was heard from at the current time."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if nothing was heard from the slave within
        'timeout' seconds."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        if not self._ready_for_task(context) and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            # Switching contexts: send the new context's startup callable
            # before the task itself.
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                # Keepalive only; return None unless more data is already
                # buffered behind it.
                if not self.get_data_pending():
                    return None
            else:
                break
        # 'r' is the task's result payload.
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        # Mark the slave dead and hand back its in-flight task (or None)
        # so the caller can reschedule it.
        task = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        return self._state == slavestate.connected \
            and self._context == context

    def _running_task(self, context):
        return self._state == slavestate.running_task \
            and self._context == context
468 
469 
class SlaveArray(object):
    """Representation of an array of slaves.
    This is similar to Slave, except that it represents a collection of
    slaves that are controlled together, such as a batch submission system
    array job on a compute cluster.
    Slave is an abstract class; instead of using this class directly, use
    a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        # Abstract; subclasses must override.
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        # Abstract; subclasses may override.
        pass
485 
486 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        # Record the state transition, then launch the slave process in
        # the background on this host.
        Slave._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
497 
498 
class _SGEQsubSlave(Slave):
    """One slave corresponding to a single task of an SGE array job."""

    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None   # "<job>.<task>" string, filled in after qsub
        self._array = array  # owning _SGEQsubSlaveArray

    def _start(self, command, unique_id, output):
        # Defer the actual launch to the owning array, which submits all
        # of its slaves as one qsub array job.
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jid = '(unknown)' if self._jobid is None else self._jobid
        return "<SGE qsub slave, ID %s>" % jid
514 
515 
517  """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
518  To use this class, the master process must be running on a machine that
519  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
520  is termed a 'submit host' by SGE). The class starts an SGE job array
521  (every slave has the same SGE job ID, but a different task ID).
522  """
523 
524 
525  standard_options = '-j y -cwd -r n -o sge-errors'
526 
    def __init__(self, numslave, options):
        """Constructor.
           @param numslave The number of slaves, which corresponds to the
                  number of tasks in the SGE job.
           @param options A string of SGE options that are passed on the 'qsub'
                  command line. This is added to standard_options.
        """
        self._numslave = numslave
        self._options = options
        self._starting_slaves = []  # (unique_id, output, slave) tuples
        self._jobid = None          # SGE job ID, set after submission
538 
539  def _get_slaves(self):
540  """Return a list of Slave objects contained within this array"""
541  return [_SGEQsubSlave(self) for x in range(self._numslave)]
542 
    def _slave_started(self, command, output, slave):
        # Note: despite the name, 'command' actually receives the slave's
        # unique ID (see _SGEQsubSlave._start, which passes unique_id as
        # the first argument); the tuples stored here are consumed by
        # _start() below as (unique_id, output, slave).
        self._starting_slaves.append((command, output, slave))
545 
    def _start(self, command):
        # Submit one SGE array job ('-t 1-N') covering every pending slave;
        # the generated shell script selects each task's unique ID and
        # output file via $SGE_TASK_ID.
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
            (self._options, self.standard_options,
             len(self._starting_slaves))
        print(qsub)
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        # Write the job script on qsub's stdin; the leading '' placeholder
        # makes the shell arrays 1-based, matching $SGE_TASK_ID.
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []
569 
570  def _set_jobid(self, outlines):
571  """Try to figure out the job ID from the SGE qsub output"""
572  if len(outlines) > 0:
573  m = re.compile(r"\d+").search(outlines[0])
574  if m:
575  self._jobid = int(m.group())
576  for (num, slave) in enumerate(self._starting_slaves):
577  slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
578 
579 
class _SGEPESlave(Slave):
    """One slave started via 'qrsh -inherit' on an SGE parallel-environment
    node."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host  # node name from $PE_HOSTFILE

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmd = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmd, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
592 
593 
595  """An array of slaves in a Sun Grid Engine system parallel environment.
596  In order to use this class, the master must be run via Sun Grid Engine's
597  'qsub' command and submitted to a parallel environment using the qsub
598  -pe option. This class will start slaves on every node in the parallel
599  environment (including the node running the master). Each slave is
600  started using the 'qrsh' command with the '-inherit' option."""
601 
602  def _get_slaves(self):
603  slaves = []
604 
605  pe = os.environ['PE_HOSTFILE']
606  fh = open(pe, "r")
607  while True:
608  line = fh.readline()
609  if line == '':
610  break
611  (node, num, queue) = line.split(None, 2)
612  for i in range(int(num)):
613  slaves.append(_SGEPESlave(node))
614 # Replace first slave with a local slave, as this is ourself, and SGE
615 # won't let us start this process with qrsh (as we are already
616 # occupying the slot)
617  if len(slaves) > 0:
618  slaves[0] = LocalSlave()
619  return slaves
620 
621 
class Context(object):
    """A collection of tasks that run in the same environment.
    Context objects are typically created by calling Manager::get_context().
    """

    def __init__(self, manager, startup=None):
        """Constructor."""
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
        A task is any picklable Python callable (a function, or an
        instance of a class defining the __call__ method). When the task
        runs on a slave, its arguments are the return value of this
        context's startup function."""
        self._tasks += [task]

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
        Tasks are handed out as slaves become free (each slave runs one
        task at a time), and each task's return value is yielded from
        this method as a Python generator -- in completion order, which
        need not match submission order.
        \exception NoMoreSlavesError there are no slaves available
                   to run the tasks (or they all failed during execution).
        \exception RemoteError a slave encountered an unhandled exception.
        \exception NetworkError the master lost (or was unable to
                   establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
657 
658 
class Manager(object):
    """Manages slaves and contexts.
    """

    # Seconds to wait for a newly-started slave to connect back.
    connect_timeout = 7200

# Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                  interpreter that can import the IMP module. Otherwise,
                  the same interpreter that the master is currently using
                  is used. This is passed to the shell, so a full command
                  line (including multiple words separated by spaces) can
                  be used if necessary.
           @param host The hostname that slaves use to connect back to the
                  master. If not specified, the master machine's primary
                  IP address is used. On multi-homed machines, such as
                  compute cluster headnodes, this may need to be changed
                  to allow all slaves to reach the master (typically the
                  name of the machine's internal network address is
                  needed). If only running local slaves, 'localhost' can
                  be used to prohibit connections across the network.
           @param output A format string used to name slave output files. It is
                  given the numeric slave id, so for example the default
                  value 'slave\%d.output' will yield output files called
                  slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        # Anything with a _get_slaves method is treated as a SlaveArray
        # and expanded lazily in _start_all_slaves().
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                  that implements the \_\_call\_\_ method) that sets up
                  the slave to run tasks. This method is only called
                  once per slave. The return values from this method
                  are passed to the task object when it runs on
                  the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        # Expand any registered slave arrays into individual slaves.
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        # Command each slave runs to connect back to this master.
        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        # Build a slave identifier from the slave index plus 8 random
        # uppercase letters, so stale or foreign connections can't be
        # mistaken for a known slave. ('uid' avoids shadowing builtin 'id'.)
        uid = "%d:" % num
        for i in range(0, 8):
            uid += chr(random.randint(0, 25) + ord('A'))
        return uid

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
# Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            # Could not reach the slave; mark it dead. The task was not
            # popped, so it stays queued for another slave.
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Timed out with no events at all: abandon all slaves.
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
# New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        # Requeue the task from every running slave, then abort.
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        # Propagate the master's Python search path so slaves can import
        # the same modules.
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
# Otherwise, wait for starting slaves to connect back and get tasks

        # Bug fix: 'util' was referenced here without ever being bound
        # (the module-level imports only pull specific names out of
        # IMP.parallel.util), which raised NameError at runtime; import
        # the poll helper explicitly instead.
        from IMP.parallel.util import _poll_events
        return _poll_events(self._listen_sock, running,
                            self.heartbeat_timeout)
868 
869 
870 
def get_module_version():
    """get_module_version() -> std::string const"""
    # Version string reported by the compiled IMP.parallel C++ module.
    return _IMP_parallel.get_module_version()
874 
def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    # Return the full path to one of this module's example files.
    return _IMP_parallel.get_example_path(fname)
878 
def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    # Return the full path to one of this module's data files.
    return _IMP_parallel.get_data_path(fname)
882 
# Verify that the Python wrapper and the compiled C++ module agree on the
# IMP version, then publish it as the package's __version__.
from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()
886 
887 
888 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.