IMP logo
IMP Reference Guide  2.6.2
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.8
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 # This wrapper is part of IMP,
8 # Copyright 2007-2016 IMP Inventors. All rights reserved.
9 
10 from __future__ import print_function, division, absolute_import
11 
12 
13 
14 
# Import the compiled SWIG extension module (_IMP_parallel), preferring the
# copy that lives next to this __init__.py so the matching binary is loaded.
from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            # Search only this package's directory for the extension.
            fp, pathname, description = imp.find_module('_IMP_parallel', [dirname(__file__)])
        except ImportError:
            # Not found alongside the package; fall back to sys.path.
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
            finally:
                # find_module opened the file; always close it.
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del version_info
try:
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.
41 
42 
43 def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
44  if (name == "thisown"):
45  return self.this.own(value)
46  if (name == "this"):
47  if type(value).__name__ == 'SwigPyObject':
48  self.__dict__[name] = value
49  return
50  method = class_type.__swig_setmethods__.get(name, None)
51  if method:
52  return method(self, value)
53  if (not static):
54  object.__setattr__(self, name, value)
55  else:
56  raise AttributeError("You cannot add attributes to %s" % self)
57 
58 
def _swig_setattr(self, class_type, name, value):
    """Non-static variant of _swig_setattr_nondynamic: permits adding
    new instance attributes."""
    return _swig_setattr_nondynamic(self, class_type, name, value, static=0)
61 
62 
63 def _swig_getattr_nondynamic(self, class_type, name, static=1):
64  if (name == "thisown"):
65  return self.this.own()
66  method = class_type.__swig_getmethods__.get(name, None)
67  if method:
68  return method(self)
69  if (not static):
70  return object.__getattr__(self, name)
71  else:
72  raise AttributeError(name)
73 
def _swig_getattr(self, class_type, name):
    """Non-static attribute lookup on a SWIG proxy object."""
    return _swig_getattr_nondynamic(self, class_type, name, static=0)
76 
77 
78 def _swig_repr(self):
79  try:
80  strthis = "proxy of " + self.this.__repr__()
81  except Exception:
82  strthis = ""
83  return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
84 
# New-style class support check: on modern Pythons 'object' exists and
# _newclass is 1; on ancient interpreters fall back to a classic class.
try:
    _object = object
    _newclass = 1
except AttributeError:
    class _object:
        pass
    _newclass = 0
92 
93 
94 
95 def _swig_setattr_nondynamic_method(set):
96  def set_attr(self, name, value):
97  if (name == "thisown"):
98  return self.this.own(value)
99  if hasattr(self, name) or (name == "this"):
100  set(self, name, value)
101  else:
102  raise AttributeError("You cannot add attributes to %s" % self)
103  return set_attr
104 
105 
# Use weakref.proxy to hold director objects without creating reference
# cycles; degrade to the identity function if weakref is unavailable.
try:
    import weakref
    weakref_proxy = weakref.proxy
except Exception:
    weakref_proxy = lambda x: x
111 
112 
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

    SWIG-generated wrapper for a C++ iterator.  Supports the Python
    iterator protocol (__iter__/__next__/next) as well as pointer-style
    arithmetic (incr/decr/advance and the +,-,+=,-= operators).
    """

    # True when this proxy owns (and will delete) the underlying C++ object.
    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    # Destruction is handled by __swig_destroy__; __del__ is a no-op.
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self
# Register the proxy class with the C extension so returned C++ iterators
# are wrapped in IMP_PARALLEL_SwigPyIterator instances.
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)


# Registries used by IMP's SWIG typemaps to track categories of wrapped types.
_value_types = []
_object_types = []
_raii_types = []
_plural_types = []
222 
223 
# Mirror compile-time constants from the C extension into this module's
# namespace: logging levels, check levels, and feature-availability flags.
_IMP_parallel.IMP_DEBUG_swigconstant(_IMP_parallel)
IMP_DEBUG = _IMP_parallel.IMP_DEBUG

_IMP_parallel.IMP_RELEASE_swigconstant(_IMP_parallel)
IMP_RELEASE = _IMP_parallel.IMP_RELEASE

_IMP_parallel.IMP_SILENT_swigconstant(_IMP_parallel)
IMP_SILENT = _IMP_parallel.IMP_SILENT

_IMP_parallel.IMP_PROGRESS_swigconstant(_IMP_parallel)
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS

_IMP_parallel.IMP_TERSE_swigconstant(_IMP_parallel)
IMP_TERSE = _IMP_parallel.IMP_TERSE

_IMP_parallel.IMP_VERBOSE_swigconstant(_IMP_parallel)
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE

_IMP_parallel.IMP_MEMORY_swigconstant(_IMP_parallel)
IMP_MEMORY = _IMP_parallel.IMP_MEMORY

_IMP_parallel.IMP_NONE_swigconstant(_IMP_parallel)
IMP_NONE = _IMP_parallel.IMP_NONE

_IMP_parallel.IMP_USAGE_swigconstant(_IMP_parallel)
IMP_USAGE = _IMP_parallel.IMP_USAGE

_IMP_parallel.IMP_INTERNAL_swigconstant(_IMP_parallel)
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL

_IMP_parallel.IMP_KERNEL_HAS_LOG4CXX_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX

_IMP_parallel.IMP_COMPILER_HAS_AUTO_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO

_IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR

_IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR

_IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER

_IMP_parallel.IMPKERNEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
280 
281 import sys
class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        # Strong references to registered director objects.
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3 \
                or x.get_ref_count() > 1]
        # Do in two steps so the references are kept until the end of the
        # function (deleting references may trigger a fresh call to this method)
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)

# Module-level singleton used by SWIG director glue code.
_director_objects = _DirectorObjects()
306 
class _ostream(object):
    """Proxy of C++ std::ostream class.

    Minimal wrapper exposing only write(); instances are created by the
    C++ side, never directly from Python.
    """

    # True when this proxy owns the underlying C++ object.
    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

# Register the proxy class with the C extension.
_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
322 
323 
# Compiler-feature and IMP.parallel-specific availability flags from the
# C extension.
_IMP_parallel.IMP_COMPILER_HAS_OVERRIDE_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE

_IMP_parallel.IMP_COMPILER_HAS_FINAL_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL

_IMP_parallel.IMP_HAS_NOEXCEPT_swigconstant(_IMP_parallel)
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
import IMP

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM

_IMP_parallel.IMPPARALLEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
348 
349 
350 import socket
351 import time
352 import sys
353 import os
354 import re
355 import random
356 import socket
357 import xdrlib
358 try:
359  import cPickle as pickle
360 except ImportError:
361  import pickle
362 from IMP.parallel import slavestate
363 from IMP.parallel.subproc import _run_background, _Popen4
364 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
365 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
366 from IMP.parallel.util import _SetPathAction
367 
# Save sys.path at import time, so that slaves can import using the same
# path that works for the master imports
_import_time_path = sys.path[:]
371 
373  """Base class for all errors specific to the parallel module"""
374  pass
375 
376 
class _NoMoreTasksError(Error):
    # Internal control-flow signal: raised when a context has no tasks
    # left to hand out (see Manager._get_network_events).
    pass
379 
380 
382  """Error raised if all slaves failed, so tasks cannot be run"""
383  pass
384 
385 
387  """Error raised if a problem occurs with the network"""
388  pass
389 
390 
392  """Error raised if a slave has an unhandled exception"""
393  def __init__(self, exc, traceback, slave):
394  self.exc = exc
395  self.traceback = traceback
396  self.slave = slave
397 
398  def __str__(self):
399  errstr = str(self.exc.__class__).replace("exceptions.", "")
400  return "%s: %s from %s\nRemote traceback:\n%s" \
401  % (errstr, str(self.exc), str(self.slave), self.traceback)
402 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        # Raw bytes received but not yet unpickled.
        self._ibuffer = b''

    def _send(self, obj):
        # Frame each message as an XDR length-prefixed string so the
        # receiver can find message boundaries in the byte stream.
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        # True if bytes are buffered that may contain a complete message.
        return len(self._ibuffer) > 0

    def _recv(self):
        # Block until one complete object has been received, then return it.
        # Raises RemoteError if the peer sent a wrapped exception, and
        # NetworkError if the connection is lost or closed.
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                # Message incomplete; read more bytes from the socket.
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s" \
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        # Returns (object, remaining buffer); raises EOFError/IndexError
        # via the Unpacker if the buffer holds an incomplete message.
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
446 
447 
class Slave(_Communicator):
    """Representation of a single slave.
    Each slave uses a single thread of execution (i.e. a single CPU core)
    to run tasks sequentially.
    Slave is an abstract class; instead of using this class directly, use
    a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init      # lifecycle state (see slavestate)
        self._context = None               # Context whose startup was sent
        self._task = None                  # task currently running remotely
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        # Called by the Manager when this slave connects back.
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        # Ask the remote process to prepend 'path' to its sys.path.
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        # Record the time of the last message from this slave.
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        # True if nothing was heard from the slave for 'timeout' seconds.
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        # Send 'task' (and the context startup, if not already sent) to
        # the remote process; raises TypeError if the slave is busy.
        if not self._ready_for_task(context) and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        # Wait for the running task's results; returns the finished task,
        # or None if only a heartbeat arrived and no data is pending.
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        # Mark the slave dead and return its (unfinished) task, if any,
        # so the caller can reschedule it.
        task = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        # Ready only when connected and already set up for 'context'.
        return self._state == slavestate.connected \
               and self._context == context

    def _running_task(self, context):
        return self._state == slavestate.running_task \
               and self._context == context
522 
523 
class SlaveArray(object):
    """Representation of an array of slaves.
    This is similar to Slave, except that it represents a collection of
    slaves that are controlled together, such as a batch submission system
    array job on a compute cluster.
    Slave is an abstract class; instead of using this class directly, use
    a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        pass
539 
540 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        """Launch the slave handler as a background process on this host."""
        Slave._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
551 
552 
class _SGEQsubSlave(Slave):
    # One member of an SGE 'qsub' array job.  Actual submission is done in
    # one batch by the owning SGEQsubSlaveArray, which also fills in _jobid.
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        # Defer the real launch to the array, which submits all members at once.
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        shown = self._jobid
        if shown is None:
            shown = '(unknown)'
        return "<SGE qsub slave, ID %s>" % shown
568 
569 
571  """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
572  To use this class, the master process must be running on a machine that
573  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
574  is termed a 'submit host' by SGE). The class starts an SGE job array
575  (every slave has the same SGE job ID, but a different task ID).
576  """
577 
578 
579  standard_options = '-j y -cwd -r n -o sge-errors'
580 
581  def __init__(self, numslave, options):
582  """Constructor.
583  @param numslave The number of slaves, which corresponds to the
584  number of tasks in the SGE job.
585  @param options A string of SGE options that are passed on the 'qsub'
586  command line. This is added to standard_options.
587  """
588  self._numslave = numslave
589  self._options = options
590  self._starting_slaves = []
591  self._jobid = None
592 
593  def _get_slaves(self):
594  """Return a list of Slave objects contained within this array"""
595  return [_SGEQsubSlave(self) for x in range(self._numslave)]
596 
597  def _slave_started(self, command, output, slave):
598  self._starting_slaves.append((command, output, slave))
599 
600  def _start(self, command):
601  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
602  (self._options, self.standard_options,
603  len(self._starting_slaves))
604  print(qsub)
605  a = _Popen4(qsub)
606  (inp, out) = (a.stdin, a.stdout)
607  slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
608  slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
609  inp.write("#!/bin/sh\n")
610  inp.write("uid=( '' %s )\n" % slave_uid)
611  inp.write("out=( '' %s )\n" % slave_out)
612  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
613  inp.write("myout=${out[$SGE_TASK_ID]}\n")
614  inp.write("%s $myuid > $myout 2>&1\n" % command)
615  inp.close()
616  outlines = out.readlines()
617  out.close()
618  for line in outlines:
619  print(line.rstrip('\r\n'))
620  a.require_clean_exit()
621  self._set_jobid(outlines)
622  self._starting_slaves = []
623 
624  def _set_jobid(self, outlines):
625  """Try to figure out the job ID from the SGE qsub output"""
626  if len(outlines) > 0:
627  m = re.compile(r"\d+").search(outlines[0])
628  if m:
629  self._jobid = int(m.group())
630  for (num, slave) in enumerate(self._starting_slaves):
631  slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
632 
633 
class _SGEPESlave(Slave):
    # A slave launched with 'qrsh -inherit' on one node of an SGE
    # parallel environment.
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        _run_background("qrsh -inherit -V %s %s %s"
                        % (self._host, command, unique_id), output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
646 
647 
649  """An array of slaves in a Sun Grid Engine system parallel environment.
650  In order to use this class, the master must be run via Sun Grid Engine's
651  'qsub' command and submitted to a parallel environment using the qsub
652  -pe option. This class will start slaves on every node in the parallel
653  environment (including the node running the master). Each slave is
654  started using the 'qrsh' command with the '-inherit' option."""
655 
656  def _get_slaves(self):
657  slaves = []
658 
659  pe = os.environ['PE_HOSTFILE']
660  fh = open(pe, "r")
661  while True:
662  line = fh.readline()
663  if line == '':
664  break
665  (node, num, queue) = line.split(None, 2)
666  for i in range(int(num)):
667  slaves.append(_SGEPESlave(node))
668 # Replace first slave with a local slave, as this is ourself, and SGE
669 # won't let us start this process with qrsh (as we are already
670 # occupying the slot)
671  if len(slaves) > 0:
672  slaves[0] = LocalSlave()
673  return slaves
674 
675 
class Context(object):
    """A collection of tasks that run in the same environment.
    Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor."""
        self._manager = manager
        self._startup = startup    # callable run once per slave before tasks
        self._tasks = []           # FIFO of tasks not yet sent to a slave

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           \exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           \exception RemoteError a slave encountered an unhandled exception.
           \exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
712 
class Manager(object):
    """Manages slaves and contexts.
    """

    # Seconds a newly-started slave gets to connect back to the master.
    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                  interpreter that can import the IMP module. Otherwise,
                  the same interpreter that the master is currently using
                  is used. This is passed to the shell, so a full command
                  line (including multiple words separated by spaces) can
                  be used if necessary.
           @param host The hostname that slaves use to connect back to the
                  master. If not specified, the master machine's primary
                  IP address is used. On multi-homed machines, such as
                  compute cluster headnodes, this may need to be changed
                  to allow all slaves to reach the master (typically the
                  name of the machine's internal network address is
                  needed). If only running local slaves, 'localhost' can
                  be used to prohibit connections across the network.
           @param output A format string used to name slave output files. It is
                  given the numeric slave id, so for example the default
                  value 'slave\%d.output' will yield output files called
                  slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        # (The original assigned self._host twice; a single branch suffices.)
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        if hasattr(slave, '_get_slaves'):
            # Slave arrays are expanded into individual slaves on startup.
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                  that implements the \_\_call\_\_ method) that sets up
                  the slave to run tasks. This method is only called
                  once per slave. The return values from this method
                  are passed to the task object when it runs on
                  the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        # Expand arrays into individual slaves, then start every slave that
        # has not been started yet.
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        # Command each slave runs: start the handler, which connects back
        # to this master's listening socket.
        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        # Slave index plus 8 random uppercase letters.  (Renamed the local
        # from 'id' to avoid shadowing the builtin.)
        uid = "%d:" % num
        for i in range(0, 8):
            uid += chr(random.randint(0, 25) + ord('A'))
        return uid

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            # Slave is unreachable; mark it dead.  The task was not popped,
            # so it remains queued for another slave.
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Timed out with no network activity at all.
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        # Handle one network event; returns a finished task, or None.
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    # Immediately hand the now-idle slave another task.
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s" \
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a._running_task(context) \
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        # Total communication loss: kill everything, requeue the tasks,
        # and report the failure to the caller.
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        # A process connected to our listening socket; it must identify
        # itself with one of the unique IDs we handed out at start time.
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        # Propagate the master's import paths so slaves import the same code.
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        # NOTE(review): relies on the 'util' module being in scope; only
        # 'from IMP.parallel.util import ...' names are visible in this
        # chunk — confirm 'from IMP.parallel import util' exists at file top.
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
922 
923 
924 
def get_module_version():
    """get_module_version() -> std::string const

    Return the version string of the compiled IMP.parallel module.
    """
    return _IMP_parallel.get_module_version()
928 
def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string

    Return the full path to one of this module's example files.
    """
    return _IMP_parallel.get_example_path(fname)
932 
def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string

    Return the full path to one of this module's data files.
    """
    return _IMP_parallel.get_data_path(fname)
936 
# Verify the compiled extension matches these Python wrappers, then expose
# the module version as the conventional __version__ attribute.
from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()
940 
941 
942 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.