IMP  2.0.0
The Integrative Modeling Platform
parallel/__init__.py
1 # This file was automatically generated by SWIG (http://www.swig.org).
2 # Version 2.0.8
3 #
4 # Do not make changes to this file unless you know what you are doing--modify
5 # the SWIG interface file instead.
6 
7 
8 
from sys import version_info
if version_info < (2, 6, 0):
    # Older interpreters: rely on the normal import machinery.
    import _IMP_parallel
else:
    def swig_import_helper():
        """Load the compiled _IMP_parallel extension.

           The extension is first searched for in the directory containing
           this file; if it is not found there, a plain 'import' is used.
           Returns None if the module is located but no file object is
           returned for it (as in the SWIG-generated original).
        """
        import imp
        from os.path import dirname
        search_dirs = [dirname(__file__)]
        try:
            found = imp.find_module('_IMP_parallel', search_dirs)
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        fp, pathname, description = found
        if fp is None:
            return None
        try:
            return imp.load_module('_IMP_parallel', fp, pathname,
                                   description)
        finally:
            fp.close()
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
del version_info

try:
    _swig_property = property
except NameError:
    # Python < 2.2 has no 'property' builtin; leave _swig_property unset.
    pass
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    """Attribute-assignment hook shared by the SWIG proxy classes.

       'thisown' is forwarded to self.this.own(value). 'this' is stored
       directly when value is a SwigPyObject. Otherwise a setter registered
       in class_type.__swig_setmethods__ is used if one exists. Remaining
       names are stored in the instance __dict__ when static is false;
       when static is true, AttributeError is raised instead.
    """
    if name == "thisown":
        return self.this.own(value)
    is_swig_this = name == "this" and type(value).__name__ == 'SwigPyObject'
    if is_swig_this:
        self.__dict__[name] = value
        return None
    setter = class_type.__swig_setmethods__.get(name, None)
    if setter:
        return setter(self, value)
    if static:
        raise AttributeError("You cannot add attributes to %s" % self)
    self.__dict__[name] = value
47 
def _swig_setattr(self, class_type, name, value):
    """Permissive variant of _swig_setattr_nondynamic.

       Unknown attribute names are stored in the instance __dict__ rather
       than raising AttributeError.
    """
    return _swig_setattr_nondynamic(self, class_type, name, value, static=0)
50 
def _swig_getattr(self, class_type, name):
    """Attribute-lookup fallback shared by the SWIG proxy classes.

       'thisown' is answered by self.this.own(). Other names are resolved
       through class_type.__swig_getmethods__; if no getter is registered,
       AttributeError(name) is raised.
    """
    if name == "thisown":
        return self.this.own()
    getter = class_type.__swig_getmethods__.get(name, None)
    if not getter:
        raise AttributeError(name)
    return getter(self)
56 
def _swig_repr(self):
    """Return '<module.Class; proxy of <repr of self.this> >'.

       If self.this is missing or its repr fails, the 'proxy of ...' part
       is left empty. Only ordinary exceptions are suppressed;
       KeyboardInterrupt and SystemExit now propagate instead of being
       swallowed by a bare 'except:'.
    """
    try:
        strthis = "proxy of " + self.this.__repr__()
    except Exception:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__,
                             self.__class__.__name__, strthis,)
61 
# Base class for the SWIG proxy classes: the new-style 'object' if present.
try:
    _object = object
    _newclass = 1
except NameError:
    # A missing builtin raises NameError (the original caught
    # AttributeError, so this fallback could never be reached).
    class _object:
        pass
    _newclass = 0


# Weak proxies when the weakref module is usable; otherwise fall back to
# the identity function, i.e. ordinary strong references.
try:
    import weakref
    weakref_proxy = weakref.proxy
except (ImportError, AttributeError):
    weakref_proxy = lambda x: x
75 
76 
class IMP_PARALLEL_SwigPyIterator(_object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

       Abstract from Python's point of view: __init__ always raises.
       Every method forwards to the matching
       IMP_PARALLEL_SwigPyIterator_* function of the _IMP_parallel module.
    """
    __swig_setmethods__ = {}
    __swig_getmethods__ = {}
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator

    def __setattr__(self, name, value):
        return _swig_setattr(self, IMP_PARALLEL_SwigPyIterator, name, value)

    def __getattr__(self, name):
        return _swig_getattr(self, IMP_PARALLEL_SwigPyIterator, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")

    def __del__(self):
        # Intentionally a no-op, as generated by SWIG.
        return None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, *args):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, *args)

    def equal(self, *args):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, *args)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, *args):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, *args)

    def __eq__(self, *args):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, *args)

    def __ne__(self, *args):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, *args)

    def __iadd__(self, *args):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, *args)

    def __isub__(self, *args):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, *args)

    def __add__(self, *args):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, *args)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self

# Tell the extension module which Python class proxies this C++ type.
IMP_PARALLEL_SwigPyIterator_swigregister = \
    _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
163 
# Per-category type lists; initialized empty in this module.
_value_types = []
_object_types = []
_raii_types = []
_plural_types = []

# Log/check level and build-configuration values re-exported from the
# C++ extension module.
IMP_HAS_DEPRECATED = _IMP_parallel.IMP_HAS_DEPRECATED
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL

# Compiler feature flags.
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_NULLPTR = _IMP_parallel.IMP_COMPILER_HAS_NULLPTR

# IMP.base optional-dependency flags.
IMP_BASE_HAS_BOOST_RANDOM = _IMP_parallel.IMP_BASE_HAS_BOOST_RANDOM
IMP_BASE_HAS_GPERFTOOLS = _IMP_parallel.IMP_BASE_HAS_GPERFTOOLS
IMP_BASE_HAS_LOG4CXX = _IMP_parallel.IMP_BASE_HAS_LOG4CXX
IMP_BASE_HAS_TCMALLOC_HEAPCHECKER = \
    _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPCHECKER
IMP_BASE_HAS_TCMALLOC_HEAPPROFILER = \
    _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPPROFILER
187 import sys
class _DirectorObjects(object):
    """@internal Keeps references to director objects so that they are not
       deleted prematurely."""

    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Hold a reference to obj.

           Only objects exposing get_ref_count() (refcounted C++ classes)
           are kept; anything else is silently ignored."""
        if not hasattr(obj, 'get_ref_count'):
            return
        self._objects.append(obj)

    def cleanup(self):
        """Drop references no longer needed anywhere else.

           An object is kept if Python holds other references to it, or if
           C++ holds other references (get_ref_count() > 1; the Python
           object itself always accounts for one). While checking, three
           references are ours: the entry in self._objects, the loop
           variable, and the argument passed to sys.getrefcount.
        """
        survivors = []
        for candidate in self._objects:
            if sys.getrefcount(candidate) > 3 \
               or candidate.get_ref_count() > 1:
                survivors.append(candidate)
        self._objects = survivors

    def get_object_count(self):
        """Return how many director objects are held (for testing)."""
        return len(self._objects)

_director_objects = _DirectorObjects()
212 
# Check-level values accepted by set_check_level(), re-exported from C++.
DEFAULT_CHECK = _IMP_parallel.DEFAULT_CHECK
NONE = _IMP_parallel.NONE
USAGE = _IMP_parallel.USAGE
USAGE_AND_INTERNAL = _IMP_parallel.USAGE_AND_INTERNAL
217 
def set_check_level(*args):
    """set_check_level(IMP::base::CheckLevel tf)"""
    setter = _IMP_parallel.set_check_level
    return setter(*args)
221 
def get_check_level():
    """get_check_level() -> IMP::base::CheckLevel"""
    getter = _IMP_parallel.get_check_level
    return getter()
class _ostream(_object):
    """Proxy of C++ std::ostream class.

       Cannot be constructed from Python: __init__ always raises.
    """
    __swig_setmethods__ = {}
    __swig_getmethods__ = {}
    __repr__ = _swig_repr

    def __setattr__(self, name, value):
        return _swig_setattr(self, _ostream, name, value)

    def __getattr__(self, name):
        return _swig_getattr(self, _ostream, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")

    def write(self, *args):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, *args)

# Tell the extension module which Python class proxies std::ostream.
_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
239 
# Compiler feature flags.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
import IMP.base

# IMP.cgal configuration flags.
IMP_CGAL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_FILESYSTEM
IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS = \
    _IMP_parallel.IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS
IMP_CGAL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_CGAL_HAS_BOOST_RANDOM
IMP_CGAL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_SYSTEM
import IMP.cgal

# IMP.algebra configuration flags.
IMP_ALGEBRA_HAS_IMP_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_IMP_CGAL
IMP_ALGEBRA_HAS_BOOST_FILESYSTEM = \
    _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_FILESYSTEM
IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS = \
    _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS
IMP_ALGEBRA_HAS_BOOST_RANDOM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_RANDOM
IMP_ALGEBRA_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_SYSTEM
IMP_ALGEBRA_HAS_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_CGAL
IMP_ALGEBRA_HAS_ANN = _IMP_parallel.IMP_ALGEBRA_HAS_ANN
import IMP.algebra

# IMP.kernel configuration flags.
IMP_KERNEL_HAS_IMP_CGAL = _IMP_parallel.IMP_KERNEL_HAS_IMP_CGAL
IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS = \
    _IMP_parallel.IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_SYSTEM
IMP_KERNEL_HAS_CGAL = _IMP_parallel.IMP_KERNEL_HAS_CGAL
import IMP.kernel

# IMP.parallel's own configuration flags.
IMP_PARALLEL_HAS_IMP_ALGEBRA = _IMP_parallel.IMP_PARALLEL_HAS_IMP_ALGEBRA
IMP_PARALLEL_HAS_IMP_BASE = _IMP_parallel.IMP_PARALLEL_HAS_IMP_BASE
IMP_PARALLEL_HAS_IMP_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_IMP_CGAL
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = \
    _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = \
    _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_CGAL
270 import socket
271 import time
272 import sys
273 import os
274 import re
275 import random
276 import socket
277 import xdrlib
278 try:
279  import cPickle as pickle
280 except ImportError:
281  import pickle
282 from IMP.parallel import slavestate
283 from IMP.parallel.subproc import _run_background, _Popen4
284 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
285 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
286 from IMP.parallel.util import _SetPathAction
287 
# Snapshot of sys.path taken when this module is imported. It is a copy,
# so later changes to sys.path do not affect it; slaves are given the
# same search path that worked for the master's imports.
_import_time_path = list(sys.path)
291 
class Error(Exception):
    """Base class for all errors specific to the parallel module"""


class _NoMoreTasksError(Error):
    """Internal signal: the context has no more tasks to run."""


class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run"""


class NetworkError(Error):
    """Error raised if a problem occurs with the network"""


class RemoteError(Error):
    """Error raised if a slave has an unhandled exception.

       @param exc The exception raised on the slave.
       @param traceback The slave's traceback, as text.
       @param slave The slave on which the exception occurred.
    """
    def __init__(self, exc, traceback, slave):
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        # Python 2 shows builtins as 'exceptions.Foo'; strip that prefix.
        exc_name = str(self.exc.__class__).replace("exceptions.", "")
        template = "%s: %s from %s\nRemote traceback:\n%s"
        return template % (exc_name, str(self.exc), str(self.slave),
                           self.traceback)
322 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network.

       Each message is a pickle packed as an XDR string (length-prefixed),
       so the receiver can tell where one message ends. Subclasses assign
       self._socket before sending or receiving.
    """

    def __init__(self):
        self._socket = None
        # Data received from the socket but not yet decoded into objects.
        self._ibuffer = ''

    def _send(self, obj):
        packer = xdrlib.Packer()
        try:
            packer.pack_string(pickle.dumps(obj, -1))
        except SystemError:
            # Python < 2.5 can fail to pickle Inf/NaN floats with the binary
            # protocol; retry with the original text protocol.
            packer.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(packer.get_buffer())

    def get_data_pending(self):
        """Return True if undecoded received data is buffered."""
        return len(self._ibuffer) > 0

    def _recv(self):
        """Return the next object received, reading the socket as needed.

           Raises RemoteError if the peer sent back a wrapped exception, and
           NetworkError if the connection fails or is closed.
        """
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
            except (IndexError, EOFError):
                # Only a partial message is buffered: read more data.
                try:
                    data = self._socket.recv(4096)
                except socket.error:
                    detail = sys.exc_info()[1]
                    raise NetworkError("Connection lost to %s: %s" \
                                       % (str(self), str(detail)))
                if len(data) == 0:
                    raise NetworkError("%s closed connection" % str(self))
                self._ibuffer += data
                continue
            if isinstance(obj, _ErrorWrapper):
                raise RemoteError(obj.obj, obj.traceback, self)
            return obj

    def _unpickle(self, ibuffer):
        """Decode one message from ibuffer.

           Returns (object, remaining buffer). Raises EOFError/IndexError if
           ibuffer does not yet hold a complete message.
        """
        unpacker = xdrlib.Unpacker(ibuffer)
        payload = unpacker.unpack_string()
        # NOTE(review): pickle.loads can execute arbitrary code. This is only
        # safe if every peer on this socket is trusted -- confirm for the
        # network in use.
        obj = pickle.loads(payload)
        return (obj, ibuffer[unpacker.get_position():])
366 
367 
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        """Attach the socket the slave connected back on."""
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record that we just heard from this slave."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if more than timeout seconds passed since contact."""
        elapsed = time.time() - self.last_contact_time
        return elapsed > timeout

    def _start_task(self, task, context):
        """Send task to the slave, first sending the context's startup
           callable if the slave is not already set up for context."""
        usable = self.ready_for_task(context) or self.ready_for_task(None)
        if not usable:
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Return the finished task with its _results set, or None if only
           heartbeats were received."""
        reply = self._recv()
        self.update_contact_time()
        while isinstance(reply, _HeartBeat):
            if not self.get_data_pending():
                return None
            reply = self._recv()
            self.update_contact_time()
        finished = self._task
        finished._results = reply
        self._task = None
        self._state = slavestate.connected
        return finished

    def _kill(self):
        """Mark the slave dead and return the task it was running, if any."""
        orphaned = self._task
        self._task = None
        self._context = None
        self._state = slavestate.dead
        return orphaned

    def ready_to_start(self):
        return self._state == slavestate.init

    def ready_for_task(self, context):
        """True if connected, not running a task, and set up for context."""
        if self._state != slavestate.connected:
            return False
        return self._context == context

    def running_task(self, context):
        """True if currently running a task from context."""
        if self._state != slavestate.running_task:
            return False
        return self._context == context
442 
443 
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array.
           The base implementation returns None; subclasses override it."""
        pass

    def _start(self, command=None):
        """Do any necessary startup after all contained Slaves have started.

           Manager calls this as array._start(command), so the base class
           must accept the command argument. Subclasses that do not
           override it, such as SGEPESlaveArray, previously failed here
           with a TypeError. The base implementation does nothing.
        """
        pass
457 
458 
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        """Launch 'command unique_id' in the background, with its output
           going to the file named output."""
        Slave._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalSlave>"
469 
470 
class _SGEQsubSlave(Slave):
    """One slave belonging to an SGEQsubSlaveArray."""

    def __init__(self, array):
        Slave.__init__(self)
        # Filled in by the array once qsub reports the job ID.
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        """Register with the owning array; the array submits the job."""
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        if self._jobid is None:
            shown = '(unknown)'
        else:
            shown = self._jobid
        return "<SGE qsub slave, ID %s>" % shown
486 
487 
class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
       @param numslave The number of slaves, which corresponds to the number
                       of tasks in the SGE job.
       @param options  A string of SGE options that are passed on the 'qsub'
                       command line. This is added to standard_options.
    """

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numslave, options):
        self._numslave = numslave
        self._options = options
        # (unique_id, output, slave) tuples awaiting submission.
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        slaves = []
        for _ in range(self._numslave):
            slaves.append(_SGEQsubSlave(self))
        return slaves

    def _slave_started(self, command, output, slave):
        """Queue a slave for submission. 'command' receives the slave's
           unique id (see _SGEQsubSlave._start)."""
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        """Submit one SGE array job covering every queued slave."""
        count = len(self._starting_slaves)
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options, count)
        print(qsub)
        proc = _Popen4(qsub)
        inp, out = proc.stdin, proc.stdout
        uids = " ".join([repr(entry[0]) for entry in self._starting_slaves])
        outputs = " ".join([repr(entry[1])
                            for entry in self._starting_slaves])
        # Each array task picks its own id/output file by $SGE_TASK_ID
        # (1-based; the leading '' pads index 0).
        # NOTE(review): this uses bash array syntax although qsub is run with
        # -S /bin/sh; it only works where /bin/sh is bash -- confirm.
        script = ["#!/bin/sh\n",
                  "uid=( '' %s )\n" % uids,
                  "out=( '' %s )\n" % outputs,
                  "myuid=${uid[$SGE_TASK_ID]}\n",
                  "myout=${out[$SGE_TASK_ID]}\n",
                  "%s $myuid > $myout 2>&1\n" % command]
        for chunk in script:
            inp.write(chunk)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        proc.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if not outlines:
            return
        match = re.search(r"\d+", outlines[0])
        if not match:
            return
        self._jobid = int(match.group())
        for index, entry in enumerate(self._starting_slaves):
            entry[2]._jobid = "%d.%d" % (self._jobid, index + 1)
548 
549 
class _SGEPESlave(Slave):
    """A slave started on a node of an SGE parallel environment."""

    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        """Start the slave on self._host via 'qrsh -inherit -V'."""
        Slave._start(self, command, unique_id, output)
        launcher = "qrsh -inherit -V %s" % self._host
        _run_background("%s %s %s" % (launcher, command, unique_id), output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
562 
563 
class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""

    def _get_slaves(self):
        """Return one slave per slot listed in the SGE PE host file.

           The file is named by the PE_HOSTFILE environment variable (a
           KeyError propagates if it is unset). Each non-blank line must
           have at least three whitespace-separated fields: the host name,
           the number of slots on that host, and the rest. The host file is
           now always closed, and blank lines are skipped rather than
           raising ValueError.
        """
        slaves = []

        pe = os.environ['PE_HOSTFILE']
        fh = open(pe, "r")
        try:
            for line in fh:
                if not line.strip():
                    continue
                (node, num, _rest) = line.split(None, 2)
                for i in range(int(num)):
                    slaves.append(_SGEPESlave(node))
        finally:
            # Close the host file even if a line fails to parse.
            fh.close()
        # Replace first slave with a local slave, as this is ourself, and SGE
        # won't let us start this process with qrsh (as we are already
        # occupying the slot)
        if len(slaves) > 0:
            slaves[0] = LocalSlave()
        return slaves
590 
591 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        self._manager = manager
        self._startup = startup
        # Tasks not yet sent to a slave, in submission order.
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the __call__ method). When the
           task is run on the slave its arguments are the return value from this
           context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           @exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a slave encountered an unhandled exception.
           @exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        manager = self._manager
        return manager._get_results_unordered(self)
626 
627 
class Manager(object):
    """Manages slaves and contexts.
       @param python If not None, the command to run to start a Python
                     interpreter that can import the IMP module. Otherwise,
                     the same interpreter that the master is currently using
                     is used. This is passed to the shell, so a full command
                     line (including multiple words separated by spaces) can
                     be used if necessary.
       @param host   The hostname that slaves use to connect back to the master.
                     If not specified, the master machine's primary IP address
                     is used. On multi-homed machines, such as compute cluster
                     headnodes, this may need to be changed to allow all
                     slaves to reach the master (typically the name of the
                     machine's internal network address is needed). If only
                     running local slaves, 'localhost' can be used to prohibit
                     connections across the network.
       @param output A format string used to name slave output files. It is
                     given the numeric slave id, so for example the default
                     value 'slave%d.output' will yield output files called
                     slave0.output, slave1.output, etc.
    """

    connect_timeout = 7200

    # Note: must be higher than that in slave_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        # unique id -> Slave, for slaves started but not yet connected back
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object (or a SlaveArray, detected by _get_slaves)."""
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the __call__ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        """Expand slave arrays and start every slave not yet started."""
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave.ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        """Return an identifier for slave number num.

           The identifier is "<num>:" followed by 8 random uppercase letters.
           It is the only thing checked when a slave connects back (see
           _accept_slave), so the letters come from random.SystemRandom (OS
           entropy) rather than the predictable, user-seedable global
           generator. This also leaves the caller's global random state
           untouched.
        """
        letters = []
        try:
            rng = random.SystemRandom()
            for _ in range(8):
                letters.append(chr(rng.randint(0, 25) + ord('A')))
        except NotImplementedError:
            # No OS entropy source available; fall back to the global RNG.
            letters = [chr(random.randint(0, 25) + ord('A'))
                       for _ in range(8)]
        return "%d:%s" % (num, "".join(letters))

    def _send_tasks_to_slaves(self, context):
        """Hand queued tasks of context to every idle slave that can take
           one."""
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a.ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a.ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        """Send the next queued task of context to slave, if any."""
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            # The slave is unreachable: drop it. The task was not popped,
            # so it stays queued for another slave.
            slave._kill()

    def _get_finished_tasks(self, context):
        """Yield tasks of context as slaves finish them; raise NetworkError
           if no running slave is heard from within heartbeat_timeout."""
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one network event; return a finished task or None."""
        if event == self._listen_sock:
            # New slave just connected
            (conn, _addr) = self._listen_sock.accept()
            self._accept_slave(conn, context)
        elif event.running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError:
                detail = sys.exc_info()[1]
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
        # Otherwise the slave is not running a task: nothing to do.
        return None

    def _kill_timed_out_slaves(self, context):
        """Kill slaves silent for longer than heartbeat_timeout and requeue
           their tasks."""
        timed_out = [a for a in self._all_slaves if a.running_task(context) \
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        """Kill every running slave, requeue its task, and raise
           NetworkError."""
        running = [a for a in self._all_slaves if a.running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        """Identify a newly connected socket.

           If it presents the unique id of a starting slave, attach it to
           that slave, give it a task, and return the slave. Otherwise the
           connection is closed (it previously leaked) and None is returned.
        """
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")
            sock.close()
            return None

    def _init_slave(self, slave):
        """Give the slave the directories that were first on sys.path at
           import time and now (when not empty and not duplicates)."""
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listen socket or running slaves.

           Raises _NoMoreTasksError when nothing is running or queued, and
           NoMoreSlavesError when tasks remain but no slave can run them.
        """
        # Import explicitly instead of relying on the 'util' name that the
        # module-level 'from IMP.parallel.util import ...' binds implicitly.
        from IMP.parallel import util
        running = [a for a in self._all_slaves if a.running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
832 
833 
834 
def get_module_version():
    """get_module_version() -> std::string const"""
    version_of = _IMP_parallel.get_module_version
    return version_of()


def get_example_path(*args):
    """get_example_path(std::string fname) -> std::string"""
    lookup = _IMP_parallel.get_example_path
    return lookup(*args)


def get_data_path(*args):
    """get_data_path(std::string fname) -> std::string"""
    lookup = _IMP_parallel.get_data_path
    return lookup(*args)


# At import time, pass the extension's version string to
# _version_check.check_version.
import _version_check
_version_check.check_version(get_module_version())
848 
849 # This file is compatible with both classic and new-style classes.
850 
851