from __future__ import print_function, division, absolute_import

from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp, pathname, description = imp.find_module(
            '_IMP_parallel', [dirname(__file__)])
        try:
            _mod = imp.load_module(
                '_IMP_parallel', fp, pathname, description)
        finally:
            if fp is not None:
                fp.close()
        return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel

_swig_property = property


def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)


def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)


def _swig_getattr_nondynamic(self, class_type, name, static=1):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    if (not static):
        return object.__getattr__(self, name)
    else:
        raise AttributeError(name)


def _swig_getattr(self, class_type, name):
    return _swig_getattr_nondynamic(self, class_type, name, 0)


def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except AttributeError:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)


def _swig_setattr_nondynamic_method(set):
    def set_attr(self, name, value):
        if (name == "thisown"):
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr


try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:
    weakref_proxy = lambda x: x


class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)
151 """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
152 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)
156 """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
157 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)
161 """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
162 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)
166 """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
167 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)
171 """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
172 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)
181 """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
182 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)
186 """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
187 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)

_IMP_parallel.IMP_DEBUG_swigconstant(_IMP_parallel)
IMP_DEBUG = _IMP_parallel.IMP_DEBUG

_IMP_parallel.IMP_RELEASE_swigconstant(_IMP_parallel)
IMP_RELEASE = _IMP_parallel.IMP_RELEASE

_IMP_parallel.IMP_SILENT_swigconstant(_IMP_parallel)
IMP_SILENT = _IMP_parallel.IMP_SILENT

_IMP_parallel.IMP_PROGRESS_swigconstant(_IMP_parallel)
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS

_IMP_parallel.IMP_TERSE_swigconstant(_IMP_parallel)
IMP_TERSE = _IMP_parallel.IMP_TERSE

_IMP_parallel.IMP_VERBOSE_swigconstant(_IMP_parallel)
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE

_IMP_parallel.IMP_MEMORY_swigconstant(_IMP_parallel)
IMP_MEMORY = _IMP_parallel.IMP_MEMORY

_IMP_parallel.IMP_NONE_swigconstant(_IMP_parallel)
IMP_NONE = _IMP_parallel.IMP_NONE

_IMP_parallel.IMP_USAGE_swigconstant(_IMP_parallel)
IMP_USAGE = _IMP_parallel.IMP_USAGE

_IMP_parallel.IMP_INTERNAL_swigconstant(_IMP_parallel)
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL

_IMP_parallel.IMP_KERNEL_HAS_LOG4CXX_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX

_IMP_parallel.IMP_COMPILER_HAS_AUTO_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO

_IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR

_IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR

_IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER

_IMP_parallel.IMPKERNEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS


class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects
                if sys.getrefcount(x) > 3 or x.get_ref_count() > 1]
        # Keep only the objects that are still referenced elsewhere
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)


_director_objects = _DirectorObjects()


class _ostream(object):
    """Proxy of C++ std::ostream class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)

_IMP_parallel.IMP_COMPILER_HAS_OVERRIDE_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE

_IMP_parallel.IMP_COMPILER_HAS_FINAL_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL

_IMP_parallel.IMP_HAS_NOEXCEPT_swigconstant(_IMP_parallel)
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM

_IMP_parallel.IMPPARALLEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS


import socket
import time
import sys
import os
import re
import random
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle
from IMP.parallel import slavestate, util
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper, _TaskWrapper, \
    _HeartBeat, _ContextWrapper, _SetPathAction

_import_time_path = sys.path[:]
373 """Base class for all errors specific to the parallel module"""
377 class _NoMoreTasksError(Error):
382 """Error raised if all slaves failed, so tasks cannot be run"""
387 """Error raised if a problem occurs with the network"""
392 """Error raised if a slave has an unhandled exception"""
393 def __init__(self, exc, traceback, slave):
395 self.traceback = traceback
399 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
400 return "%s: %s from %s\nRemote traceback:\n%s" \
401 % (errstr, str(self.exc), str(self.slave), self.traceback)


class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        except SystemError:
            # Fall back to the text pickle protocol if binary pickling fails
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                self._ibuffer += data

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
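

# Illustrative sketch (not part of the generated module): each message that
# _Communicator sends is framed as one XDR "string" -- a 4-byte big-endian
# length followed by the pickled payload, padded to a 4-byte boundary --
# which is what lets _unpickle() pull one whole object off the front of the
# receive buffer. The helper below simply round-trips an object through that
# framing; the name is hypothetical and nothing in this module calls it.
def _example_xdr_roundtrip(obj):
    packer = xdrlib.Packer()
    packer.pack_string(pickle.dumps(obj, -1))
    wire = packer.get_buffer()        # 4-byte length prefix + padded payload
    unpacker = xdrlib.Unpacker(wire)
    return pickle.loads(unpacker.unpack_string())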
449 """Representation of a single slave.
450 Each slave uses a single thread of execution (i.e. a single CPU core)
451 to run tasks sequentially.
452 Slave is an abstract class; instead of using this class directly, use
453 a subclass such as LocalSlave or SGEQsubSlaveArray."""
456 _Communicator.__init__(self)
457 self._state = slavestate.init
460 self.update_contact_time()
462 def _start(self, command, unique_id, output):
463 """Start the slave running on the remote host; override in subclasses"""
464 self._state = slavestate.started
466 def _accept_connection(self, sock):
468 self._state = slavestate.connected
469 self.update_contact_time()
471 def _set_python_search_path(self, path):
472 self._send(_SetPathAction(path))
474 def update_contact_time(self):
475 self.last_contact_time = time.time()
477 def get_contact_timed_out(self, timeout):
478 return (time.time() - self.last_contact_time) > timeout
480 def _start_task(self, task, context):
481 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
482 raise TypeError(
"%s not ready for task" % str(self))
483 if self._context != context:
484 self._context = context
485 self._send(_ContextWrapper(context._startup))
486 self._state = slavestate.running_task
488 self._send(_TaskWrapper(task))
490 def _get_finished_task(self):
493 self.update_contact_time()
494 if isinstance(r, _HeartBeat):
495 if not self.get_data_pending():
502 self._state = slavestate.connected
509 self._state = slavestate.dead
512 def _ready_to_start(self):
513 return self._state == slavestate.init
515 def _ready_for_task(self, context):
516 return self._state == slavestate.connected \
517 and self._context == context
519 def _running_task(self, context):
520 return self._state == slavestate.running_task \
521 and self._context == context
525 """Representation of an array of slaves.
526 This is similar to Slave, except that it represents a collection of
527 slaves that are controlled together, such as a batch submission system
528 array job on a compute cluster.
529 Slave is an abstract class; instead of using this class directly, use
530 a subclass such as SGEQsubSlaveArray."""
532 def _get_slaves(self):
533 """Return a list of Slave objects contained within this array"""
537 """Do any necessary startup after all contained Slaves have started"""


class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"


class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
571 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
572 To use this class, the master process must be running on a machine that
573 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
574 is termed a 'submit host' by SGE). The class starts an SGE job array
575 (every slave has the same SGE job ID, but a different task ID).
579 standard_options =
'-j y -cwd -r n -o sge-errors'
583 @param numslave The number of slaves, which corresponds to the
584 number of tasks in the SGE job.
585 @param options A string of SGE options that are passed on the 'qsub'
586 command line. This is added to standard_options.
588 self._numslave = numslave
589 self._options = options
590 self._starting_slaves = []
593 def _get_slaves(self):
594 """Return a list of Slave objects contained within this array"""
595 return [_SGEQsubSlave(self)
for x
in range(self._numslave)]
597 def _slave_started(self, command, output, slave):
598 self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num+1)


class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
649 """An array of slaves in a Sun Grid Engine system parallel environment.
650 In order to use this class, the master must be run via Sun Grid Engine's
651 'qsub' command and submitted to a parallel environment using the qsub
652 -pe option. This class will start slaves on every node in the parallel
653 environment (including the node running the master). Each slave is
654 started using the 'qrsh' command with the '-inherit' option."""
656 def _get_slaves(self):
659 pe = os.environ[
'PE_HOSTFILE']
665 (node, num, queue) = line.split(
None, 2)
666 for i
in range(int(num)):
667 slaves.append(_SGEPESlave(node))
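

# Usage sketch (illustrative only, not part of the generated module). The
# master itself must have been submitted with something like
# 'qsub -pe <pe_name> <n>'; SGEPESlaveArray then starts one slave per slot
# listed in $PE_HOSTFILE via 'qrsh -inherit'.
def _example_sge_pe_array(manager):
    manager.add_slave(SGEPESlaveArray())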
677 """A collection of tasks that run in the same environment.
678 Context objects are typically created by calling Manager::get_context().
682 self._manager = manager
683 self._startup = startup
687 """Add a task to this context.
688 Tasks are any Python callable object that can be pickled (e.g. a
689 function or a class that implements the \_\_call\_\_ method). When
690 the task is run on the slave its arguments are the return value
691 from this context's startup function."""
692 self._tasks.append(task)
695 """Run all of the tasks on available slaves, and return results.
696 If there are more tasks than slaves, subsequent tasks are
697 started only once a running task completes: each slave only runs
698 a single task at a time. As each task completes, the return value(s)
699 from the task callable are returned from this method, as a
700 Python generator. Note that the results are returned in the order
701 that tasks complete, which may not be the same as the order they
704 \exception NoMoreSlavesError there are no slaves available
705 to run the tasks (or they all failed during execution).
706 \exception RemoteError a slave encountered an unhandled exception.
707 \exception NetworkError the master lost (or was unable to
708 establish) communication with any slave.
710 return self._manager._get_results_unordered(self)
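

# Usage sketch (illustrative only, not part of the generated module). Tasks
# must be picklable, so the example callables are defined at module level;
# get_results_unordered() is a generator that yields each task's return value
# as soon as that task finishes, which may not be submission order. All
# _example_* names are hypothetical.
def _example_startup():
    # Runs once per slave; its return value is passed to every task.
    return 'shared-state'


def _example_task(shared):
    # A trivial task: just echo the per-slave startup value.
    return shared


def _example_context_usage(manager):
    # 'manager' is assumed to be a Manager instance (defined below)
    context = manager.get_context(_example_startup)
    for _ in range(10):
        context.add_task(_example_task)
    for result in context.get_results_unordered():
        print(result)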
714 """Manages slaves and contexts.
717 connect_timeout = 7200
720 heartbeat_timeout = 7200
722 def __init__(self, python=None, host=None, output='slave%d.output'):
724 @param python If not None, the command to run to start a Python
725 interpreter that can import the IMP module. Otherwise,
726 the same interpreter that the master is currently using
727 is used. This is passed to the shell, so a full command
728 line (including multiple words separated by spaces) can
729 be used if necessary.
730 @param host The hostname that slaves use to connect back to the
731 master. If not specified, the master machine's primary
732 IP address is used. On multi-homed machines, such as
733 compute cluster headnodes, this may need to be changed
734 to allow all slaves to reach the master (typically the
735 name of the machine's internal network address is
736 needed). If only running local slaves, 'localhost' can
737 be used to prohibit connections across the network.
738 @param output A format string used to name slave output files. It is
739 given the numeric slave id, so for example the default
740 value 'slave\%d.output' will yield output files called
741 slave0.output, slave1.output, etc.
744 self._python = sys.executable
746 self._python = python
748 self._output = output
749 self._all_slaves = []
750 self._starting_slaves = {}
751 self._slave_arrays = []
756 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
757 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
760 """Add a Slave object."""
761 if hasattr(slave,
'_get_slaves'):
762 self._slave_arrays.append(slave)
764 self._all_slaves.append(slave)
767 """Create and return a new Context in which tasks can be run.
768 @param startup If not None, a callable (Python function or class
769 that implements the \_\_call\_\_ method) that sets up
770 the slave to run tasks. This method is only called
771 once per slave. The return values from this method
772 are passed to the task object when it runs on
774 @return A new Context object.

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the caller added more tasks while processing this
                    # result, send them out to the slaves as well
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = \
            [a for a in self._all_slaves if a._ready_for_task(context)] + \
            [a for a in self._all_slaves if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error as detail:
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave has just connected back to the master
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves
                     if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and
            # request tasks

        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
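

# End-to-end usage sketch (illustrative only, not part of the generated
# module). A typical master script builds a Manager, registers slaves, then
# runs tasks through a Context; 'localhost' restricts connections to the
# local machine as described in the constructor docstring. The function name
# and parameters are hypothetical.
def _example_manager_setup(python=None):
    # 'python' may name an alternative interpreter command, per the
    # constructor docstring; None means reuse the current interpreter.
    manager = Manager(python=python, host='localhost')
    manager.add_slave(LocalSlave())
    manager.add_slave(LocalSlave())
    return manager.get_context()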


def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()


def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(fname)


def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(fname)


from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()