"""Distribute IMP tasks to multiple processors or machines."""

from __future__ import print_function, division, absolute_import

from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module(
                    '_IMP_parallel', fp, pathname, description)
            finally:
                fp.close()
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del version_info
try:
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.


def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)
def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)


def _swig_getattr_nondynamic(self, class_type, name, static=1):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    if (not static):
        return object.__getattr__(self, name)
    else:
        raise AttributeError(name)


def _swig_getattr(self, class_type, name):
    return _swig_getattr_nondynamic(self, class_type, name, 0)
def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except AttributeError:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
def _swig_setattr_nondynamic_method(set):
    def set_attr(self, name, value):
        if (name == "thisown"):
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr


try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:
    weakref_proxy = lambda x: x
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class"""
    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
_IMP_parallel.IMP_DEBUG_swigconstant(_IMP_parallel)
IMP_DEBUG = _IMP_parallel.IMP_DEBUG

_IMP_parallel.IMP_RELEASE_swigconstant(_IMP_parallel)
IMP_RELEASE = _IMP_parallel.IMP_RELEASE

_IMP_parallel.IMP_SILENT_swigconstant(_IMP_parallel)
IMP_SILENT = _IMP_parallel.IMP_SILENT

_IMP_parallel.IMP_PROGRESS_swigconstant(_IMP_parallel)
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS

_IMP_parallel.IMP_TERSE_swigconstant(_IMP_parallel)
IMP_TERSE = _IMP_parallel.IMP_TERSE

_IMP_parallel.IMP_VERBOSE_swigconstant(_IMP_parallel)
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE

_IMP_parallel.IMP_MEMORY_swigconstant(_IMP_parallel)
IMP_MEMORY = _IMP_parallel.IMP_MEMORY

_IMP_parallel.IMP_NONE_swigconstant(_IMP_parallel)
IMP_NONE = _IMP_parallel.IMP_NONE

_IMP_parallel.IMP_USAGE_swigconstant(_IMP_parallel)
IMP_USAGE = _IMP_parallel.IMP_USAGE

_IMP_parallel.IMP_INTERNAL_swigconstant(_IMP_parallel)
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL

_IMP_parallel.IMP_KERNEL_HAS_LOG4CXX_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX

_IMP_parallel.IMP_COMPILER_HAS_AUTO_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO

_IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR

_IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER

_IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER_swigconstant(_IMP_parallel)
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER

_IMP_parallel.IMPKERNEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3
                or x.get_ref_count() > 1]
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)


_director_objects = _DirectorObjects()
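
# Illustrative note (not part of the generated code): the "> 3" test in
# cleanup() works because, at the moment sys.getrefcount(x) is evaluated,
# the object is referenced by self._objects, by the loop variable x, and by
# getrefcount's own argument. A quick CPython check of the same pattern:
#
#     import sys
#     objects = [object()]
#     print([sys.getrefcount(x) for x in objects])   # typically [3] on CPython
#
# Any count above 3 therefore indicates an extra Python reference held
# elsewhere, and the object is kept alive.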
class _ostream(object):
    """Proxy of C++ std::ostream class"""
    thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
_IMP_parallel.IMP_COMPILER_HAS_OVERRIDE_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE

_IMP_parallel.IMP_COMPILER_HAS_FINAL_swigconstant(_IMP_parallel)
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL

_IMP_parallel.IMP_HAS_NOEXCEPT_swigconstant(_IMP_parallel)
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM

_IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM_swigconstant(_IMP_parallel)
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM

_IMP_parallel.IMPPARALLEL_SHOW_WARNINGS_swigconstant(_IMP_parallel)
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
# Standard-library modules used by the pure-Python part of this module
# (imports of the module's own helpers, e.g. slavestate and util, are
# elided from this excerpt).
import sys
import os
import re
import random
import socket
import time
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle

_import_time_path = sys.path[:]
364 """Base class for all errors specific to the parallel module"""
368 class _NoMoreTasksError(Error):
373 """Error raised if all slaves failed, so tasks cannot be run"""
378 """Error raised if a problem occurs with the network"""
383 """Error raised if a slave has an unhandled exception"""
384 def __init__(self, exc, traceback, slave):
386 self.traceback = traceback
390 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
391 return "%s: %s from %s\nRemote traceback:\n%s" \
392 % (errstr, str(self.exc), str(self.slave), self.traceback)
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        except SystemError:
            # Fall back to pickle protocol 0 if binary pickling fails
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        """Receive and unpickle the next object sent by the peer"""
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
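
# Illustrative sketch (not part of the module): each message is a pickled
# object wrapped in an XDR string record, so length framing comes for free
# and _unpickle() can tell a complete record from a partial one:
#
#     import pickle, xdrlib
#     p = xdrlib.Packer()
#     p.pack_string(pickle.dumps({"answer": 42}, -1))
#     wire = p.get_buffer()                    # what _send() writes to the socket
#     u = xdrlib.Unpacker(wire)
#     obj = pickle.loads(u.unpack_string())    # -> {'answer': 42}
#     leftover = wire[u.get_position():]       # unconsumed bytes, kept in _ibuffer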
440 """Representation of a single slave.
441 Each slave uses a single thread of execution (i.e. a single CPU core)
442 to run tasks sequentially.
443 Slave is an abstract class; instead of using this class directly, use
444 a subclass such as LocalSlave or SGEQsubSlaveArray."""
447 _Communicator.__init__(self)
448 self._state = slavestate.init
451 self.update_contact_time()
453 def _start(self, command, unique_id, output):
454 """Start the slave running on the remote host; override in subclasses"""
455 self._state = slavestate.started
457 def _accept_connection(self, sock):
459 self._state = slavestate.connected
460 self.update_contact_time()
462 def _set_python_search_path(self, path):
463 self._send(_SetPathAction(path))
465 def update_contact_time(self):
466 self.last_contact_time = time.time()
468 def get_contact_timed_out(self, timeout):
469 return (time.time() - self.last_contact_time) > timeout
471 def _start_task(self, task, context):
472 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
473 raise TypeError(
"%s not ready for task" % str(self))
474 if self._context != context:
475 self._context = context
476 self._send(_ContextWrapper(context._startup))
477 self._state = slavestate.running_task
479 self._send(_TaskWrapper(task))
481 def _get_finished_task(self):
484 self.update_contact_time()
485 if isinstance(r, _HeartBeat):
486 if not self.get_data_pending():
493 self._state = slavestate.connected
500 self._state = slavestate.dead
503 def _ready_to_start(self):
504 return self._state == slavestate.init
506 def _ready_for_task(self, context):
507 return self._state == slavestate.connected \
508 and self._context == context
510 def _running_task(self, context):
511 return self._state == slavestate.running_task \
512 and self._context == context
516 """Representation of an array of slaves.
517 This is similar to Slave, except that it represents a collection of
518 slaves that are controlled together, such as a batch submission system
519 array job on a compute cluster.
520 Slave is an abstract class; instead of using this class directly, use
521 a subclass such as SGEQsubSlaveArray."""
523 def _get_slaves(self):
524 """Return a list of Slave objects contained within this array"""
528 """Do any necessary startup after all contained Slaves have started"""
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"
class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
562 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
563 To use this class, the master process must be running on a machine that
564 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
565 is termed a 'submit host' by SGE). The class starts an SGE job array
566 (every slave has the same SGE job ID, but a different task ID).
570 standard_options =
'-j y -cwd -r n -o sge-errors'
574 @param numslave The number of slaves, which correponds to the
575 number of tasks in the SGE job.
576 @param options A string of SGE options that are passed on the 'qsub'
577 command line. This is added to standard_options.
579 self._numslave = numslave
580 self._options = options
581 self._starting_slaves = []
584 def _get_slaves(self):
585 """Return a list of Slave objects contained within this array"""
586 return [_SGEQsubSlave(self)
for x
in range(self._numslave)]
588 def _slave_started(self, command, output, slave):
589 self._starting_slaves.append((command, output, slave))
591 def _start(self, command):
592 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
593 (self._options, self.standard_options,
594 len(self._starting_slaves))
597 (inp, out) = (a.stdin, a.stdout)
598 slave_uid =
" ".join([repr(s[0])
for s
in self._starting_slaves])
599 slave_out =
" ".join([repr(s[1])
for s
in self._starting_slaves])
600 inp.write(
"#!/bin/sh\n")
601 inp.write(
"uid=( '' %s )\n" % slave_uid)
602 inp.write(
"out=( '' %s )\n" % slave_out)
603 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
604 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
605 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
607 outlines = out.readlines()
609 for line
in outlines:
610 print(line.rstrip(
'\r\n'))
611 a.require_clean_exit()
612 self._set_jobid(outlines)
613 self._starting_slaves = []
615 def _set_jobid(self, outlines):
616 """Try to figure out the job ID from the SGE qsub output"""
617 if len(outlines) > 0:
618 m = re.compile(
r"\d+").search(outlines[0])
620 self._jobid = int(m.group())
621 for (num, slave)
in enumerate(self._starting_slaves):
622 slave[2]._jobid =
"%d.%d" % (self._jobid, num+1)
class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host


class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""

    def _get_slaves(self):
        slaves = []
        pe = os.environ['PE_HOSTFILE']
        fh = open(pe, "r")
        for line in fh:
            (node, num, queue) = line.split(None, 2)
            for i in range(int(num)):
                slaves.append(_SGEPESlave(node))
        return slaves
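
# Illustrative note (not part of the module): $PE_HOSTFILE typically contains
# one line per node of the form "hostname slot_count queue ...", e.g.
#
#     node01 4 all.q@node01
#     node02 4 all.q@node02
#
# Because line.split(None, 2) uses at most two splits, any trailing columns
# end up in the third value; the loop above then creates one _SGEPESlave per
# slot (four per node in this example).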
668 """A collection of tasks that run in the same environment.
669 Context objects are typically created by calling Manager::get_context().
673 self._manager = manager
674 self._startup = startup
678 """Add a task to this context.
679 Tasks are any Python callable object that can be pickled (e.g. a
680 function or a class that implements the \_\_call\_\_ method). When
681 the task is run on the slave its arguments are the return value
682 from this context's startup function."""
683 self._tasks.append(task)
686 """Run all of the tasks on available slaves, and return results.
687 If there are more tasks than slaves, subsequent tasks are
688 started only once a running task completes: each slave only runs
689 a single task at a time. As each task completes, the return value(s)
690 from the task callable are returned from this method, as a
691 Python generator. Note that the results are returned in the order
692 that tasks complete, which may not be the same as the order they
695 \exception NoMoreSlavesError there are no slaves available
696 to run the tasks (or they all failed during execution).
697 \exception RemoteError a slave encountered an unhandled exception.
698 \exception NetworkError the master lost (or was unable to
699 establish) communication with any slave.
701 return self._manager._get_results_unordered(self)
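
# Illustrative sketch (not part of the module): the contract between a
# context's startup callable and its tasks. Both must be picklable, so in
# practice they live in an importable module (the module 'mytasks' and the
# names build_model, score, conformations and manager below are placeholders):
#
#     # mytasks.py
#     def setup():                  # runs once per slave; its return value
#         model = build_model()     # becomes the argument(s) of every task
#         return model
#
#     class ScoreTask(object):      # parametrized, picklable task callable
#         def __init__(self, conformation):
#             self.conformation = conformation
#         def __call__(self, model):
#             return score(model, self.conformation)
#
#     # master script
#     context = manager.get_context(mytasks.setup)
#     for conf in conformations:
#         context.add_task(mytasks.ScoreTask(conf))
#     for result in context.get_results_unordered():
#         print(result)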
705 """Manages slaves and contexts.
708 connect_timeout = 7200
711 heartbeat_timeout = 7200
713 def __init__(self, python=None, host=None, output='slave%d.output'):
715 @param python If not None, the command to run to start a Python
716 interpreter that can import the IMP module. Otherwise,
717 the same interpreter that the master is currently using
718 is used. This is passed to the shell, so a full command
719 line (including multiple words separated by spaces) can
720 be used if necessary.
721 @param host The hostname that slaves use to connect back to the
722 master. If not specified, the master machine's primary
723 IP address is used. On multi-homed machines, such as
724 compute cluster headnodes, this may need to be changed
725 to allow all slaves to reach the master (typically the
726 name of the machine's internal network address is
727 needed). If only running local slaves, 'localhost' can
728 be used to prohibit connections across the network.
729 @param output A format string used to name slave output files. It is
730 given the numeric slave id, so for example the default
731 value 'slave\%d.output' will yield output files called
732 slave0.output, slave1.output, etc.
735 self._python = sys.executable
737 self._python = python
739 self._output = output
740 self._all_slaves = []
741 self._starting_slaves = {}
742 self._slave_arrays = []
747 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
748 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
751 """Add a Slave object."""
752 if hasattr(slave,
'_get_slaves'):
753 self._slave_arrays.append(slave)
755 self._all_slaves.append(slave)
758 """Create and return a new Context in which tasks can be run.
759 @param startup If not None, a callable (Python function or class
760 that implements the \_\_call\_\_ method) that sets up
761 the slave to run tasks. This method is only called
762 once per slave. The return values from this method
763 are passed to the task object when it runs on
765 @return A new Context object.
769 def _get_results_unordered(self, context):
770 """Run all of a context's tasks, and yield results"""
771 self._send_tasks_to_slaves(context)
774 for task
in self._get_finished_tasks(context):
775 tasks_queued = len(context._tasks)
779 if len(context._tasks) > tasks_queued:
780 self._send_tasks_to_slaves(context)
781 except _NoMoreTasksError:
784 def _start_all_slaves(self):
785 for array
in self._slave_arrays:
786 self._all_slaves.extend(array._get_slaves())
788 command = (
"%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
789 "%s %d") % (self._python, self._host, self._listen_sock.port)
791 for (num, slave)
in enumerate(self._all_slaves):
792 if slave._ready_to_start():
793 unique_id = self._get_unique_id(num)
794 self._starting_slaves[unique_id] = slave
795 slave._start(command, unique_id, self._output % num)
797 for array
in self._slave_arrays:
798 array._start(command)
799 self._slave_arrays = []
801 def _get_unique_id(self, num):
803 for i
in range(0, 8):
804 id += chr(random.randint(0, 25) + ord(
'A'))
807 def _send_tasks_to_slaves(self, context):
808 self._start_all_slaves()
810 available_slaves = [a
for a
in self._all_slaves
811 if a._ready_for_task(context)] + \
812 [a
for a
in self._all_slaves
813 if a._ready_for_task(
None)]
814 for slave
in available_slaves:
815 if len(context._tasks) == 0:
818 self._send_task_to_slave(slave, context)
820 def _send_task_to_slave(self, slave, context):
821 if len(context._tasks) == 0:
823 t = context._tasks[0]
825 slave._start_task(t, context)
826 context._tasks.pop(0)
827 except socket.error
as detail:
830 def _get_finished_tasks(self, context):
832 events = self._get_network_events(context)
834 self._kill_all_running_slaves(context)
836 task = self._process_event(event, context)
840 def _process_event(self, event, context):
841 if event == self._listen_sock:
843 (conn, addr) = self._listen_sock.accept()
844 new_slave = self._accept_slave(conn, context)
845 elif event._running_task(context):
847 task = event._get_finished_task()
849 self._send_task_to_slave(event, context)
852 self._kill_timed_out_slaves(context)
853 except NetworkError
as detail:
855 print(
"Slave %s failed (%s): rescheduling task %s" \
856 % (str(event), str(detail), str(task)))
857 context._tasks.append(task)
858 self._send_tasks_to_slaves(context)
862 def _kill_timed_out_slaves(self, context):
863 timed_out = [a
for a
in self._all_slaves
if a._running_task(context) \
864 and a.get_contact_timed_out(self.heartbeat_timeout)]
865 for slave
in timed_out:
867 print(
"Did not hear from slave %s in %d seconds; rescheduling "
868 "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
869 context._tasks.append(task)
870 if len(timed_out) > 0:
871 self._send_tasks_to_slaves(context)
873 def _kill_all_running_slaves(self, context):
874 running = [a
for a
in self._all_slaves
if a._running_task(context)]
875 for slave
in running:
877 context._tasks.append(task)
878 raise NetworkError(
"Did not hear from any running slave in "
879 "%d seconds" % self.heartbeat_timeout)
881 def _accept_slave(self, sock, context):
882 sock.setblocking(
True)
883 identifier = sock.recv(1024)
885 identifier = identifier.decode(
'ascii')
886 if identifier
and identifier
in self._starting_slaves:
887 slave = self._starting_slaves.pop(identifier)
888 slave._accept_connection(sock)
889 print(
"Identified slave %s " % str(slave))
890 self._init_slave(slave)
891 self._send_task_to_slave(slave, context)
894 print(
"Ignoring request from unknown slave")
896 def _init_slave(self, slave):
897 if _import_time_path[0] !=
'':
898 slave._set_python_search_path(_import_time_path[0])
899 if sys.path[0] !=
'' and sys.path[0] != _import_time_path[0]:
900 slave._set_python_search_path(sys.path[0])
902 def _get_network_events(self, context):
903 running = [a
for a
in self._all_slaves
if a._running_task(context)]
904 if len(running) == 0:
905 if len(context._tasks) == 0:
906 raise _NoMoreTasksError()
907 elif len(self._starting_slaves) == 0:
911 return util._poll_events(self._listen_sock, running,
912 self.heartbeat_timeout)
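
# End-to-end usage sketch (illustrative only, not part of the module; the
# 'mytasks' module is the hypothetical task module sketched earlier):
#
#     import IMP.parallel
#     import mytasks
#
#     # 'localhost' restricts connections to this machine; output files are
#     # named slave0.output, slave1.output, ... via the 'output' format string
#     m = IMP.parallel.Manager(host='localhost', output='slave%d.output')
#     for i in range(4):
#         m.add_slave(IMP.parallel.LocalSlave())
#     # On an SGE submit host one could instead add, for example:
#     #     m.add_slave(IMP.parallel.SGEQsubSlaveArray(numslave=16,
#     #                                                options='-l h_rt=1:00:00'))
#     c = m.get_context(mytasks.setup)
#     for conf in range(100):
#         c.add_task(mytasks.ScoreTask(conf))
#     for result in c.get_results_unordered():   # results arrive as they finish
#         print(result)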
def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()


def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string

    Return the full path to one of this module's example files.
    """
    return _IMP_parallel.get_example_path(fname)


def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string

    Return the full path to one of this module's data files.
    """
    return _IMP_parallel.get_data_path(fname)


from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()