IMP logo
IMP Reference Guide  2.6.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.7
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 # This wrapper is part of IMP,
8 # Copyright 2007-2016 IMP Inventors. All rights reserved.
9 
10 from __future__ import print_function, division, absolute_import
11 
12 
13 
14 
from sys import version_info

if version_info < (2, 6, 0):
    # Very old interpreters: a plain import is the only option.
    import _IMP_parallel
else:
    def swig_import_helper():
        """Load the compiled _IMP_parallel extension.

        The extension is first searched for in the directory containing
        this file; if it is not found there, a regular import is used
        instead. Returns the loaded module (or None if the module was
        located but no file handle was returned for it).
        """
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is None:
            return None
        try:
            return imp.load_module('_IMP_parallel', fp, pathname,
                                   description)
        finally:
            # Always release the file handle opened by find_module
            fp.close()
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
del version_info

try:
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.
41 
42 
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    """Set attribute `name` on a SWIG proxy object.

    "thisown" is forwarded to self.this.own(); "this" is stored directly
    in the instance dictionary when the value is a SwigPyObject. Otherwise
    a setter registered in class_type.__swig_setmethods__ is used if there
    is one. With no setter, the attribute is set normally when `static` is
    false, and AttributeError is raised when `static` is true.
    """
    if name == "thisown":
        return self.this.own(value)
    if name == "this" and type(value).__name__ == 'SwigPyObject':
        self.__dict__[name] = value
        return
    setter = class_type.__swig_setmethods__.get(name)
    if setter:
        return setter(self, value)
    if static:
        raise AttributeError("You cannot add attributes to %s" % self)
    object.__setattr__(self, name, value)
57 
58 
def _swig_setattr(self, class_type, name, value):
    """Set an attribute on a SWIG proxy, allowing new attributes.

    Thin wrapper over _swig_setattr_nondynamic with static checking off.
    """
    return _swig_setattr_nondynamic(self, class_type, name, value, static=0)
61 
62 
def _swig_getattr_nondynamic(self, class_type, name, static=1):
    """Look up attribute `name` on a SWIG proxy object.

    "thisown" is answered by self.this.own(); otherwise a getter registered
    in class_type.__swig_getmethods__ is called if there is one.

    Raises AttributeError if no getter is registered. The original
    non-static path called object.__getattr__, which does not exist, so it
    raised AttributeError with a message about 'object' rather than about
    the attribute that was requested; the error now names the attribute.
    """
    if name == "thisown":
        return self.this.own()
    getter = class_type.__swig_getmethods__.get(name)
    if getter:
        return getter(self)
    if static:
        raise AttributeError(name)
    raise AttributeError("'%s' object has no attribute '%s'"
                         % (class_type.__name__, name))
73 
def _swig_getattr(self, class_type, name):
    """Get an attribute from a SWIG proxy with static checking off.

    Thin wrapper over _swig_getattr_nondynamic.
    """
    return _swig_getattr_nondynamic(self, class_type, name, static=0)
76 
77 
def _swig_repr(self):
    """Return a repr of the form "<module.Class; proxy of ... >".

    If the object has no usable `this` attribute, the "proxy of" part is
    left empty rather than failing.
    """
    try:
        strthis = "proxy of " + self.this.__repr__()
    except Exception:
        # Only ordinary errors are tolerated here; a bare except would also
        # swallow KeyboardInterrupt and SystemExit.
        strthis = ""
    cls = self.__class__
    return "<%s.%s; %s >" % (cls.__module__, cls.__name__, strthis,)
84 
# Pick the base class for generated proxies: the builtin `object` when it is
# available (_newclass = 1), otherwise an empty old-style class
# (_newclass = 0).
try:
    _object, _newclass = object, 1
except AttributeError:
    class _object:
        pass
    _newclass = 0
92 
93 
94 
def _swig_setattr_nondynamic_method(set):
    """Wrap the setter `set` so that it cannot create new attributes.

    The returned function forwards "thisown" to self.this.own(), delegates
    to `set` for "this" and for attributes that already exist, and raises
    AttributeError for anything else.
    """
    def set_attr(self, name, value):
        if name == "thisown":
            return self.this.own(value)
        if not (hasattr(self, name) or name == "this"):
            raise AttributeError("You cannot add attributes to %s" % self)
        set(self, name, value)
    return set_attr
104 
105 
# Use weak proxies when the weakref module is available; otherwise fall
# back to returning the object itself.
try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:
    # Only a failed import is expected here, so do not mask other errors
    # with a bare except.
    def weakref_proxy(x):
        return x
111 
112 
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

    Apart from __init__ and __iter__, every method forwards to the
    corresponding IMP_PARALLEL_SwigPyIterator_* function in the compiled
    _IMP_parallel extension module.
    """
    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        # The class is abstract, so it cannot be built from Python
        raise AttributeError("No constructor defined - class is abstract")

    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        # The proxy is its own iterator
        return self
213 IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
214 IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
215 
216 
217 _value_types=[]
218 _object_types=[]
219 _raii_types=[]
220 _plural_types=[]
221 
222 
223 _IMP_parallel.IMP_DEBUG_swigconstant(_IMP_parallel)
224 IMP_DEBUG = _IMP_parallel.IMP_DEBUG
225 
226 _IMP_parallel.IMP_RELEASE_swigconstant(_IMP_parallel)
227 IMP_RELEASE = _IMP_parallel.IMP_RELEASE
228 
229 _IMP_parallel.IMP_SILENT_swigconstant(_IMP_parallel)
230 IMP_SILENT = _IMP_parallel.IMP_SILENT
231 
232 _IMP_parallel.IMP_PROGRESS_swigconstant(_IMP_parallel)
233 IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
234 
235 _IMP_parallel.IMP_TERSE_swigconstant(_IMP_parallel)
236 IMP_TERSE = _IMP_parallel.IMP_TERSE
237 
238 _IMP_parallel.IMP_VERBOSE_swigconstant(_IMP_parallel)
239 IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
240 
241 _IMP_parallel.IMP_MEMORY_swigconstant(_IMP_parallel)
242 IMP_MEMORY = _IMP_parallel.IMP_MEMORY
243 
244 _IMP_parallel.IMP_NONE_swigconstant(_IMP_parallel)
245 IMP_NONE = _IMP_parallel.IMP_NONE
246 
247 _IMP_parallel.IMP_USAGE_swigconstant(_IMP_parallel)
248 IMP_USAGE = _IMP_parallel.IMP_USAGE
249 
250 _IMP_parallel.IMP_INTERNAL_swigconstant(_IMP_parallel)
251 IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
252 
253 _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX_swigconstant(_IMP_parallel)
254 IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
255 
256 _IMP_parallel.IMP_COMPILER_HAS_AUTO_swigconstant(_IMP_parallel)
257 IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
258 
259 _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR_swigconstant(_IMP_parallel)
260 IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
261 
262 _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR_swigconstant(_IMP_parallel)
263 IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
264 
265 _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
266 IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
267 
268 _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS_swigconstant(_IMP_parallel)
269 IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
270 
271 _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER_swigconstant(_IMP_parallel)
272 IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
273 
274 _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER_swigconstant(_IMP_parallel)
275 IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
276 
277 _IMP_parallel.IMPKERNEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
278 IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
279 
280 import sys
class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""

    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes (objects without a get_ref_count
           attribute are ignored)."""
        if not hasattr(obj, 'get_ref_count'):
            return
        self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in
           self._objects, one in x, and one in the argument list for
           getrefcount) *and* no other C++ references exist (the Python
           object always holds one)"""
        still_referenced = [x for x in self._objects
                            if sys.getrefcount(x) > 3
                            or x.get_ref_count() > 1]
        # Do in two steps so the references are kept until the end of the
        # function (deleting references may trigger a fresh call to this
        # method)
        self._objects = still_referenced

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)
304 _director_objects = _DirectorObjects()
305 
class _ostream(object):
    """Proxy of C++ std::ostream class.

    Instances cannot be constructed from Python.
    """
    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")

    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)
313 
314  def write(self, osa_buf):
315  """write(_ostream self, char const * osa_buf)"""
316  return _IMP_parallel._ostream_write(self, osa_buf)
317 
318 _ostream_swigregister = _IMP_parallel._ostream_swigregister
319 _ostream_swigregister(_ostream)
320 
321 
322 _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE_swigconstant(_IMP_parallel)
323 IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
324 
325 _IMP_parallel.IMP_COMPILER_HAS_FINAL_swigconstant(_IMP_parallel)
326 IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
327 
328 _IMP_parallel.IMP_HAS_NOEXCEPT_swigconstant(_IMP_parallel)
329 IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
330 import IMP
331 
332 _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM_swigconstant(_IMP_parallel)
333 IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
334 
335 _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS_swigconstant(_IMP_parallel)
336 IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
337 
338 _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
339 IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
340 
341 _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM_swigconstant(_IMP_parallel)
342 IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
343 
344 _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
345 IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
346 
347 
348 import socket
349 import time
350 import sys
351 import os
352 import re
353 import random
354 import socket
355 import xdrlib
356 try:
357  import cPickle as pickle
358 except ImportError:
359  import pickle
360 from IMP.parallel import slavestate
361 from IMP.parallel.subproc import _run_background, _Popen4
362 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
363 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
364 from IMP.parallel.util import _SetPathAction
365 
366 # Save sys.path at import time, so that slaves can import using the same
367 # path that works for the master imports
368 _import_time_path = sys.path[:]
369 
371  """Base class for all errors specific to the parallel module"""
372  pass
373 
374 
class _NoMoreTasksError(Error):
    """Internal signal that no tasks remain, either running or queued.

    Raised by Manager._get_network_events and caught by
    Manager._get_results_unordered to end result generation.
    """
377 
378 
380  """Error raised if all slaves failed, so tasks cannot be run"""
381  pass
382 
383 
385  """Error raised if a problem occurs with the network"""
386  pass
387 
388 
390  """Error raised if a slave has an unhandled exception"""
391  def __init__(self, exc, traceback, slave):
392  self.exc = exc
393  self.traceback = traceback
394  self.slave = slave
395 
396  def __str__(self):
397  errstr = str(self.exc.__class__).replace("exceptions.", "")
398  return "%s: %s from %s\nRemote traceback:\n%s" \
399  % (errstr, str(self.exc), str(self.slave), self.traceback)
400 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network.

    Each object is pickled and framed as an XDR string. Received bytes are
    accumulated in an input buffer until a complete object can be decoded.
    """

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        """Pickle obj and send it over the socket as one XDR string."""
        packer = xdrlib.Packer()
        try:
            packer.pack_string(pickle.dumps(obj, -1))
        except SystemError:
            # Python < 2.5 can fail trying to send Inf or NaN floats in
            # binary mode, so fall back to the old protocol in this case
            packer.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(packer.get_buffer())

    def get_data_pending(self):
        """Return True if received but not yet decoded bytes are buffered."""
        return bool(self._ibuffer)

    def _recv(self):
        """Return the next object sent by the peer, blocking if necessary.

        Raises RemoteError if the object is an _ErrorWrapper, and
        NetworkError if the connection fails or is closed by the peer.
        """
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
            except (IndexError, EOFError):
                # Not enough buffered data for a whole object: read more
                try:
                    chunk = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s" \
                                       % (str(self), str(detail)))
                if len(chunk) == 0:
                    raise NetworkError("%s closed connection" % str(self))
                self._ibuffer += chunk
                continue
            if isinstance(obj, _ErrorWrapper):
                raise RemoteError(obj.obj, obj.traceback, self)
            return obj

    def _unpickle(self, ibuffer):
        """Decode one object from the front of ibuffer.

        Returns a tuple of (object, remaining undecoded bytes).
        NOTE(review): pickle.loads runs on data read from the network;
        this is only safe if every peer is trusted.
        """
        unpacker = xdrlib.Unpacker(ibuffer)
        payload = unpacker.unpack_string()
        obj = pickle.loads(payload)
        return (obj, ibuffer[unpacker.get_position():])
444 
445 
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in
           subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        """Adopt sock as the connection to this slave and mark the slave
           as connected."""
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        """Send a _SetPathAction for path to the slave."""
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record the current time as the last contact with the slave."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if more than timeout seconds have passed since the
           last recorded contact."""
        elapsed = time.time() - self.last_contact_time
        return elapsed > timeout

    def _start_task(self, task, context):
        """Send task to the slave, first sending the context's startup
           callable if the slave is not already set up for that context.
           Raises TypeError if the slave is not ready for a task."""
        ready = self._ready_for_task(context) or self._ready_for_task(None)
        if not ready:
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Read from the slave until a task result arrives.
           Returns the finished task with its _results attribute set, or
           None if only heartbeats were received and no more data is
           buffered."""
        while True:
            received = self._recv()
            self.update_contact_time()
            if not isinstance(received, _HeartBeat):
                break
            if not self.get_data_pending():
                return None
        finished = self._task
        finished._results = received
        self._task = None
        self._state = slavestate.connected
        return finished

    def _kill(self):
        """Mark the slave as dead and return the task (if any) it was
           running."""
        task, self._task = self._task, None
        self._context = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        """Return True if the slave has not been started yet."""
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        """Return True if the slave is connected and set up for context."""
        return (self._state == slavestate.connected
                and self._context == context)

    def _running_task(self, context):
        """Return True if the slave is running a task from context."""
        return (self._state == slavestate.running_task
                and self._context == context)
520 
521 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       SlaveArray is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array.
           Subclasses must override this; the base implementation returns
           None."""
        pass

    def _start(self, command=None):
        """Do any necessary startup after all contained Slaves have started.

           @param command The slave command line. Manager passes it to every
                  array, so it must be accepted here; otherwise a subclass
                  that does not override _start fails with TypeError. The
                  base implementation ignores it.
        """
        pass
537 
538 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        """Run "command unique_id" in the background, passing output on
           to _run_background."""
        Slave._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
549 
550 
class _SGEQsubSlave(Slave):
    """One slave of an array; the owning array does the actual starting."""

    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        """Register this slave with its array rather than starting it
           directly."""
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        shown_id = '(unknown)' if self._jobid is None else self._jobid
        return "<SGE qsub slave, ID %s>" % shown_id
566 
567 
569  """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
570  To use this class, the master process must be running on a machine that
571  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
572  is termed a 'submit host' by SGE). The class starts an SGE job array
573  (every slave has the same SGE job ID, but a different task ID).
574  """
575 
576 
577  standard_options = '-j y -cwd -r n -o sge-errors'
578 
579  def __init__(self, numslave, options):
580  """Constructor.
581  @param numslave The number of slaves, which corresponds to the
582  number of tasks in the SGE job.
583  @param options A string of SGE options that are passed on the 'qsub'
584  command line. This is added to standard_options.
585  """
586  self._numslave = numslave
587  self._options = options
588  self._starting_slaves = []
589  self._jobid = None
590 
591  def _get_slaves(self):
592  """Return a list of Slave objects contained within this array"""
593  return [_SGEQsubSlave(self) for x in range(self._numslave)]
594 
595  def _slave_started(self, command, output, slave):
596  self._starting_slaves.append((command, output, slave))
597 
598  def _start(self, command):
599  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
600  (self._options, self.standard_options,
601  len(self._starting_slaves))
602  print(qsub)
603  a = _Popen4(qsub)
604  (inp, out) = (a.stdin, a.stdout)
605  slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
606  slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
607  inp.write("#!/bin/sh\n")
608  inp.write("uid=( '' %s )\n" % slave_uid)
609  inp.write("out=( '' %s )\n" % slave_out)
610  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
611  inp.write("myout=${out[$SGE_TASK_ID]}\n")
612  inp.write("%s $myuid > $myout 2>&1\n" % command)
613  inp.close()
614  outlines = out.readlines()
615  out.close()
616  for line in outlines:
617  print(line.rstrip('\r\n'))
618  a.require_clean_exit()
619  self._set_jobid(outlines)
620  self._starting_slaves = []
621 
622  def _set_jobid(self, outlines):
623  """Try to figure out the job ID from the SGE qsub output"""
624  if len(outlines) > 0:
625  m = re.compile(r"\d+").search(outlines[0])
626  if m:
627  self._jobid = int(m.group())
628  for (num, slave) in enumerate(self._starting_slaves):
629  slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
630 
631 
class _SGEPESlave(Slave):
    """A slave started on a given host with 'qrsh -inherit'."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        """Run the slave command on this slave's host through qrsh, in the
           background."""
        Slave._start(self, command, unique_id, output)
        qrsh_args = (self._host, command, unique_id)
        _run_background("qrsh -inherit -V %s %s %s" % qrsh_args, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
644 
645 
647  """An array of slaves in a Sun Grid Engine system parallel environment.
648  In order to use this class, the master must be run via Sun Grid Engine's
649  'qsub' command and submitted to a parallel environment using the qsub
650  -pe option. This class will start slaves on every node in the parallel
651  environment (including the node running the master). Each slave is
652  started using the 'qrsh' command with the '-inherit' option."""
653 
654  def _get_slaves(self):
655  slaves = []
656 
657  pe = os.environ['PE_HOSTFILE']
658  fh = open(pe, "r")
659  while True:
660  line = fh.readline()
661  if line == '':
662  break
663  (node, num, queue) = line.split(None, 2)
664  for i in range(int(num)):
665  slaves.append(_SGEPESlave(node))
666 # Replace first slave with a local slave, as this is ourself, and SGE
667 # won't let us start this process with qrsh (as we are already
668 # occupying the slot)
669  if len(slaves) > 0:
670  slaves[0] = LocalSlave()
671  return slaves
672 
673 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor.
           @param manager The Manager that will run this context's tasks.
           @param startup If not None, the callable sent to a slave before
                  it runs tasks from this context.
        """
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        # Raw docstring: the Doxygen escapes below are not valid Python
        # string escapes and trigger a warning in a normal string.
        r"""Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        # Raw docstring: see add_task.
        r"""Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           \exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           \exception RemoteError a slave encountered an unhandled exception.
           \exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
709 
710 
class Manager(object):
    """Manages slaves and contexts.

       The manager owns the socket that slaves connect back to, starts
       slaves (singly or through slave arrays), hands queued tasks to idle
       slaves, and reschedules the task of any slave that fails or times
       out.
    """

    # Timeout handed to _ListenSocket (presumably in seconds, like
    # heartbeat_timeout -- TODO confirm)
    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        # Raw docstring: "\%d" is a Doxygen escape, not a Python one.
        r"""Constructor.
           @param python If not None, the command to run to start a Python
                         interpreter that can import the IMP module. Otherwise,
                         the same interpreter that the master is currently using
                         is used. This is passed to the shell, so a full command
                         line (including multiple words separated by spaces) can
                         be used if necessary.
           @param host   The hostname that slaves use to connect back to the
                         master. If not specified, the master machine's primary
                         IP address is used. On multi-homed machines, such as
                         compute cluster headnodes, this may need to be changed
                         to allow all slaves to reach the master (typically the
                         name of the machine's internal network address is
                         needed). If only running local slaves, 'localhost' can
                         be used to prohibit connections across the network.
           @param output A format string used to name slave output files. It is
                         given the numeric slave id, so for example the default
                         value 'slave\%d.output' will yield output files called
                         slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        # Maps unique id -> slave, for slaves started but not yet connected
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object.
           Objects that have a _get_slaves attribute are treated as slave
           arrays; anything else is treated as a single slave."""
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        # Raw docstring: the Doxygen escapes are not valid Python escapes.
        r"""Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        """Start every slave that has not been started yet, then let each
           slave array do its own startup."""
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        # Arrays are only expanded and started once
        self._slave_arrays = []

    def _get_unique_id(self, num):
        """Return "<num>:" followed by eight random uppercase letters."""
        suffix = "".join(chr(random.randint(0, 25) + ord('A'))
                         for _ in range(8))
        return "%d:%s" % (num, suffix)

    def _send_tasks_to_slaves(self, context):
        """Hand queued tasks to every slave that is ready for one."""
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        """Send the first queued task to slave.
           If sending fails with a socket error the slave is killed and the
           task stays at the head of the queue."""
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            slave._kill()

    def _get_finished_tasks(self, context):
        """Yield tasks as they finish.
           This loops forever; it ends only when a helper raises (for
           example _NoMoreTasksError from _get_network_events)."""
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Nothing arrived before the timeout; this call raises
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one event; return the finished task, if the event
           produced one, otherwise None."""
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s" \
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass  # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        """Kill running slaves not heard from within heartbeat_timeout
           seconds, and reschedule their tasks."""
        timed_out = [a for a in self._all_slaves if a._running_task(context) \
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if timed_out:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        """Kill every slave running a task from context, put the tasks
           back on the queue, and raise NetworkError."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        """Match a new connection to a starting slave by the identifier it
           sends. Returns the slave, or None if the identifier is unknown."""
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        """Send the slave the first entry of the import-time sys.path, and
           of the current sys.path if it differs (empty entries are not
           sent)."""
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listening socket or on any slave
           running a task from context.
           Raises _NoMoreTasksError if nothing is running or queued, and
           NoMoreSlavesError if tasks are queued but no slave is running
           or starting."""
        running = [a for a in self._all_slaves if a._running_task(context)]
        if not running:
            if not context._tasks:
                raise _NoMoreTasksError()
            elif not self._starting_slaves:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        # NOTE(review): `util` is not imported by name in this file; it is
        # presumably bound as a side effect of the
        # "from IMP.parallel.util import ..." statements -- confirm.
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
920 
921 
922 
def get_module_version():
    """get_module_version() -> std::string const

    Forwards to _IMP_parallel.get_module_version().
    """
    version = _IMP_parallel.get_module_version()
    return version
926 
def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string

    Forwards to _IMP_parallel.get_example_path(fname).
    """
    path = _IMP_parallel.get_example_path(fname)
    return path
930 
def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string

    Forwards to _IMP_parallel.get_data_path(fname).
    """
    path = _IMP_parallel.get_data_path(fname)
    return path
934 
935 from . import _version_check
936 _version_check.check_version(get_module_version())
937 __version__ = get_module_version()
938 
939 
940 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: util.py:1
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:49
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.