IMP  2.1.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 2.0.11
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 
8 
9 
10 
# Locate and load the compiled SWIG extension module (_IMP_parallel) that
# implements the wrapped C++ functions.  On Python >= 2.6 the extension is
# searched for next to this file first, so the package does not depend on
# sys.path ordering; older Pythons fall back to a plain import.
from sys import version_info
if version_info >= (2,6,0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            # Look only in this package's own directory.
            fp, pathname, description = imp.find_module('_IMP_parallel', [dirname(__file__)])
        except ImportError:
            # Not found alongside this file; try a normal sys.path import.
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
            finally:
                # find_module opened the file; always close it, even on error.
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del version_info
try:
    _swig_property = property
except NameError:
    pass # Python < 2.2 doesn't have 'property'.
37 def _swig_setattr_nondynamic(self,class_type,name,value,static=1):
38  if (name == "thisown"): return self.this.own(value)
39  if (name == "this"):
40  if type(value).__name__ == 'SwigPyObject':
41  self.__dict__[name] = value
42  return
43  method = class_type.__swig_setmethods__.get(name,None)
44  if method: return method(self,value)
45  if (not static):
46  self.__dict__[name] = value
47  else:
48  raise AttributeError("You cannot add attributes to %s" % self)
49 
def _swig_setattr(self, class_type, name, value):
    """Non-static attribute set: unknown names may be freely added
    to the instance dictionary."""
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)
52 
53 def _swig_getattr(self,class_type,name):
54  if (name == "thisown"): return self.this.own()
55  method = class_type.__swig_getmethods__.get(name,None)
56  if method: return method(self)
57  raise AttributeError(name)
58 
59 def _swig_repr(self):
60  try: strthis = "proxy of " + self.this.__repr__()
61  except: strthis = ""
62  return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
63 
# Old-Python compatibility: base proxy classes on new-style 'object' when it
# exists, otherwise fall back to a classic class.
try:
    _object = object
    _newclass = 1
except AttributeError:
    class _object : pass
    _newclass = 0


# weakref may be unavailable on minimal builds; degrade to an identity
# function so 'weakref_proxy' is always callable.
try:
    import weakref
    weakref_proxy = weakref.proxy
except:
    weakref_proxy = lambda x: x
77 
78 
# SWIG-generated proxy for the C++ iterator type used by wrapped STL
# containers.  Every method simply delegates to the C extension; attribute
# access is routed through the _swig_setattr/_swig_getattr tables above.
class IMP_PARALLEL_SwigPyIterator(_object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class"""
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, IMP_PARALLEL_SwigPyIterator, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, IMP_PARALLEL_SwigPyIterator, name)
    def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    # Destruction of the underlying C++ iterator is handled by the extension.
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self : None;
    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, *args):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, *args)

    def equal(self, *args):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, *args)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    # 'next' (Python 2 iteration) and '__next__' (Python 3 iteration) are
    # both provided by the extension.
    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, *args):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, *args)

    def __eq__(self, *args):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, *args)

    def __ne__(self, *args):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, *args)

    def __iadd__(self, *args):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, *args)

    def __isub__(self, *args):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, *args)

    def __add__(self, *args):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, *args)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self): return self
# Register the proxy class with the extension so C++ objects returned from
# wrapped calls are presented as instances of this class.
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
165 
# Registries used by IMP's SWIG typemaps to record which wrapped types fall
# into which category (populated elsewhere in the generated code).
_value_types=[]
_object_types=[]
_raii_types=[]
_plural_types=[]

# Constants re-exported from the C extension: log levels, check levels and
# build-configuration flags of the modules this one depends on.
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_BASE_HAS_LOG4CXX = _IMP_parallel.IMP_BASE_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_BASE_HAS_BOOST_RANDOM = _IMP_parallel.IMP_BASE_HAS_BOOST_RANDOM
IMP_BASE_HAS_GPERFTOOLS = _IMP_parallel.IMP_BASE_HAS_GPERFTOOLS
IMP_BASE_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPCHECKER
IMP_BASE_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPPROFILER
IMPBASE_SHOW_WARNINGS = _IMP_parallel.IMPBASE_SHOW_WARNINGS
import sys
190 class _DirectorObjects(object):
191  """@internal Simple class to keep references to director objects
192  to prevent premature deletion."""
193  def __init__(self):
194  self._objects = []
195  def register(self, obj):
196  """Take a reference to a director object; will only work for
197  refcounted C++ classes"""
198  if hasattr(obj, 'get_ref_count'):
199  self._objects.append(obj)
200  def cleanup(self):
201  """Only drop our reference and allow cleanup by Python if no other
202  Python references exist (we hold 3 references: one in self._objects,
203  one in x, and one in the argument list for getrefcount) *and* no
204  other C++ references exist (the Python object always holds one)"""
205  objs = [x for x in self._objects if sys.getrefcount(x) > 3 \
206  or x.get_ref_count() > 1]
207 
208 
209  self._objects = objs
210  def get_object_count(self):
211  """Get number of director objects (useful for testing only)"""
212  return len(self._objects)
213 _director_objects = _DirectorObjects()
214 
# Check-level constants mirrored from IMP.base via the C extension.
DEFAULT_CHECK = _IMP_parallel.DEFAULT_CHECK
NONE = _IMP_parallel.NONE
USAGE = _IMP_parallel.USAGE
USAGE_AND_INTERNAL = _IMP_parallel.USAGE_AND_INTERNAL

def set_check_level(*args):
    """set_check_level(IMP::base::CheckLevel tf)"""
    # Thin wrapper over the C extension; controls runtime checks.
    return _IMP_parallel.set_check_level(*args)

def get_check_level():
    """get_check_level() -> IMP::base::CheckLevel"""
    # Thin wrapper over the C extension; returns the current check level.
    return _IMP_parallel.get_check_level()
# SWIG proxy for std::ostream, used so Python code can write into C++
# output streams exposed by the wrapped API.
class _ostream(_object):
    """Proxy of C++ std::ostream class"""
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, _ostream, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, _ostream, name)
    def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined")
    __repr__ = _swig_repr
    def write(self, *args):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, *args)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
241 
# Build-configuration flags for this module and its dependencies, plus the
# Python-level imports the parallel implementation below relies on.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
import IMP.base
IMP_CGAL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_FILESYSTEM
IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS
IMP_CGAL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_CGAL_HAS_BOOST_RANDOM
IMP_CGAL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_SYSTEM
IMPCGAL_SHOW_WARNINGS = _IMP_parallel.IMPCGAL_SHOW_WARNINGS
import IMP.cgal
IMP_ALGEBRA_HAS_IMP_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_IMP_CGAL
IMP_ALGEBRA_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_FILESYSTEM
IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS
IMP_ALGEBRA_HAS_BOOST_RANDOM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_RANDOM
IMP_ALGEBRA_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_SYSTEM
IMP_ALGEBRA_HAS_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_CGAL
IMP_ALGEBRA_HAS_ANN = _IMP_parallel.IMP_ALGEBRA_HAS_ANN
IMPALGEBRA_SHOW_WARNINGS = _IMP_parallel.IMPALGEBRA_SHOW_WARNINGS
import IMP.algebra
IMP_KERNEL_HAS_IMP_CGAL = _IMP_parallel.IMP_KERNEL_HAS_IMP_CGAL
IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_SYSTEM
IMP_KERNEL_HAS_CGAL = _IMP_parallel.IMP_KERNEL_HAS_CGAL
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
import IMP.kernel
IMP_PARALLEL_HAS_IMP_ALGEBRA = _IMP_parallel.IMP_PARALLEL_HAS_IMP_ALGEBRA
IMP_PARALLEL_HAS_IMP_BASE = _IMP_parallel.IMP_PARALLEL_HAS_IMP_BASE
IMP_PARALLEL_HAS_IMP_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_IMP_CGAL
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_CGAL
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
import socket
import time
import sys
import os
import re
import random
import socket   # NOTE(review): duplicate of 'import socket' above; harmless
import xdrlib
# Prefer the C pickle implementation on Python 2.
try:
    import cPickle as pickle
except ImportError:
    import pickle
# NOTE(review): these 'from IMP.parallel.util import ...' statements also
# bind the submodule name 'util' in this namespace (importing a submodule
# sets it as an attribute of the package, and this __init__'s globals are
# the package namespace); Manager._get_network_events below relies on that.
from IMP.parallel import slavestate
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

# Save sys.path at import time, so that slaves can import using the same
# path that works for the master imports
_import_time_path = sys.path[:]
298 
class Error(Exception):
    """Base class for all errors specific to the parallel module"""
302 
303 
# Internal sentinel exception: raised when a context has no queued tasks and
# no running slaves, and caught by the result generator to end iteration.
class _NoMoreTasksError(Error):
    pass
306 
307 
309  """Error raised if all slaves failed, so tasks cannot be run"""
310  pass
311 
312 
314  """Error raised if a problem occurs with the network"""
315  pass
316 
317 
319  """Error raised if a slave has an unhandled exception"""
320  def __init__(self, exc, traceback, slave):
321  self.exc = exc
322  self.traceback = traceback
323  self.slave = slave
324 
325  def __str__(self):
326  errstr = str(self.exc.__class__).replace("exceptions.", "")
327  return "%s: %s from %s\nRemote traceback:\n%s" \
328  % (errstr, str(self.exc), str(self.slave), self.traceback)
329 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        # Socket to the peer; set by the subclass when a connection exists.
        self._socket = None
        # Received bytes not yet decoded into a complete message.
        self._ibuffer = ''

    def _send(self, obj):
        # Each message is a pickle wrapped in an XDR string (length-prefixed),
        # so the receiver can find message boundaries in the byte stream.
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        # True if bytes are already buffered (a further message may be
        # decodable without touching the socket).
        return len(self._ibuffer) > 0

    def _recv(self):
        """Block until one complete object has been received; unwrap remote
           errors into a raised RemoteError."""
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            # IndexError/EOFError mean the buffer holds only a partial
            # message: read more bytes from the socket and retry.
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error, detail:
                    raise NetworkError("Connection lost to %s: %s" \
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    # Zero-length read: the peer closed the connection.
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        # Decode one XDR-wrapped pickle; return (object, remaining bytes).
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
373 
374 
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        # The slave process has connected back to the master.
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        # Tell the remote interpreter to prepend 'path' to sys.path.
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record that the slave was heard from just now."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """True if more than 'timeout' seconds passed since last contact."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        # A slave can accept a task if it is idle in the right context, or
        # idle with no context yet (it will be switched to 'context').
        if not self.ready_for_task(context) and not self.ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            # Switch contexts: ship the context's startup callable first.
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        # Drain heartbeats; return None when only a heartbeat arrived and no
        # further data is buffered, otherwise the completed task.
        while True:
            message = self._recv()
            self.update_contact_time()
            if not isinstance(message, _HeartBeat):
                break
            if not self.get_data_pending():
                return None
        finished = self._task
        finished._results = message
        self._task = None
        self._state = slavestate.connected
        return finished

    def _kill(self):
        # Mark the slave dead and hand back its interrupted task (if any)
        # so the caller can reschedule it.
        interrupted = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return interrupted

    def ready_to_start(self):
        return self._state == slavestate.init

    def ready_for_task(self, context):
        return (self._state == slavestate.connected
                and self._context == context)

    def running_task(self, context):
        return (self._state == slavestate.running_task
                and self._context == context)
449 
450 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       SlaveArray is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubSlaveArray."""
    # Fix: the docstring previously said "Slave is an abstract class",
    # copy-pasted from the Slave class above; this class is SlaveArray.

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        pass
466 
467 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        # Record the state change via the base class, then launch the slave
        # process locally in the background.
        Slave._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
478 
479 
class _SGEQsubSlave(Slave):
    # One slave within an SGE 'qsub' array job; the array object performs
    # the actual submission for all of its slaves at once.

    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        # Defer to the owning array, which batches all slaves into one qsub.
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        if self._jobid is None:
            return "<SGE qsub slave, ID %s>" % '(unknown)'
        return "<SGE qsub slave, ID %s>" % self._jobid
495 
496 
498  """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
499  To use this class, the master process must be running on a machine that
500  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
501  is termed a 'submit host' by SGE). The class starts an SGE job array
502  (every slave has the same SGE job ID, but a different task ID).
503  @param numslave The number of slaves, which correponds to the number of
504  tasks in the SGE job.
505  @param options A string of SGE options that are passed on the 'qsub'
506  command line. This is added to standard_options.
507  """
508 
509 
510  standard_options = '-j y -cwd -r n -o sge-errors'
511 
512  def __init__(self, numslave, options):
513  self._numslave = numslave
514  self._options = options
515  self._starting_slaves = []
516  self._jobid = None
517 
518  def _get_slaves(self):
519  """Return a list of Slave objects contained within this array"""
520  return [_SGEQsubSlave(self) for x in range(self._numslave)]
521 
522  def _slave_started(self, command, output, slave):
523  self._starting_slaves.append((command, output, slave))
524 
525  def _start(self, command):
526  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
527  (self._options, self.standard_options,
528  len(self._starting_slaves))
529  print qsub
530  a = _Popen4(qsub)
531  (inp, out) = (a.stdin, a.stdout)
532  slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
533  slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
534  inp.write("#!/bin/sh\n")
535  inp.write("uid=( '' %s )\n" % slave_uid)
536  inp.write("out=( '' %s )\n" % slave_out)
537  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
538  inp.write("myout=${out[$SGE_TASK_ID]}\n")
539  inp.write("%s $myuid > $myout 2>&1\n" % command)
540  inp.close()
541  outlines = out.readlines()
542  out.close()
543  for line in outlines:
544  print line.rstrip('\r\n')
545  a.require_clean_exit()
546  self._set_jobid(outlines)
547  self._starting_slaves = []
548 
549  def _set_jobid(self, outlines):
550  """Try to figure out the job ID from the SGE qsub output"""
551  if len(outlines) > 0:
552  m = re.compile(r"\d+").search(outlines[0])
553  if m:
554  self._jobid = int(m.group())
555  for (num, slave) in enumerate(self._starting_slaves):
556  slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
557 
558 
class _SGEPESlave(Slave):
    # A slave started on a parallel-environment node via 'qrsh -inherit'.

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        qrsh = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(qrsh, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
571 
572 
574  """An array of slaves in a Sun Grid Engine system parallel environment.
575  In order to use this class, the master must be run via Sun Grid Engine's
576  'qsub' command and submitted to a parallel environment using the qsub
577  -pe option. This class will start slaves on every node in the parallel
578  environment (including the node running the master). Each slave is
579  started using the 'qrsh' command with the '-inherit' option."""
580 
581  def _get_slaves(self):
582  slaves = []
583 
584  pe = os.environ['PE_HOSTFILE']
585  fh = open(pe, "r")
586  while True:
587  line = fh.readline()
588  if line == '':
589  break
590  (node, num, queue) = line.split(None, 2)
591  for i in range(int(num)):
592  slaves.append(_SGEPESlave(node))
593  # Replace first slave with a local slave, as this is ourself, and SGE
594  # won't let us start this process with qrsh (as we are already
595  # occupying the slot)
596  if len(slaves) > 0:
597  slaves[0] = LocalSlave()
598  return slaves
599 
600 
class Context(object):
    """A collection of tasks that run in the same environment.
    Context objects are typically created by calling Manager::get_context().
    """

    def __init__(self, manager, startup=None):
        # manager: the Manager that owns this context.
        # startup: optional callable run once per slave before any task.
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
        Tasks are any Python callable object that can be pickled (e.g. a
        function or a class that implements the \_\_call\_\_ method). When
        the task is run on the slave its arguments are the return value
        from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
        If there are more tasks than slaves, subsequent tasks are
        started only once a running task completes: each slave only runs
        a single task at a time. As each task completes, the return value(s)
        from the task callable are returned from this method, as a
        Python generator. Note that the results are returned in the order
        that tasks complete, which may not be the same as the order they
        were submitted in.

        \exception NoMoreSlavesError there are no slaves available
                   to run the tasks (or they all failed during execution).
        \exception RemoteError a slave encountered an unhandled exception.
        \exception NetworkError the master lost (or was unable to
                   establish) communication with any slave.
        """
        # All scheduling is delegated to the owning Manager.
        return self._manager._get_results_unordered(self)
635 
636 
class Manager(object):
    """Manages slaves and contexts.
       @param python If not None, the command to run to start a Python
                     interpreter that can import the IMP module. Otherwise,
                     the same interpreter that the master is currently using
                     is used. This is passed to the shell, so a full command
                     line (including multiple words separated by spaces) can
                     be used if necessary.
       @param host The hostname that slaves use to connect back to the master.
                   If not specified, the master machine's primary IP address
                   is used. On multi-homed machines, such as compute cluster
                   headnodes, this may need to be changed to allow all
                   slaves to reach the master (typically the name of the
                   machine's internal network address is needed). If only
                   running local slaves, 'localhost' can be used to prohibit
                   connections across the network.
       @param output A format string used to name slave output files. It is
                     given the numeric slave id, so for example the default
                     value 'slave\%d.output' will yield output files called
                     slave0.output, slave1.output, etc.
    """

    # Seconds to wait for slaves to connect back after starting.
    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._host = host
        self._output = output
        self._all_slaves = []
        # Maps unique string IDs to slaves that were started but have not
        # yet connected back.
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        # Arrays (anything with _get_slaves) are expanded lazily when the
        # slaves are actually started.
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            # All tasks done and no slaves still working: end the generator.
            return

    def _start_all_slaves(self):
        # Expand any registered slave arrays into individual slaves.
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        # Command each slave runs: start the slave_handler, telling it how
        # to reach this master (host and listening port).
        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave.ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        # Give arrays a chance to do their batched startup, then forget them.
        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        # Slave index plus 8 random uppercase letters, used to match a
        # connecting slave back to its Slave object.
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a.ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a.ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            # Only dequeue the task once it was sent successfully.
            context._tasks.pop(0)
        except socket.error, detail:
            # Sending failed: mark this slave dead; the task stays queued.
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Timed out with no network activity at all.
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # New slave just connected
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event.running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    # Keep the slave busy with the next queued task.
                    self._send_task_to_slave(event, context)
                    return task
                else: # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError, detail:
                # Lost this slave mid-task: requeue its task elsewhere.
                task = event._kill()
                print "Slave %s failed (%s): rescheduling task %s" \
                    % (str(event), str(detail), str(task))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        else:
            pass # Slave not running a task

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a.running_task(context) \
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        # No slave has been heard from within the timeout: give up, but
        # requeue every in-flight task before raising.
        running = [a for a in self._all_slaves if a.running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        # The first thing a slave sends is the unique ID it was started with.
        identifier = sock.recv(1024)
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print "Identified slave %s " % str(slave)
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print "Ignoring request from unknown slave"

    def _init_slave(self, slave):
        # Propagate the master's effective sys.path[0] entries so the slave
        # can import the same modules.
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a.running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        # NOTE(review): bare 'util' appears to be bound as a side effect of
        # the 'from IMP.parallel.util import ...' statements above (the
        # submodule import sets it as a package attribute, and this
        # __init__'s globals are the package namespace) — verify.
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
842 
843 
844 
def get_module_version():
    """get_module_version() -> std::string const"""
    # Thin wrapper over the C extension.
    return _IMP_parallel.get_module_version()

def get_example_path(*args):
    """get_example_path(std::string fname) -> std::string"""
    # Returns the full path to installed example data for this module.
    return _IMP_parallel.get_example_path(*args)

def get_data_path(*args):
    """get_data_path(std::string fname) -> std::string"""
    # Returns the full path to installed data for this module.
    return _IMP_parallel.get_data_path(*args)
# Ensure this Python wrapper matches the compiled module's version.
import _version_check
_version_check.check_version(get_module_version())

# This file is compatible with both classic and new-style classes.
861 
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to installed data.
Error raised if a slave has an unhandled exception.
See IMP.cgal for more information.
Definition: cgal_config.h:107
void set_check_level(CheckLevel tf)
Control runtime checks in the code.
See IMP.base for more information.
Definition: base/Array.h:20
Subprocess handling.
Definition: subproc.py:1
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
See IMP.kernel for more information.
std::string get_example_path(std::string file_name)
Return the path to installed example data for this module.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: util.py:1
See IMP.algebra for more information.
Base class for all errors specific to the parallel module.
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
CheckLevel get_check_level()
Get the current audit mode.
Error raised if all slaves failed, so tasks cannot be run.
See IMP.parallel for more information.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.