IMP  2.4.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 3.0.2
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 
8 
9 
10 
11 from sys import version_info
12 if version_info >= (2,6,0):
13  def swig_import_helper():
14  from os.path import dirname
15  import imp
16  fp = None
17  try:
18  fp, pathname, description = imp.find_module('_IMP_parallel', [dirname(__file__)])
19  except ImportError:
20  import _IMP_parallel
21  return _IMP_parallel
22  if fp is not None:
23  try:
24  _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
25  finally:
26  fp.close()
27  return _mod
28  _IMP_parallel = swig_import_helper()
29  del swig_import_helper
30 else:
31  import _IMP_parallel
32 del version_info
33 try:
34  _swig_property = property
35 except NameError:
36  pass # Python < 2.2 doesn't have 'property'.
37 def _swig_setattr_nondynamic(self,class_type,name,value,static=1):
38  if (name == "thisown"): return self.this.own(value)
39  if (name == "this"):
40  if type(value).__name__ == 'SwigPyObject':
41  self.__dict__[name] = value
42  return
43  method = class_type.__swig_setmethods__.get(name,None)
44  if method: return method(self,value)
45  if (not static):
46  self.__dict__[name] = value
47  else:
48  raise AttributeError("You cannot add attributes to %s" % self)
49 
50 def _swig_setattr(self,class_type,name,value):
51  return _swig_setattr_nondynamic(self,class_type,name,value,0)
52 
53 def _swig_getattr(self,class_type,name):
54  if (name == "thisown"): return self.this.own()
55  method = class_type.__swig_getmethods__.get(name,None)
56  if method: return method(self)
57  raise AttributeError(name)
58 
59 def _swig_repr(self):
60  try: strthis = "proxy of " + self.this.__repr__()
61  except: strthis = ""
62  return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
63 
64 try:
65  _object = object
66  _newclass = 1
67 except AttributeError:
68  class _object : pass
69  _newclass = 0
70 
71 
72 def _swig_setattr_nondynamic_method(set):
73  def set_attr(self,name,value):
74  if (name == "thisown"): return self.this.own(value)
75  if hasattr(self,name) or (name == "this"):
76  set(self,name,value)
77  else:
78  raise AttributeError("You cannot add attributes to %s" % self)
79  return set_attr
80 
81 
82 try:
83  import weakref
84  weakref_proxy = weakref.proxy
85 except:
86  weakref_proxy = lambda x: x
87 
88 
89 class IMP_PARALLEL_SwigPyIterator(object):
90  """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class"""
91  thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
92  def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined - class is abstract")
93  __repr__ = _swig_repr
94  __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
95  __del__ = lambda self : None;
96  def value(self):
97  """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
98  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)
99 
100  def incr(self, n=1):
101  """
102  incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
103  incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
104  """
105  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)
106 
107  def decr(self, n=1):
108  """
109  decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
110  decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
111  """
112  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)
113 
114  def distance(self, *args):
115  """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
116  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, *args)
117 
118  def equal(self, *args):
119  """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
120  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, *args)
121 
122  def copy(self):
123  """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
124  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)
125 
126  def next(self):
127  """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
128  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)
129 
130  def __next__(self):
131  """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
132  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)
133 
134  def previous(self):
135  """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
136  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)
137 
138  def advance(self, *args):
139  """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
140  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, *args)
141 
142  def __eq__(self, *args):
143  """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
144  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, *args)
145 
146  def __ne__(self, *args):
147  """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
148  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, *args)
149 
150  def __iadd__(self, *args):
151  """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
152  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, *args)
153 
154  def __isub__(self, *args):
155  """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
156  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, *args)
157 
158  def __add__(self, *args):
159  """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
160  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, *args)
161 
162  def __sub__(self, *args):
163  """
164  __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
165  __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
166  """
167  return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)
168 
169  def __iter__(self): return self
170 IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
171 IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
172 
173 _value_types=[]
174 _object_types=[]
175 _raii_types=[]
176 _plural_types=[]
177 
178 IMP_DEBUG = _IMP_parallel.IMP_DEBUG
179 IMP_RELEASE = _IMP_parallel.IMP_RELEASE
180 IMP_SILENT = _IMP_parallel.IMP_SILENT
181 IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
182 IMP_TERSE = _IMP_parallel.IMP_TERSE
183 IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
184 IMP_MEMORY = _IMP_parallel.IMP_MEMORY
185 IMP_NONE = _IMP_parallel.IMP_NONE
186 IMP_USAGE = _IMP_parallel.IMP_USAGE
187 IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
188 IMP_BASE_HAS_LOG4CXX = _IMP_parallel.IMP_BASE_HAS_LOG4CXX
189 IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
190 IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
191 IMP_BASE_HAS_BOOST_RANDOM = _IMP_parallel.IMP_BASE_HAS_BOOST_RANDOM
192 IMP_BASE_HAS_GPERFTOOLS = _IMP_parallel.IMP_BASE_HAS_GPERFTOOLS
193 IMP_BASE_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPCHECKER
194 IMP_BASE_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPPROFILER
195 IMPBASE_SHOW_WARNINGS = _IMP_parallel.IMPBASE_SHOW_WARNINGS
196 import sys
197 class _DirectorObjects(object):
198  """@internal Simple class to keep references to director objects
199  to prevent premature deletion."""
200  def __init__(self):
201  self._objects = []
202  def register(self, obj):
203  """Take a reference to a director object; will only work for
204  refcounted C++ classes"""
205  if hasattr(obj, 'get_ref_count'):
206  self._objects.append(obj)
207  def cleanup(self):
208  """Only drop our reference and allow cleanup by Python if no other
209  Python references exist (we hold 3 references: one in self._objects,
210  one in x, and one in the argument list for getrefcount) *and* no
211  other C++ references exist (the Python object always holds one)"""
212  objs = [x for x in self._objects if sys.getrefcount(x) > 3 \
213  or x.get_ref_count() > 1]
214  # Do in two steps so the references are kept until the end of the
215  # function (deleting references may trigger a fresh call to this method)
216  self._objects = objs
217  def get_object_count(self):
218  """Get number of director objects (useful for testing only)"""
219  return len(self._objects)
220 _director_objects = _DirectorObjects()
221 
222 class _ostream(object):
223  """Proxy of C++ std::ostream class"""
224  thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
225  def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined")
226  __repr__ = _swig_repr
227  def write(self, *args):
228  """write(_ostream self, char const * osa_buf)"""
229  return _IMP_parallel._ostream_write(self, *args)
230 
231 _ostream_swigregister = _IMP_parallel._ostream_swigregister
232 _ostream_swigregister(_ostream)
233 
234 IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
235 IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
236 IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
237 import IMP.base
238 IMP_CGAL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_FILESYSTEM
239 IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS
240 IMP_CGAL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_CGAL_HAS_BOOST_RANDOM
241 IMP_CGAL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_SYSTEM
242 IMPCGAL_SHOW_WARNINGS = _IMP_parallel.IMPCGAL_SHOW_WARNINGS
243 import IMP.cgal
244 IMP_ALGEBRA_HAS_IMP_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_IMP_CGAL
245 IMP_ALGEBRA_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_FILESYSTEM
246 IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS
247 IMP_ALGEBRA_HAS_BOOST_RANDOM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_RANDOM
248 IMP_ALGEBRA_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_SYSTEM
249 IMP_ALGEBRA_HAS_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_CGAL
250 IMP_ALGEBRA_HAS_ANN = _IMP_parallel.IMP_ALGEBRA_HAS_ANN
251 IMPALGEBRA_SHOW_WARNINGS = _IMP_parallel.IMPALGEBRA_SHOW_WARNINGS
252 import IMP.algebra
253 IMP_KERNEL_HAS_IMP_CGAL = _IMP_parallel.IMP_KERNEL_HAS_IMP_CGAL
254 IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS
255 IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
256 IMP_KERNEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_SYSTEM
257 IMP_KERNEL_HAS_CGAL = _IMP_parallel.IMP_KERNEL_HAS_CGAL
258 IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
259 import IMP.kernel
260 IMP_PARALLEL_HAS_IMP_ALGEBRA = _IMP_parallel.IMP_PARALLEL_HAS_IMP_ALGEBRA
261 IMP_PARALLEL_HAS_IMP_BASE = _IMP_parallel.IMP_PARALLEL_HAS_IMP_BASE
262 IMP_PARALLEL_HAS_IMP_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_IMP_CGAL
263 IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
264 IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
265 IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
266 IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
267 IMP_PARALLEL_HAS_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_CGAL
268 IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
269 import socket
270 import time
271 import sys
272 import os
273 import re
274 import random
275 import socket
276 import xdrlib
277 try:
278  import cPickle as pickle
279 except ImportError:
280  import pickle
281 from IMP.parallel import slavestate
282 from IMP.parallel.subproc import _run_background, _Popen4
283 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
284 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
285 from IMP.parallel.util import _SetPathAction
286 
287 # Save sys.path at import time, so that slaves can import using the same
288 # path that works for the master imports
289 _import_time_path = sys.path[:]
290 
291 class Error(Exception):
292  """Base class for all errors specific to the parallel module"""
293  pass
294 
295 
296 class _NoMoreTasksError(Error):
297  pass
298 
299 
300 class NoMoreSlavesError(Error):
301  """Error raised if all slaves failed, so tasks cannot be run"""
302  pass
303 
304 
305 class NetworkError(Error):
306  """Error raised if a problem occurs with the network"""
307  pass
308 
309 
310 class RemoteError(Error):
311  """Error raised if a slave has an unhandled exception"""
312  def __init__(self, exc, traceback, slave):
313  self.exc = exc
314  self.traceback = traceback
315  self.slave = slave
316 
317  def __str__(self):
318  errstr = str(self.exc.__class__).replace("exceptions.", "")
319  return "%s: %s from %s\nRemote traceback:\n%s" \
320  % (errstr, str(self.exc), str(self.slave), self.traceback)
321 
322 class _Communicator(object):
323  """Simple support for sending Python pickled objects over the network"""
324 
325  def __init__(self):
326  self._socket = None
327  self._ibuffer = b''
328 
329  def _send(self, obj):
330  p = xdrlib.Packer()
331  try:
332  p.pack_string(pickle.dumps(obj, -1))
333  # Python < 2.5 can fail trying to send Inf or NaN floats in binary
334  # mode, so fall back to the old protocol in this case:
335  except SystemError:
336  p.pack_string(pickle.dumps(obj, 0))
337  self._socket.sendall(p.get_buffer())
338 
339  def get_data_pending(self):
340  return len(self._ibuffer) > 0
341 
342  def _recv(self):
343  while True:
344  try:
345  obj, self._ibuffer = self._unpickle(self._ibuffer)
346  if isinstance(obj, _ErrorWrapper):
347  raise RemoteError(obj.obj, obj.traceback, self)
348  else:
349  return obj
350  except (IndexError, EOFError):
351  try:
352  data = self._socket.recv(4096)
353  except socket.error as detail:
354  raise NetworkError("Connection lost to %s: %s" \
355  % (str(self), str(detail)))
356  if len(data) > 0:
357  self._ibuffer += data
358  else:
359  raise NetworkError("%s closed connection" % str(self))
360 
361  def _unpickle(self, ibuffer):
362  p = xdrlib.Unpacker(ibuffer)
363  obj = p.unpack_string()
364  return (pickle.loads(obj), ibuffer[p.get_position():])
365 
366 
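The _Communicator methods above frame every message as an XDR-encoded string holding a pickled Python object. A minimal standalone sketch of that framing, assuming only the standard pickle and xdrlib modules (the helper names pack_message and unpack_message are invented for this example and are not part of the module):

import pickle
import xdrlib

def pack_message(obj):
    # Length-prefixed XDR string holding the pickled object, as in _send()
    p = xdrlib.Packer()
    p.pack_string(pickle.dumps(obj, -1))
    return p.get_buffer()

def unpack_message(buf):
    # Mirrors _unpickle(): return the object plus any unconsumed bytes.
    # An incomplete buffer raises EOFError, which _recv() treats as
    # "wait for more data from the socket".
    u = xdrlib.Unpacker(buf)
    obj = pickle.loads(u.unpack_string())
    return obj, buf[u.get_position():]

data = pack_message({'answer': 42})
obj, rest = unpack_message(data)
assert obj == {'answer': 42} and rest == b''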
367 class Slave(_Communicator):
368  """Representation of a single slave.
369  Each slave uses a single thread of execution (i.e. a single CPU core)
370  to run tasks sequentially.
371  Slave is an abstract class; instead of using this class directly, use
372  a subclass such as LocalSlave or SGEQsubSlaveArray."""
373 
374  def __init__(self):
375  _Communicator.__init__(self)
376  self._state = slavestate.init
377  self._context = None
378  self._task = None
379  self.update_contact_time()
380 
381  def _start(self, command, unique_id, output):
382  """Start the slave running on the remote host; override in subclasses"""
383  self._state = slavestate.started
384 
385  def _accept_connection(self, sock):
386  self._socket = sock
387  self._state = slavestate.connected
388  self.update_contact_time()
389 
390  def _set_python_search_path(self, path):
391  self._send(_SetPathAction(path))
392 
393  def update_contact_time(self):
394  self.last_contact_time = time.time()
395 
396  def get_contact_timed_out(self, timeout):
397  return (time.time() - self.last_contact_time) > timeout
398 
399  def _start_task(self, task, context):
400  if not self._ready_for_task(context) and not self._ready_for_task(None):
401  raise TypeError("%s not ready for task" % str(self))
402  if self._context != context:
403  self._context = context
404  self._send(_ContextWrapper(context._startup))
405  self._state = slavestate.running_task
406  self._task = task
407  self._send(_TaskWrapper(task))
408 
409  def _get_finished_task(self):
410  while True:
411  r = self._recv()
412  self.update_contact_time()
413  if isinstance(r, _HeartBeat):
414  if not self.get_data_pending():
415  return None
416  else:
417  break
418  task = self._task
419  task._results = r
420  self._task = None
421  self._state = slavestate.connected
422  return task
423 
424  def _kill(self):
425  task = self._task
426  self._task = None
427  self._context = None
428  self._state = slavestate.dead
429  return task
430 
431  def _ready_to_start(self):
432  return self._state == slavestate.init
433 
434  def _ready_for_task(self, context):
435  return self._state == slavestate.connected \
436  and self._context == context
437 
438  def _running_task(self, context):
439  return self._state == slavestate.running_task \
440  and self._context == context
441 
442 
443 class SlaveArray(object):
444  """Representation of an array of slaves.
445  This is similar to Slave, except that it represents a collection of
446  slaves that are controlled together, such as a batch submission system
447  array job on a compute cluster.
448  Slave is an abstract class; instead of using this class directly, use
449  a subclass such as SGEQsubSlaveArray."""
450 
451  def _get_slaves(self):
452  """Return a list of Slave objects contained within this array"""
453  pass
454 
455  def _start(self):
456  """Do any necessary startup after all contained Slaves have started"""
457  pass
458 
459 
460 class LocalSlave(Slave):
461  """A slave running on the same machine as the master."""
462 
463  def _start(self, command, unique_id, output):
464  Slave._start(self, command, unique_id, output)
465  cmdline = "%s %s" % (command, unique_id)
466  _run_background(cmdline, output)
467 
468  def __repr__(self):
469  return "<LocalSlave>"
470 
471 
472 class _SGEQsubSlave(Slave):
473  def __init__(self, array):
474  Slave.__init__(self)
475  self._jobid = None
476  self._array = array
477 
478  def _start(self, command, unique_id, output):
479  Slave._start(self, command, unique_id, output)
480  self._array._slave_started(unique_id, output, self)
481 
482  def __repr__(self):
483  jobid = self._jobid
484  if jobid is None:
485  jobid = '(unknown)'
486  return "<SGE qsub slave, ID %s>" % jobid
487 
488 
489 class SGEQsubSlaveArray(SlaveArray):
490  """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
491  To use this class, the master process must be running on a machine that
492  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
493  is termed a 'submit host' by SGE). The class starts an SGE job array
494  (every slave has the same SGE job ID, but a different task ID).
495  """
496 
497 
498  standard_options = '-j y -cwd -r n -o sge-errors'
499 
500  def __init__(self, numslave, options):
501  """Constructor.
502  @param numslave The number of slaves, which corresponds to the
503  number of tasks in the SGE job.
504  @param options A string of SGE options that are passed on the 'qsub'
505  command line. This is added to standard_options.
506  """
507  self._numslave = numslave
508  self._options = options
509  self._starting_slaves = []
510  self._jobid = None
511 
512  def _get_slaves(self):
513  """Return a list of Slave objects contained within this array"""
514  return [_SGEQsubSlave(self) for x in range(self._numslave)]
515 
516  def _slave_started(self, command, output, slave):
517  self._starting_slaves.append((command, output, slave))
518 
519  def _start(self, command):
520  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
521  (self._options, self.standard_options,
522  len(self._starting_slaves))
523  print(qsub)
524  a = _Popen4(qsub)
525  (inp, out) = (a.stdin, a.stdout)
526  slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
527  slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
528  inp.write("#!/bin/sh\n")
529  inp.write("uid=( '' %s )\n" % slave_uid)
530  inp.write("out=( '' %s )\n" % slave_out)
531  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
532  inp.write("myout=${out[$SGE_TASK_ID]}\n")
533  inp.write("%s $myuid > $myout 2>&1\n" % command)
534  inp.close()
535  outlines = out.readlines()
536  out.close()
537  for line in outlines:
538  print(line.rstrip('\r\n'))
539  a.require_clean_exit()
540  self._set_jobid(outlines)
541  self._starting_slaves = []
542 
543  def _set_jobid(self, outlines):
544  """Try to figure out the job ID from the SGE qsub output"""
545  if len(outlines) > 0:
546  m = re.compile(r"\d+").search(outlines[0])
547  if m:
548  self._jobid = int(m.group())
549  for (num, slave) in enumerate(self._starting_slaves):
550  slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
551 
552 
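For illustration, a master script running on an SGE submit host could add a block of qsub-started slaves to a Manager as sketched below (the slave count and the extra qsub options are arbitrary examples; the resource string is hypothetical):

import IMP.parallel

m = IMP.parallel.Manager()
# Request an 8-task SGE array job; the option string is combined with
# standard_options on the generated qsub command line
m.add_slave(IMP.parallel.SGEQsubSlaveArray(8, '-l h_rt=1:00:00'))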
553 class _SGEPESlave(Slave):
554  def __init__(self, host):
555  Slave.__init__(self)
556  self._host = host
557 
558  def _start(self, command, unique_id, output):
559  Slave._start(self, command, unique_id, output)
560  cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
561  _run_background(cmdline, output)
562 
563  def __repr__(self):
564  return "<SGE PE slave on %s>" % self._host
565 
566 
567 class SGEPESlaveArray(SlaveArray):
568  """An array of slaves in a Sun Grid Engine system parallel environment.
569  In order to use this class, the master must be run via Sun Grid Engine's
570  'qsub' command and submitted to a parallel environment using the qsub
571  -pe option. This class will start slaves on every node in the parallel
572  environment (including the node running the master). Each slave is
573  started using the 'qrsh' command with the '-inherit' option."""
574 
575  def _get_slaves(self):
576  slaves = []
577 
578  pe = os.environ['PE_HOSTFILE']
579  fh = open(pe, "r")
580  while True:
581  line = fh.readline()
582  if line == '':
583  break
584  (node, num, queue) = line.split(None, 2)
585  for i in range(int(num)):
586  slaves.append(_SGEPESlave(node))
587  # Replace first slave with a local slave, as this is ourself, and SGE
588  # won't let us start this process with qrsh (as we are already
589  # occupying the slot)
590  if len(slaves) > 0:
591  slaves[0] = LocalSlave()
592  return slaves
593 
594 
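To use the parallel-environment variant above, the master script itself must be submitted through qsub with a -pe request; a sketch under those assumptions (the environment name and slot count below are placeholders):

# Submitted, for example, as:  qsub -pe <pe_name> 8 master.py
# SGE then provides $PE_HOSTFILE, from which one slave per granted slot
# is started (the first slot becomes a LocalSlave on the master's node)
import IMP.parallel

m = IMP.parallel.Manager()
m.add_slave(IMP.parallel.SGEPESlaveArray())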
595 class Context(object):
596  """A collection of tasks that run in the same environment.
597  Context objects are typically created by calling Manager::get_context().
598  """
599  def __init__(self, manager, startup=None):
600  """Constructor."""
601  self._manager = manager
602  self._startup = startup
603  self._tasks = []
604 
605  def add_task(self, task):
606  """Add a task to this context.
607  Tasks are any Python callable object that can be pickled (e.g. a
608  function or a class that implements the \_\_call\_\_ method). When
609  the task is run on the slave its arguments are the return value
610  from this context's startup function."""
611  self._tasks.append(task)
612 
613  def get_results_unordered(self):
614  """Run all of the tasks on available slaves, and return results.
615  If there are more tasks than slaves, subsequent tasks are
616  started only once a running task completes: each slave only runs
617  a single task at a time. As each task completes, the return value(s)
618  from the task callable are returned from this method, as a
619  Python generator. Note that the results are returned in the order
620  that tasks complete, which may not be the same as the order they
621  were submitted in.
623  \exception NoMoreSlavesError there are no slaves available
624  to run the tasks (or they all failed during execution).
625  \exception RemoteError a slave encountered an unhandled exception.
626  \exception NetworkError the master lost (or was unable to
627  establish) communication with any slave.
628  """
629  return self._manager._get_results_unordered(self)
630 
631 
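The task and startup contract described in add_task() and get_results_unordered() can be sketched as follows; the names SumTask and startup are invented for the example, and, per the add_task() docstring, the value returned by the startup callable becomes the argument of each task when it runs on a slave:

class SumTask(object):
    # Any picklable callable can be a task
    def __init__(self, values):
        self.values = values
    def __call__(self, offset):
        # 'offset' is the value returned by the context's startup callable
        return sum(self.values) + offset

def startup():
    # Runs once per slave, before its first task from this context
    return 100

A context built with Manager.get_context(startup) would then run each SumTask on a slave as task(100).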
632 class Manager(object):
633  """Manages slaves and contexts.
634  """
635 
636  connect_timeout = 7200
637 
638  # Note: must be higher than that in slave_handler._HeartBeatThread
639  heartbeat_timeout = 7200
640 
641  def __init__(self, python=None, host=None, output='slave%d.output'):
642  """Constructor.
643  @param python If not None, the command to run to start a Python
644  interpreter that can import the IMP module. Otherwise,
645  the same interpreter that the master is currently using
646  is used. This is passed to the shell, so a full command
647  line (including multiple words separated by spaces) can
648  be used if necessary.
649  @param host The hostname that slaves use to connect back to the
650  master. If not specified, the master machine's primary
651  IP address is used. On multi-homed machines, such as
652  compute cluster headnodes, this may need to be changed
653  to allow all slaves to reach the master (typically the
654  name of the machine's internal network address is
655  needed). If only running local slaves, 'localhost' can
656  be used to prohibit connections across the network.
657  @param output A format string used to name slave output files. It is
658  given the numeric slave id, so for example the default
659  value 'slave\%d.output' will yield output files called
660  slave0.output, slave1.output, etc.
661  """
662  if python is None:
663  self._python = sys.executable
664  else:
665  self._python = python
666  self._host = host
667  self._output = output
668  self._all_slaves = []
669  self._starting_slaves = {}
670  self._slave_arrays = []
671  if host:
672  self._host = host
673  else:
674  # Get primary IP address of this machine
675  self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
676  self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
677 
678  def add_slave(self, slave):
679  """Add a Slave object."""
680  if hasattr(slave, '_get_slaves'):
681  self._slave_arrays.append(slave)
682  else:
683  self._all_slaves.append(slave)
684 
685  def get_context(self, startup=None):
686  """Create and return a new Context in which tasks can be run.
687  @param startup If not None, a callable (Python function or class
688  that implements the \_\_call\_\_ method) that sets up
689  the slave to run tasks. This method is only called
690  once per slave. The return values from this method
691  are passed to the task object when it runs on
692  the slave.
693  @return A new Context object.
694  """
695  return Context(self, startup)
696 
697  def _get_results_unordered(self, context):
698  """Run all of a context's tasks, and yield results"""
699  self._send_tasks_to_slaves(context)
700  try:
701  while True:
702  for task in self._get_finished_tasks(context):
703  tasks_queued = len(context._tasks)
704  yield task._results
705  # If the user added more tasks while processing these
706  # results, make sure they get sent off to the slaves
707  if len(context._tasks) > tasks_queued:
708  self._send_tasks_to_slaves(context)
709  except _NoMoreTasksError:
710  return
711 
712  def _start_all_slaves(self):
713  for array in self._slave_arrays:
714  self._all_slaves.extend(array._get_slaves())
715 
716  command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
717  "%s %d") % (self._python, self._host, self._listen_sock.port)
718 
719  for (num, slave) in enumerate(self._all_slaves):
720  if slave._ready_to_start():
721  unique_id = self._get_unique_id(num)
722  self._starting_slaves[unique_id] = slave
723  slave._start(command, unique_id, self._output % num)
724 
725  for array in self._slave_arrays:
726  array._start(command)
727  self._slave_arrays = []
728 
729  def _get_unique_id(self, num):
730  id = "%d:" % num
731  for i in range(0, 8):
732  id += chr(random.randint(0, 25) + ord('A'))
733  return id
734 
735  def _send_tasks_to_slaves(self, context):
736  self._start_all_slaves()
737  # Prefer slaves that already have the task context
738  available_slaves = [a for a in self._all_slaves
739  if a._ready_for_task(context)] + \
740  [a for a in self._all_slaves
741  if a._ready_for_task(None)]
742  for slave in available_slaves:
743  if len(context._tasks) == 0:
744  break
745  else:
746  self._send_task_to_slave(slave, context)
747 
748  def _send_task_to_slave(self, slave, context):
749  if len(context._tasks) == 0:
750  return
751  t = context._tasks[0]
752  try:
753  slave._start_task(t, context)
754  context._tasks.pop(0)
755  except socket.error as detail:
756  slave._kill()
757 
758  def _get_finished_tasks(self, context):
759  while True:
760  events = self._get_network_events(context)
761  if len(events) == 0:
762  self._kill_all_running_slaves(context)
763  for event in events:
764  task = self._process_event(event, context)
765  if task:
766  yield task
767 
768  def _process_event(self, event, context):
769  if event == self._listen_sock:
770  # New slave just connected
771  (conn, addr) = self._listen_sock.accept()
772  new_slave = self._accept_slave(conn, context)
773  elif event._running_task(context):
774  try:
775  task = event._get_finished_task()
776  if task:
777  self._send_task_to_slave(event, context)
778  return task
779  else: # the slave sent back a heartbeat
780  self._kill_timed_out_slaves(context)
781  except NetworkError as detail:
782  task = event._kill()
783  print("Slave %s failed (%s): rescheduling task %s" \
784  % (str(event), str(detail), str(task)))
785  context._tasks.append(task)
786  self._send_tasks_to_slaves(context)
787  else:
788  pass # Slave not running a task
789 
790  def _kill_timed_out_slaves(self, context):
791  timed_out = [a for a in self._all_slaves if a._running_task(context) \
792  and a.get_contact_timed_out(self.heartbeat_timeout)]
793  for slave in timed_out:
794  task = slave._kill()
795  print("Did not hear from slave %s in %d seconds; rescheduling "
796  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
797  context._tasks.append(task)
798  if len(timed_out) > 0:
799  self._send_tasks_to_slaves(context)
800 
801  def _kill_all_running_slaves(self, context):
802  running = [a for a in self._all_slaves if a._running_task(context)]
803  for slave in running:
804  task = slave._kill()
805  context._tasks.append(task)
806  raise NetworkError("Did not hear from any running slave in "
807  "%d seconds" % self.heartbeat_timeout)
808 
809  def _accept_slave(self, sock, context):
810  sock.setblocking(True)
811  identifier = sock.recv(1024)
812  if identifier:
813  identifier = identifier.decode('ascii')
814  if identifier and identifier in self._starting_slaves:
815  slave = self._starting_slaves.pop(identifier)
816  slave._accept_connection(sock)
817  print("Identified slave %s " % str(slave))
818  self._init_slave(slave)
819  self._send_task_to_slave(slave, context)
820  return slave
821  else:
822  print("Ignoring request from unknown slave")
823 
824  def _init_slave(self, slave):
825  if _import_time_path[0] != '':
826  slave._set_python_search_path(_import_time_path[0])
827  if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
828  slave._set_python_search_path(sys.path[0])
829 
830  def _get_network_events(self, context):
831  running = [a for a in self._all_slaves if a._running_task(context)]
832  if len(running) == 0:
833  if len(context._tasks) == 0:
834  raise _NoMoreTasksError()
835  elif len(self._starting_slaves) == 0:
836  raise NoMoreSlavesError("Ran out of slaves to run tasks")
837  # Otherwise, wait for starting slaves to connect back and get tasks
838 
839  return util._poll_events(self._listen_sock, running,
840  self.heartbeat_timeout)
841 
842 
843 
844 def get_module_version():
845  """get_module_version() -> std::string const"""
846  return _IMP_parallel.get_module_version()
847 
848 def get_example_path(*args):
849  """get_example_path(std::string fname) -> std::string"""
850  return _IMP_parallel.get_example_path(*args)
851 
852 def get_data_path(*args):
853  """get_data_path(std::string fname) -> std::string"""
854  return _IMP_parallel.get_data_path(*args)
855 from . import _version_check
856 _version_check.check_version(get_module_version())
857 
858 
859 
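Putting the pieces of this module together, a minimal master script might look like the sketch below (assembled from the docstrings above rather than taken from the IMP examples; in practice the task callables are usually defined in a module that the slaves can also import, which is what the sys.path handling in _init_slave() supports):

import IMP.parallel

class DoubleTask(object):
    # A picklable task; the argument is whatever the startup callable returned
    def __init__(self, value):
        self.value = value
    def __call__(self, factor):
        return self.value * factor

def startup():
    # Runs once on each slave before its first task
    return 2

m = IMP.parallel.Manager()
for i in range(4):
    m.add_slave(IMP.parallel.LocalSlave())   # four single-core local slaves

c = m.get_context(startup)
for i in range(10):
    c.add_task(DoubleTask(i))

# Results are yielded in completion order, which may differ from submission order
for result in c.get_results_unordered():
    print(result)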