from __future__ import print_function, division, absolute_import
from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module(
                    '_IMP_parallel', fp, pathname, description)
            finally:
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
_swig_property = property


def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)


def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)


def _swig_getattr_nondynamic(self, class_type, name, static=1):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    if (not static):
        return object.__getattr__(self, name)
    else:
        raise AttributeError(name)


def _swig_getattr(self, class_type, name):
    return _swig_getattr_nondynamic(self, class_type, name, 0)


def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except Exception:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)


try:
    _object = object
    _newclass = 1
except AttributeError:
    class _object:
        pass
    _newclass = 0


def _swig_setattr_nondynamic_method(set):
    def set_attr(self, name, value):
        if (name == "thisown"):
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr


try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:
    weakref_proxy = lambda x: x


class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class"""
    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)


_IMP_parallel.IMP_DEBUG_swigconstant(_IMP_parallel)
IMP_DEBUG = _IMP_parallel.IMP_DEBUG

_IMP_parallel.IMP_RELEASE_swigconstant(_IMP_parallel)
IMP_RELEASE = _IMP_parallel.IMP_RELEASE

_IMP_parallel.IMP_SILENT_swigconstant(_IMP_parallel)
IMP_SILENT = _IMP_parallel.IMP_SILENT

_IMP_parallel.IMP_PROGRESS_swigconstant(_IMP_parallel)
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS

_IMP_parallel.IMP_TERSE_swigconstant(_IMP_parallel)
IMP_TERSE = _IMP_parallel.IMP_TERSE

_IMP_parallel.IMP_VERBOSE_swigconstant(_IMP_parallel)
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE

_IMP_parallel.IMP_MEMORY_swigconstant(_IMP_parallel)
IMP_MEMORY = _IMP_parallel.IMP_MEMORY

_IMP_parallel.IMP_NONE_swigconstant(_IMP_parallel)
IMP_NONE = _IMP_parallel.IMP_NONE

_IMP_parallel.IMP_USAGE_swigconstant(_IMP_parallel)
IMP_USAGE = _IMP_parallel.IMP_USAGE

_IMP_parallel.IMP_INTERNAL_swigconstant(_IMP_parallel)
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL

_IMP_parallel.IMP_KERNEL_HAS_LOG4CXX_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX

_IMP_parallel.IMP_COMPILER_HAS_AUTO_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO

_IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR

_IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR

_IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER

_IMP_parallel.IMPKERNEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS


class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3
                or x.get_ref_count() > 1]
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)


_director_objects = _DirectorObjects()


class _ostream(object):
    """Proxy of C++ std::ostream class"""
    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)
_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)


_IMP_parallel.IMP_COMPILER_HAS_OVERRIDE_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE

_IMP_parallel.IMP_COMPILER_HAS_FINAL_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL

_IMP_parallel.IMP_HAS_NOEXCEPT_swigconstant(_IMP_parallel)
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM

_IMP_parallel.IMPPARALLEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS


import sys
import os
import re
import random
import socket
import time
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle
from . import slavestate
from . import util

# Save the import-time sys.path, so that slaves can be given the same search
# path that worked for the master's own imports
_import_time_path = sys.path[:]
371 """Base class for all errors specific to the parallel module"""
375 class _NoMoreTasksError(Error):
380 """Error raised if all slaves failed, so tasks cannot be run"""
385 """Error raised if a problem occurs with the network"""
390 """Error raised if a slave has an unhandled exception"""
391 def __init__(self, exc, traceback, slave):
393 self.traceback = traceback
397 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
398 return "%s: %s from %s\nRemote traceback:\n%s" \
399 % (errstr, str(self.exc), str(self.slave), self.traceback)


class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Fall back to the text pickle protocol if binary pickling fails
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
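

# The framing used by _Communicator above is a plain XDR string: a 4-byte
# big-endian length followed by the pickled payload, padded to a multiple of
# four bytes.  The following is an illustrative, socket-free sketch of that
# framing (the function name is hypothetical and not part of the API):
def _example_wire_roundtrip(obj):
    """Pack 'obj' the way _send() does, then unpack it the way _unpickle()
       does, returning (object, leftover_bytes)."""
    p = xdrlib.Packer()
    p.pack_string(pickle.dumps(obj, -1))
    buf = p.get_buffer()
    u = xdrlib.Unpacker(buf)
    data = u.unpack_string()
    return pickle.loads(data), buf[u.get_position():]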
447 """Representation of a single slave.
448 Each slave uses a single thread of execution (i.e. a single CPU core)
449 to run tasks sequentially.
450 Slave is an abstract class; instead of using this class directly, use
451 a subclass such as LocalSlave or SGEQsubSlaveArray."""
454 _Communicator.__init__(self)
455 self._state = slavestate.init
458 self.update_contact_time()
460 def _start(self, command, unique_id, output):
461 """Start the slave running on the remote host; override in subclasses"""
462 self._state = slavestate.started
464 def _accept_connection(self, sock):
466 self._state = slavestate.connected
467 self.update_contact_time()
469 def _set_python_search_path(self, path):
470 self._send(_SetPathAction(path))
472 def update_contact_time(self):
473 self.last_contact_time = time.time()
475 def get_contact_timed_out(self, timeout):
476 return (time.time() - self.last_contact_time) > timeout
478 def _start_task(self, task, context):
479 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
480 raise TypeError(
"%s not ready for task" % str(self))
481 if self._context != context:
482 self._context = context
483 self._send(_ContextWrapper(context._startup))
484 self._state = slavestate.running_task
486 self._send(_TaskWrapper(task))
488 def _get_finished_task(self):
491 self.update_contact_time()
492 if isinstance(r, _HeartBeat):
493 if not self.get_data_pending():
500 self._state = slavestate.connected
507 self._state = slavestate.dead
510 def _ready_to_start(self):
511 return self._state == slavestate.init
513 def _ready_for_task(self, context):
514 return self._state == slavestate.connected \
515 and self._context == context
517 def _running_task(self, context):
518 return self._state == slavestate.running_task \
519 and self._context == context
523 """Representation of an array of slaves.
524 This is similar to Slave, except that it represents a collection of
525 slaves that are controlled together, such as a batch submission system
526 array job on a compute cluster.
527 Slave is an abstract class; instead of using this class directly, use
528 a subclass such as SGEQsubSlaveArray."""
530 def _get_slaves(self):
531 """Return a list of Slave objects contained within this array"""
535 """Do any necessary startup after all contained Slaves have started"""


class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"


class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
569 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
570 To use this class, the master process must be running on a machine that
571 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
572 is termed a 'submit host' by SGE). The class starts an SGE job array
573 (every slave has the same SGE job ID, but a different task ID).
577 standard_options =
'-j y -cwd -r n -o sge-errors'
581 @param numslave The number of slaves, which corresponds to the
582 number of tasks in the SGE job.
583 @param options A string of SGE options that are passed on the 'qsub'
584 command line. This is added to standard_options.
586 self._numslave = numslave
587 self._options = options
588 self._starting_slaves = []
591 def _get_slaves(self):
592 """Return a list of Slave objects contained within this array"""
593 return [_SGEQsubSlave(self)
for x
in range(self._numslave)]
595 def _slave_started(self, command, output, slave):
596 self._starting_slaves.append((command, output, slave))
598 def _start(self, command):
599 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
600 (self._options, self.standard_options,
601 len(self._starting_slaves))
604 (inp, out) = (a.stdin, a.stdout)
605 slave_uid =
" ".join([repr(s[0])
for s
in self._starting_slaves])
606 slave_out =
" ".join([repr(s[1])
for s
in self._starting_slaves])
607 inp.write(
"#!/bin/sh\n")
608 inp.write(
"uid=( '' %s )\n" % slave_uid)
609 inp.write(
"out=( '' %s )\n" % slave_out)
610 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
611 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
612 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
614 outlines = out.readlines()
616 for line
in outlines:
617 print(line.rstrip(
'\r\n'))
618 a.require_clean_exit()
619 self._set_jobid(outlines)
620 self._starting_slaves = []
622 def _set_jobid(self, outlines):
623 """Try to figure out the job ID from the SGE qsub output"""
624 if len(outlines) > 0:
625 m = re.compile(
r"\d+").search(outlines[0])
627 self._jobid = int(m.group())
628 for (num, slave)
in enumerate(self._starting_slaves):
629 slave[2]._jobid =
"%d.%d" % (self._jobid, num+1)


class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" \
                  % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
647 """An array of slaves in a Sun Grid Engine system parallel environment.
648 In order to use this class, the master must be run via Sun Grid Engine's
649 'qsub' command and submitted to a parallel environment using the qsub
650 -pe option. This class will start slaves on every node in the parallel
651 environment (including the node running the master). Each slave is
652 started using the 'qrsh' command with the '-inherit' option."""
654 def _get_slaves(self):
657 pe = os.environ[
'PE_HOSTFILE']
663 (node, num, queue) = line.split(
None, 2)
664 for i
in range(int(num)):
665 slaves.append(_SGEPESlave(node))
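

# Illustrative sketch (not part of the generated API): no arguments are needed
# because the node list is read from the $PE_HOSTFILE that SGE sets when the
# master itself was submitted with 'qsub -pe'.
def _example_add_pe_array(manager):
    """Hypothetical helper: use every slot of the current SGE parallel
       environment."""
    manager.add_slave(SGEPESlaveArray())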
675 """A collection of tasks that run in the same environment.
676 Context objects are typically created by calling Manager::get_context().
680 self._manager = manager
681 self._startup = startup
685 """Add a task to this context.
686 Tasks are any Python callable object that can be pickled (e.g. a
687 function or a class that implements the \_\_call\_\_ method). When
688 the task is run on the slave its arguments are the return value
689 from this context's startup function."""
690 self._tasks.append(task)
693 """Run all of the tasks on available slaves, and return results.
694 If there are more tasks than slaves, subsequent tasks are
695 started only once a running task completes: each slave only runs
696 a single task at a time. As each task completes, the return value(s)
697 from the task callable are returned from this method, as a
698 Python generator. Note that the results are returned in the order
699 that tasks complete, which may not be the same as the order they
702 \exception NoMoreSlavesError there are no slaves available
703 to run the tasks (or they all failed during execution).
704 \exception RemoteError a slave encountered an unhandled exception.
705 \exception NetworkError the master lost (or was unable to
706 establish) communication with any slave.
708 return self._manager._get_results_unordered(self)
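

# Illustrative sketch (not part of the generated API) of the two callables a
# Context works with: a startup callable, whose return value supplies the
# arguments of every task run on a slave, and a picklable task object.  The
# '_example_*' names are hypothetical; in a real script they live at module
# level (as here) so that pickle can find them.
def _example_startup():
    """Hypothetical per-slave setup; runs once on each slave."""
    return (10,)          # made available to each task it runs


class _ExampleScaleTask(object):
    """Hypothetical task: multiply the per-slave base value by a factor."""
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, base):
        # 'base' comes from the value(s) returned by the context's startup
        return base * self.factor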
712 """Manages slaves and contexts.
715 connect_timeout = 7200
718 heartbeat_timeout = 7200
720 def __init__(self, python=None, host=None, output='slave%d.output'):
722 @param python If not None, the command to run to start a Python
723 interpreter that can import the IMP module. Otherwise,
724 the same interpreter that the master is currently using
725 is used. This is passed to the shell, so a full command
726 line (including multiple words separated by spaces) can
727 be used if necessary.
728 @param host The hostname that slaves use to connect back to the
729 master. If not specified, the master machine's primary
730 IP address is used. On multi-homed machines, such as
731 compute cluster headnodes, this may need to be changed
732 to allow all slaves to reach the master (typically the
733 name of the machine's internal network address is
734 needed). If only running local slaves, 'localhost' can
735 be used to prohibit connections across the network.
736 @param output A format string used to name slave output files. It is
737 given the numeric slave id, so for example the default
738 value 'slave\%d.output' will yield output files called
739 slave0.output, slave1.output, etc.
742 self._python = sys.executable
744 self._python = python
746 self._output = output
747 self._all_slaves = []
748 self._starting_slaves = {}
749 self._slave_arrays = []
754 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
755 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
758 """Add a Slave object."""
759 if hasattr(slave,
'_get_slaves'):
760 self._slave_arrays.append(slave)
762 self._all_slaves.append(slave)
765 """Create and return a new Context in which tasks can be run.
766 @param startup If not None, a callable (Python function or class
767 that implements the \_\_call\_\_ method) that sets up
768 the slave to run tasks. This method is only called
769 once per slave. The return values from this method
770 are passed to the task object when it runs on
772 @return A new Context object.

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing a result,
                    # make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the right context set up
        available_slaves = \
            [a for a in self._all_slaves if a._ready_for_task(context)] + \
            [a for a in self._all_slaves if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error as detail:
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave has just connected back to the master
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:
                    # The slave sent a heartbeat rather than a result
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for more slaves to connect back to us

        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
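

# End-to-end usage sketch (illustrative only; nothing in this module calls it,
# and the '_example_*' names are not part of the IMP.parallel API).  It wires
# together the pieces documented above, reusing the hypothetical startup and
# task callables sketched after the Context class.
def _example_manager_run():
    m = Manager()                  # by default, slaves reuse this interpreter
    m.add_slave(LocalSlave())      # two single-core slaves on this machine
    m.add_slave(LocalSlave())
    context = m.get_context(_example_startup)
    for factor in (1, 2, 3):
        context.add_task(_ExampleScaleTask(factor))
    # Results are yielded in completion order, not submission order
    return sorted(context.get_results_unordered())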


def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()


def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(fname)


def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(fname)


from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()