from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module(
                    '_IMP_parallel', fp, pathname, description)
            finally:
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel

_swig_property = property

def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        self.__dict__[name] = value
    else:
        raise AttributeError("You cannot add attributes to %s" % self)

def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)

def _swig_getattr(self, class_type, name):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    raise AttributeError(name)

def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except AttributeError:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)

_object = object

try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:
    weakref_proxy = lambda x: x

class IMP_PARALLEL_SwigPyIterator(_object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class"""
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, IMP_PARALLEL_SwigPyIterator, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, IMP_PARALLEL_SwigPyIterator, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, *args):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, *args)

    def equal(self, *args):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, *args)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, *args):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, *args)

    def __eq__(self, *args):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, *args)

    def __ne__(self, *args):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, *args)

    def __iadd__(self, *args):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, *args)

    def __isub__(self, *args):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, *args)

    def __add__(self, *args):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, *args)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)

IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_BASE_HAS_LOG4CXX = _IMP_parallel.IMP_BASE_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_BASE_HAS_BOOST_RANDOM = _IMP_parallel.IMP_BASE_HAS_BOOST_RANDOM
IMP_BASE_HAS_GPERFTOOLS = _IMP_parallel.IMP_BASE_HAS_GPERFTOOLS
IMP_BASE_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPCHECKER
IMP_BASE_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPPROFILER
IMPBASE_SHOW_WARNINGS = _IMP_parallel.IMPBASE_SHOW_WARNINGS

class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3
                or x.get_ref_count() > 1]
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)

_director_objects = _DirectorObjects()
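
# Editor's note (illustrative, not part of the generated wrapper): the
# threshold of 3 used in cleanup() above comes from CPython reference
# counting. While the list comprehension runs, each object x is held by
# self._objects, by the loop variable x, and by the argument slot of
# sys.getrefcount(), so a count of exactly 3 means no other Python
# references remain. For example:
#
#     obj = object()
#     sys.getrefcount(obj)   # -> 2: the local name plus the argument slot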

DEFAULT_CHECK = _IMP_parallel.DEFAULT_CHECK
NONE = _IMP_parallel.NONE
USAGE = _IMP_parallel.USAGE
USAGE_AND_INTERNAL = _IMP_parallel.USAGE_AND_INTERNAL

def set_check_level(*args):
    """set_check_level(IMP::base::CheckLevel tf)"""
    return _IMP_parallel.set_check_level(*args)

def get_check_level():
    """get_check_level() -> IMP::base::CheckLevel"""
    return _IMP_parallel.get_check_level()

class _ostream(_object):
    """Proxy of C++ std::ostream class"""
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, _ostream, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, _ostream, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, *args):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, *args)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)

IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
IMP_CGAL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_FILESYSTEM
IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS
IMP_CGAL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_CGAL_HAS_BOOST_RANDOM
IMP_CGAL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_SYSTEM
IMPCGAL_SHOW_WARNINGS = _IMP_parallel.IMPCGAL_SHOW_WARNINGS
IMP_ALGEBRA_HAS_IMP_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_IMP_CGAL
IMP_ALGEBRA_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_FILESYSTEM
IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS
IMP_ALGEBRA_HAS_BOOST_RANDOM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_RANDOM
IMP_ALGEBRA_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_SYSTEM
IMP_ALGEBRA_HAS_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_CGAL
IMP_ALGEBRA_HAS_ANN = _IMP_parallel.IMP_ALGEBRA_HAS_ANN
IMPALGEBRA_SHOW_WARNINGS = _IMP_parallel.IMPALGEBRA_SHOW_WARNINGS
IMP_KERNEL_HAS_IMP_CGAL = _IMP_parallel.IMP_KERNEL_HAS_IMP_CGAL
IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_SYSTEM
IMP_KERNEL_HAS_CGAL = _IMP_parallel.IMP_KERNEL_HAS_CGAL
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
IMP_PARALLEL_HAS_IMP_ALGEBRA = _IMP_parallel.IMP_PARALLEL_HAS_IMP_ALGEBRA
IMP_PARALLEL_HAS_IMP_BASE = _IMP_parallel.IMP_PARALLEL_HAS_IMP_BASE
IMP_PARALLEL_HAS_IMP_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_IMP_CGAL
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_CGAL
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS

import sys
import os
import re
import random
import socket
import time
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle

# Support modules of IMP.parallel (module paths assumed from usage below):
from IMP.parallel import slavestate
from IMP.parallel import util
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

# Remember the Python search path at import time so that it can be passed
# on to slave processes.
_import_time_path = sys.path[:]

class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass

class _NoMoreTasksError(Error):
    pass

class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run"""
    pass

class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass

class RemoteError(Error):
    """Error raised if a slave has an unhandled exception"""
    def __init__(self, exc, traceback, slave):
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.slave), self.traceback)
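
# Editor's note (illustrative): a RemoteError caught on the master formats as
# "<exception class>: <message> from <slave>" followed by the remote
# traceback, e.g. (hypothetical values):
#
#     ValueError: bad input from <LocalSlave>
#     Remote traceback:
#     Traceback (most recent call last):
#       ...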

class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = ''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def get_data(self, timeout=None):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error, detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                self._ibuffer += data

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
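
# Editor's sketch (illustrative only, not part of the generated module): the
# wire format used by _Communicator above is a pickled payload wrapped in an
# XDR string (a 4-byte big-endian length, the payload bytes, then padding to
# a multiple of 4). The hypothetical helper below round-trips that framing
# and is never called by this module.
def _framing_round_trip_demo(obj):
    packer = xdrlib.Packer()
    packer.pack_string(pickle.dumps(obj, -1))
    buf = packer.get_buffer()
    unpacker = xdrlib.Unpacker(buf)
    payload = unpacker.unpack_string()
    # Anything beyond get_position() would be the start of the next message.
    return pickle.loads(payload), buf[unpacker.get_position():]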

class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        if not self.ready_for_task(context) and not self.ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        while True:
            r = self.get_data()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        task = self._task
        self._task = None
        self._state = slavestate.dead
        return task

    def ready_to_start(self):
        return self._state == slavestate.init

    def ready_for_task(self, context):
        return self._state == slavestate.connected \
               and self._context == context

    def running_task(self, context):
        return self._state == slavestate.running_task \
               and self._context == context

class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       SlaveArray is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        pass

class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"

class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid or '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid

class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
       @param numslave The number of slaves, which corresponds to the number
                       of tasks in the SGE job.
       @param options  A string of SGE options that are passed on the 'qsub'
                       command line. This is added to standard_options.
    """

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numslave, options):
        self._numslave = numslave
        self._options = options
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for x in range(self._numslave)]

    def _slave_started(self, command, output, slave):
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print line.rstrip('\r\n')
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []
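
    # Editor's note (illustrative): for two starting slaves the script piped
    # to qsub above would look roughly like the following (identifiers are
    # hypothetical); bash arrays are indexed by the 1-based $SGE_TASK_ID, so
    # the leading '' pads index 0:
    #
    #     #!/bin/sh
    #     uid=( '' '0:ABCDEFGH' '1:IJKLMNOP' )
    #     out=( '' 'slave0.output' 'slave1.output' )
    #     myuid=${uid[$SGE_TASK_ID]}
    #     myout=${out[$SGE_TASK_ID]}
    #     <command> $myuid > $myout 2>&1
    #
    # A typical master script would use this class together with a Manager,
    # for example (hypothetical option string):
    #
    #     m = Manager()
    #     m.add_slave(SGEQsubSlaveArray(numslave=16, options="-l mem_free=1G"))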

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num+1)

class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host

class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""

    def _get_slaves(self):
        slaves = []
        pe = os.environ['PE_HOSTFILE']
        fh = open(pe, "r")
        while True:
            line = fh.readline()
            if line == '':
                break
            (node, num, queue) = line.split(None, 2)
            for i in range(int(num)):
                slaves.append(_SGEPESlave(node))
        # The first slot is the node running the master itself; SGE will not
        # allow a new qrsh there, so run a local slave in its place.
        slaves[0] = LocalSlave()
        return slaves
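
# Editor's sketch (illustrative only): with SGEPESlaveArray the master script
# itself is submitted to an SGE parallel environment, e.g.
#
#     qsub -pe <pe_name> 16 master_script.py    # hypothetical PE name and size
#
# and the script then calls m.add_slave(SGEPESlaveArray()) so that one slave
# is started (via 'qrsh -inherit') for every slot listed in $PE_HOSTFILE.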

class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           \exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           \exception RemoteError a slave encountered an unhandled exception.
           \exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
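
# Editor's sketch (illustrative only, not part of the generated module): a
# task is any picklable callable; a small class with __call__ pickles cleanly.
# The names below are hypothetical and nothing in this module uses them.
class _ExampleTask(object):
    """Toy task: return the square of the value given at construction."""
    def __init__(self, value):
        self.value = value

    def __call__(self):
        # With no context startup callable the task is called without
        # arguments; otherwise it would receive the startup return values.
        return self.value * self.value

# Typical use against the Context API defined above:
#
#     context = manager.get_context()
#     for i in range(10):
#         context.add_task(_ExampleTask(i))
#     for result in context.get_results_unordered():
#         print result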

class Manager(object):
    """Manages slaves and contexts.
       @param python If not None, the command to run to start a Python
                     interpreter that can import the IMP module. Otherwise,
                     the same interpreter that the master is currently using
                     is used. This is passed to the shell, so a full command
                     line (including multiple words separated by spaces) can
                     be used if necessary.
       @param host   The hostname that slaves use to connect back to the master.
                     If not specified, the master machine's primary IP address
                     is used. On multi-homed machines, such as compute cluster
                     headnodes, this may need to be changed to allow all
                     slaves to reach the master (typically the name of the
                     machine's internal network address is needed). If only
                     running local slaves, 'localhost' can be used to prohibit
                     connections across the network.
       @param output A format string used to name slave output files. It is
                     given the numeric slave id, so for example the default
                     value 'slave\%d.output' will yield output files called
                     slave0.output, slave1.output, etc.
    """

    connect_timeout = 7200

    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get the primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing results,
                    # make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave.ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a.ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a.ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error, detail:
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave has just connected
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event.running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:   # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError, detail:
                task = event._kill()
                print "Slave %s failed (%s): rescheduling task %s" \
                      % (str(event), str(detail), str(task))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a.running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a.running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print "Identified slave %s " % str(slave)
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print "Ignoring request from unknown slave"

    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a.running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
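
# Editor's sketch (illustrative only): end-to-end use of the Manager defined
# above, following its docstring; counts and names are hypothetical.
#
#     import IMP.parallel
#
#     m = IMP.parallel.Manager(host='localhost')   # local slaves only
#     for i in range(4):
#         m.add_slave(IMP.parallel.LocalSlave())
#     c = m.get_context()
#     for i in range(10):
#         c.add_task(_ExampleTask(i))              # see the sketch above
#     for result in c.get_results_unordered():
#         print result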

def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()

def get_example_path(*args):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(*args)

def get_data_path(*args):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(*args)

import _version_check
_version_check.check_version(get_module_version())