IMP logo
IMP Reference Guide  2.5.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.5
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 # This wrapper is part of IMP,
8 # Copyright 2007-2015 IMP Inventors. All rights reserved.
9 
10 from __future__ import print_function, division, absolute_import
11 
12 
13 
14 
from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        # Locate the compiled _IMP_parallel extension next to this file;
        # fall back to a plain import if it is not there.
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module('_IMP_parallel', fp, pathname,
                                       description)
            finally:
                # Always release the file handle opened by find_module
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del version_info
37 try:
38  _swig_property = property
39 except NameError:
40  pass # Python < 2.2 doesn't have 'property'.
41 
42 
43 def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
44  if (name == "thisown"):
45  return self.this.own(value)
46  if (name == "this"):
47  if type(value).__name__ == 'SwigPyObject':
48  self.__dict__[name] = value
49  return
50  method = class_type.__swig_setmethods__.get(name, None)
51  if method:
52  return method(self, value)
53  if (not static):
54  object.__setattr__(self, name, value)
55  else:
56  raise AttributeError("You cannot add attributes to %s" % self)
57 
58 
def _swig_setattr(self, class_type, name, value):
    """Dynamic (non-static) attribute setter for SWIG proxy objects."""
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)
61 
62 
63 def _swig_getattr_nondynamic(self, class_type, name, static=1):
64  if (name == "thisown"):
65  return self.this.own()
66  method = class_type.__swig_getmethods__.get(name, None)
67  if method:
68  return method(self)
69  if (not static):
70  return object.__getattr__(self, name)
71  else:
72  raise AttributeError(name)
73 
def _swig_getattr(self, class_type, name):
    """Dynamic (non-static) attribute getter for SWIG proxy objects."""
    return _swig_getattr_nondynamic(self, class_type, name, 0)
76 
77 
78 def _swig_repr(self):
79  try:
80  strthis = "proxy of " + self.this.__repr__()
81  except:
82  strthis = ""
83  return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
84 
85 try:
86  _object = object
87  _newclass = 1
88 except AttributeError:
89  class _object:
90  pass
91  _newclass = 0
92 
93 
94 
95 def _swig_setattr_nondynamic_method(set):
96  def set_attr(self, name, value):
97  if (name == "thisown"):
98  return self.this.own(value)
99  if hasattr(self, name) or (name == "this"):
100  set(self, name, value)
101  else:
102  raise AttributeError("You cannot add attributes to %s" % self)
103  return set_attr
104 
105 
# Prefer real weak references; degrade to the identity function if the
# weakref module is unavailable.
try:
    import weakref
    weakref_proxy = weakref.proxy
except:
    weakref_proxy = lambda x: x
111 
112 
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

    Abstract iterator wrapper; every operation delegates to the compiled
    _IMP_parallel extension module."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")

    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator

    def __del__(self):
        return None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """Advance the iterator *n* positions (default 1)."""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """Move the iterator back *n* positions (default 1)."""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject * (Python 2)"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject * (Python 3)"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """Subtract an integer offset, or another iterator (giving the
        distance between the two)."""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
215 
_value_types = []
_object_types = []
_raii_types = []
_plural_types = []


# Register each kernel-level SWIG constant with the extension module, then
# re-export it as a module-level name, preserving the original order.
for _name in ('IMP_DEBUG', 'IMP_RELEASE', 'IMP_SILENT', 'IMP_PROGRESS',
              'IMP_TERSE', 'IMP_VERBOSE', 'IMP_MEMORY', 'IMP_NONE',
              'IMP_USAGE', 'IMP_INTERNAL', 'IMP_KERNEL_HAS_LOG4CXX',
              'IMP_COMPILER_HAS_AUTO', 'IMP_COMPILER_HAS_DEBUG_VECTOR',
              'IMP_KERNEL_HAS_BOOST_RANDOM', 'IMP_KERNEL_HAS_GPERFTOOLS',
              'IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER',
              'IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER',
              'IMPKERNEL_SHOW_WARNINGS'):
    getattr(_IMP_parallel, _name + '_swigconstant')(_IMP_parallel)
    globals()[_name] = getattr(_IMP_parallel, _name)
del _name
275 import sys
276 class _DirectorObjects(object):
277  """@internal Simple class to keep references to director objects
278  to prevent premature deletion."""
279  def __init__(self):
280  self._objects = []
281  def register(self, obj):
282  """Take a reference to a director object; will only work for
283  refcounted C++ classes"""
284  if hasattr(obj, 'get_ref_count'):
285  self._objects.append(obj)
286  def cleanup(self):
287  """Only drop our reference and allow cleanup by Python if no other
288  Python references exist (we hold 3 references: one in self._objects,
289  one in x, and one in the argument list for getrefcount) *and* no
290  other C++ references exist (the Python object always holds one)"""
291  objs = [x for x in self._objects if sys.getrefcount(x) > 3 \
292  or x.get_ref_count() > 1]
293  # Do in two steps so the references are kept until the end of the
294  # function (deleting references may trigger a fresh call to this method)
295  self._objects = objs
296  def get_object_count(self):
297  """Get number of director objects (useful for testing only)"""
298  return len(self._objects)
299 _director_objects = _DirectorObjects()
300 
class _ostream(object):
    """Proxy of C++ std::ostream class (write-only wrapper)."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")

    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
315 
316 
# Register and re-export the compiler-feature constants, then the
# parallel-module constants, in the original order.
for _name in ('IMP_COMPILER_HAS_OVERRIDE', 'IMP_COMPILER_HAS_FINAL',
              'IMP_HAS_NOEXCEPT'):
    getattr(_IMP_parallel, _name + '_swigconstant')(_IMP_parallel)
    globals()[_name] = getattr(_IMP_parallel, _name)
del _name

import IMP

for _name in ('IMP_PARALLEL_HAS_BOOST_FILESYSTEM',
              'IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS',
              'IMP_PARALLEL_HAS_BOOST_RANDOM',
              'IMP_PARALLEL_HAS_BOOST_SYSTEM',
              'IMPPARALLEL_SHOW_WARNINGS'):
    getattr(_IMP_parallel, _name + '_swigconstant')(_IMP_parallel)
    globals()[_name] = getattr(_IMP_parallel, _name)
del _name
# Standard-library dependencies of the pure-Python part of the module.
# (The original listed `import socket` twice; the duplicate is removed.)
import socket
import time
import sys
import os
import re
import random
import xdrlib

try:
    import cPickle as pickle
except ImportError:
    import pickle

from IMP.parallel import slavestate
from IMP.parallel import util  # Manager._get_network_events uses util._poll_events
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

# Save sys.path at import time, so that slaves can import using the same
# path that works for the master imports
_import_time_path = sys.path[:]
362 
364  """Base class for all errors specific to the parallel module"""
365  pass
366 
367 
class _NoMoreTasksError(Error):
    """Internal signal: a context has no tasks left to hand out."""
    pass
370 
371 
373  """Error raised if all slaves failed, so tasks cannot be run"""
374  pass
375 
376 
378  """Error raised if a problem occurs with the network"""
379  pass
380 
381 
383  """Error raised if a slave has an unhandled exception"""
384  def __init__(self, exc, traceback, slave):
385  self.exc = exc
386  self.traceback = traceback
387  self.slave = slave
388 
389  def __str__(self):
390  errstr = str(self.exc.__class__).replace("exceptions.", "")
391  return "%s: %s from %s\nRemote traceback:\n%s" \
392  % (errstr, str(self.exc), str(self.slave), self.traceback)
393 
394 class _Communicator(object):
395  """Simple support for sending Python pickled objects over the network"""
396 
397  def __init__(self):
398  self._socket = None
399  self._ibuffer = b''
400 
401  def _send(self, obj):
402  p = xdrlib.Packer()
403  try:
404  p.pack_string(pickle.dumps(obj, -1))
405  # Python < 2.5 can fail trying to send Inf or NaN floats in binary
406  # mode, so fall back to the old protocol in this case:
407  except SystemError:
408  p.pack_string(pickle.dumps(obj, 0))
409  self._socket.sendall(p.get_buffer())
410 
411  def get_data_pending(self):
412  return len(self._ibuffer) > 0
413 
414  def _recv(self):
415  while True:
416  try:
417  obj, self._ibuffer = self._unpickle(self._ibuffer)
418  if isinstance(obj, _ErrorWrapper):
419  raise RemoteError(obj.obj, obj.traceback, self)
420  else:
421  return obj
422  except (IndexError, EOFError):
423  try:
424  data = self._socket.recv(4096)
425  except socket.error as detail:
426  raise NetworkError("Connection lost to %s: %s" \
427  % (str(self), str(detail)))
428  if len(data) > 0:
429  self._ibuffer += data
430  else:
431  raise NetworkError("%s closed connection" % str(self))
432 
433  def _unpickle(self, ibuffer):
434  p = xdrlib.Unpacker(ibuffer)
435  obj = p.unpack_string()
436  return (pickle.loads(obj), ibuffer[p.get_position():])
437 
438 
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        # The slave process has connected back to the master's listen socket
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record that the slave was heard from just now."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if more than *timeout* seconds passed since contact."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        if not (self._ready_for_task(context) or self._ready_for_task(None)):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            # Switch contexts: run the context's startup callable first
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Block until the slave reports a finished task (returned) or a
        bare heartbeat with nothing pending (returns None)."""
        while True:
            reply = self._recv()
            self.update_contact_time()
            if not isinstance(reply, _HeartBeat):
                break
            if not self.get_data_pending():
                return None
        finished = self._task
        finished._results = reply
        self._task = None
        self._state = slavestate.connected
        return finished

    def _kill(self):
        """Mark the slave dead; return its in-flight task (or None)."""
        task = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        return (self._state == slavestate.connected
                and self._context == context)

    def _running_task(self, context):
        return (self._state == slavestate.running_task
                and self._context == context)
513 
514 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission
       system array job on a compute cluster.
       Slave is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        pass
530 
531 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        # Launch the slave process in the background on this host
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
542 
543 
class _SGEQsubSlave(Slave):
    """One slave inside an SGE qsub array job; the actual process start is
    deferred to the owning SGEQsubSlaveArray."""

    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        # Hand off to the array, which batches all slaves into one qsub
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
559 
560 
562  """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
563  To use this class, the master process must be running on a machine that
564  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
565  is termed a 'submit host' by SGE). The class starts an SGE job array
566  (every slave has the same SGE job ID, but a different task ID).
567  """
568 
569 
570  standard_options = '-j y -cwd -r n -o sge-errors'
571 
572  def __init__(self, numslave, options):
573  """Constructor.
574  @param numslave The number of slaves, which correponds to the
575  number of tasks in the SGE job.
576  @param options A string of SGE options that are passed on the 'qsub'
577  command line. This is added to standard_options.
578  """
579  self._numslave = numslave
580  self._options = options
581  self._starting_slaves = []
582  self._jobid = None
583 
584  def _get_slaves(self):
585  """Return a list of Slave objects contained within this array"""
586  return [_SGEQsubSlave(self) for x in range(self._numslave)]
587 
588  def _slave_started(self, command, output, slave):
589  self._starting_slaves.append((command, output, slave))
590 
591  def _start(self, command):
592  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
593  (self._options, self.standard_options,
594  len(self._starting_slaves))
595  print(qsub)
596  a = _Popen4(qsub)
597  (inp, out) = (a.stdin, a.stdout)
598  slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
599  slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
600  inp.write("#!/bin/sh\n")
601  inp.write("uid=( '' %s )\n" % slave_uid)
602  inp.write("out=( '' %s )\n" % slave_out)
603  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
604  inp.write("myout=${out[$SGE_TASK_ID]}\n")
605  inp.write("%s $myuid > $myout 2>&1\n" % command)
606  inp.close()
607  outlines = out.readlines()
608  out.close()
609  for line in outlines:
610  print(line.rstrip('\r\n'))
611  a.require_clean_exit()
612  self._set_jobid(outlines)
613  self._starting_slaves = []
614 
615  def _set_jobid(self, outlines):
616  """Try to figure out the job ID from the SGE qsub output"""
617  if len(outlines) > 0:
618  m = re.compile(r"\d+").search(outlines[0])
619  if m:
620  self._jobid = int(m.group())
621  for (num, slave) in enumerate(self._starting_slaves):
622  slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
623 
624 
class _SGEPESlave(Slave):
    """One slave started on a parallel-environment node via 'qrsh -inherit'."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
637 
638 
640  """An array of slaves in a Sun Grid Engine system parallel environment.
641  In order to use this class, the master must be run via Sun Grid Engine's
642  'qsub' command and submitted to a parallel environment using the qsub
643  -pe option. This class will start slaves on every node in the parallel
644  environment (including the node running the master). Each slave is
645  started using the 'qrsh' command with the '-inherit' option."""
646 
647  def _get_slaves(self):
648  slaves = []
649 
650  pe = os.environ['PE_HOSTFILE']
651  fh = open(pe, "r")
652  while True:
653  line = fh.readline()
654  if line == '':
655  break
656  (node, num, queue) = line.split(None, 2)
657  for i in range(int(num)):
658  slaves.append(_SGEPESlave(node))
659  # Replace first slave with a local slave, as this is ourself, and SGE
660  # won't let us start this process with qrsh (as we are already
661  # occupying the slot)
662  if len(slaves) > 0:
663  slaves[0] = LocalSlave()
664  return slaves
665 
666 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling
       Manager::get_context().
    """

    def __init__(self, manager, startup=None):
        """Constructor."""
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the __call__ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return
           value(s) from the task callable are returned from this method,
           as a Python generator. Note that the results are returned in the
           order that tasks complete, which may not be the same as the
           order they were submitted in.
           May raise NoMoreSlavesError (no slaves available or all failed),
           RemoteError (a slave hit an unhandled exception), or
           NetworkError (the master lost contact with every slave)."""
        return self._manager._get_results_unordered(self)
702 
703 
class Manager(object):
    """Manages slaves and contexts.
    """

    # Seconds to wait for a starting slave to connect back to us
    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                  interpreter that can import the IMP module. Otherwise, the
                  same interpreter that the master is currently using is
                  used. This is passed to the shell, so a full command line
                  can be used if necessary.
           @param host The hostname that slaves use to connect back to the
                  master. If not specified, the master machine's primary IP
                  address is used. On multi-homed machines this may need to
                  be changed so every slave can reach the master; use
                  'localhost' to restrict connections to this machine.
           @param output A format string used to name slave output files.
                  It is given the numeric slave id, so the default yields
                  slave0.output, slave1.output, etc.
        """
        self._python = sys.executable if python is None else python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        # SlaveArray subclasses are detected by their _get_slaves method
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable that sets up the slave to
                  run tasks. It is called once per slave; its return values
                  are passed to each task object run on that slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        """Expand slave arrays and launch every slave not yet started."""
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host,
                               self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        """Return '<num>:' plus 8 random uppercase letters, used so a
        connecting slave can identify itself."""
        suffix = "".join(chr(random.randint(0, 25) + ord('A'))
                         for _ in range(8))
        return "%d:" % num + suffix

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        candidates = [s for s in self._all_slaves
                      if s._ready_for_task(context)] \
                     + [s for s in self._all_slaves
                        if s._ready_for_task(None)]
        for slave in candidates:
            if not context._tasks:
                break
            self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if not context._tasks:
            return
        task = context._tasks[0]
        try:
            slave._start_task(task, context)
            context._tasks.pop(0)
        except socket.error:
            # Slave is unreachable; mark it dead and leave the task queued
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Timed out with no network activity at all
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one network event; return a finished task if there is one."""
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s" \
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        timed_out = [s for s in self._all_slaves
                     if s._running_task(context)
                     and s.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        # Requeue every in-flight task, then give up on the whole run
        for slave in [s for s in self._all_slaves
                      if s._running_task(context)]:
            context._tasks.append(slave._kill())
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        # Push the master's import paths to the slave so it can import the
        # same modules
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [s for s in self._all_slaves if s._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and
            # get tasks

        # NOTE(review): 'util' must be bound at module level (e.g.
        # "from IMP.parallel import util") — verify the module's imports
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
913 
914 
915 
def get_module_version():
    """get_module_version() -> std::string const"""
    # Delegates to the compiled extension module
    return _IMP_parallel.get_module_version()
919 
def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    # Delegates to the compiled extension module
    return _IMP_parallel.get_example_path(fname)
923 
def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    # Delegates to the compiled extension module
    return _IMP_parallel.get_data_path(fname)
from . import _version_check
# Fail early if the compiled extension does not match this wrapper version
_version_check.check_version(get_module_version())
__version__ = get_module_version()
930 
931 
932 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.