10 from __future__
import print_function, division, absolute_import
15 from sys
import version_info
as _swig_python_version_info
16 if _swig_python_version_info >= (2, 7, 0):
17 def swig_import_helper():
19 pkg = __name__.rpartition(
'.')[0]
20 mname =
'.'.join((pkg,
'_IMP_parallel')).lstrip(
'.')
22 return importlib.import_module(mname)
24 return importlib.import_module(
'_IMP_parallel')
25 _IMP_parallel = swig_import_helper()
26 del swig_import_helper
27 elif _swig_python_version_info >= (2, 6, 0):
28 def swig_import_helper():
29 from os.path
import dirname
33 fp, pathname, description = imp.find_module(
'_IMP_parallel', [dirname(__file__)])
39 _mod = imp.load_module(
'_IMP_parallel', fp, pathname, description)
43 _IMP_parallel = swig_import_helper()
44 del swig_import_helper
47 del _swig_python_version_info
49 _swig_property = property
54 import builtins
as __builtin__
58 def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
59 if (name ==
"thisown"):
60 return self.this.own(value)
62 if type(value).__name__ ==
'SwigPyObject':
63 self.__dict__[name] = value
65 method = class_type.__swig_setmethods__.get(name,
None)
67 return method(self, value)
69 object.__setattr__(self, name, value)
71 raise AttributeError(
"You cannot add attributes to %s" % self)
def _swig_setattr(self, class_type, name, value):
    # Non-static variant: static=0 allows setting attributes that are not
    # part of the SWIG-declared class interface.
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)
78 def _swig_getattr(self, class_type, name):
79 if (name ==
"thisown"):
80 return self.this.own()
81 method = class_type.__swig_getmethods__.get(name,
None)
84 raise AttributeError(
"'%s' object has no attribute '%s'" % (class_type.__name__, name))
89 strthis =
"proxy of " + self.this.__repr__()
90 except __builtin__.Exception:
92 return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
95 def _swig_setattr_nondynamic_method(set):
96 def set_attr(self, name, value):
97 if (name ==
"thisown"):
98 return self.this.own(value)
99 if hasattr(self, name)
or (name ==
"this"):
100 set(self, name, value)
102 raise AttributeError(
"You cannot add attributes to %s" % self)
108 weakref_proxy = weakref.proxy
109 except __builtin__.Exception:
110 weakref_proxy =
lambda x: x
113 class IMP_PARALLEL_SwigPyIterator(object):
114 """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class."""
116 thisown = _swig_property(
lambda x: x.this.own(),
lambda x, v: x.this.own(v), doc=
'The membership flag')
118 def __init__(self, *args, **kwargs):
119 raise AttributeError(
"No constructor defined - class is abstract")
120 __repr__ = _swig_repr
121 __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
122 __del__ =
lambda self:
None
125 """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
126 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)
131 incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
132 incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
134 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)
139 decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
140 decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
142 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)
def distance(self, x):
    """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
    # Thin wrapper: the distance computation lives in the C++ layer.
    result = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)
    return result
151 """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
152 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)
156 """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
157 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)
161 """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
162 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)
166 """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
167 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)
171 """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
172 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)
def advance(self, n):
    """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
    # Delegate to the C++ iterator; it returns the advanced iterator.
    it = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)
    return it
181 """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
182 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)
186 """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
187 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)
def __iadd__(self, n):
    """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
    # In-place advance; the C++ layer hands back the iterator to rebind.
    it = _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)
    return it
def __isub__(self, n):
    """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
    # In-place retreat; the C++ layer hands back the iterator to rebind.
    it = _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)
    return it
def __add__(self, n):
    """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
    # Returns a new iterator offset by n; computed in the C++ layer.
    it = _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)
    return it
205 def __sub__(self, *args):
207 __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
208 __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
210 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)
214 IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
215 IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
# Build/configuration constants re-exported from the _IMP_parallel C
# extension: log levels, check levels, and compile-time feature flags.
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
244 class _DirectorObjects(object):
245 """@internal Simple class to keep references to director objects
246 to prevent premature deletion."""
def register(self, obj):
    """Take a reference to a director object; will only work for
       refcounted C++ classes"""
    # Objects without get_ref_count cannot be tracked; silently ignore.
    if not hasattr(obj, 'get_ref_count'):
        return
    self._objects.append(obj)
255 """Only drop our reference and allow cleanup by Python if no other
256 Python references exist (we hold 3 references: one in self._objects,
257 one in x, and one in the argument list for getrefcount) *and* no
258 other C++ references exist (the Python object always holds one)"""
259 objs = [x
for x
in self._objects
if sys.getrefcount(x) > 3 \
260 or x.get_ref_count() > 1]
def get_object_count(self):
    """Get number of director objects (useful for testing only)"""
    count = len(self._objects)
    return count
267 _director_objects = _DirectorObjects()
269 class _ostream(object):
270 """Proxy of C++ std::ostream class."""
272 thisown = _swig_property(
lambda x: x.this.own(),
lambda x, v: x.this.own(v), doc=
'The membership flag')
274 def __init__(self, *args, **kwargs):
275 raise AttributeError(
"No constructor defined")
276 __repr__ = _swig_repr
def write(self, osa_buf):
    """write(_ostream self, char const * osa_buf)"""
    # Forward the buffer to the wrapped std::ostream.
    status = _IMP_parallel._ostream_write(self, osa_buf)
    return status
282 _ostream_swigregister = _IMP_parallel._ostream_swigregister
283 _ostream_swigregister(_ostream)
# Further compile-time feature flags re-exported from the C extension,
# including the parallel module's own Boost availability flags.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
IMP_C_OPEN_BINARY = _IMP_parallel.IMP_C_OPEN_BINARY
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
306 import cPickle
as pickle
317 _import_time_path = sys.path[:]
320 """Base class for all errors specific to the parallel module"""
324 class _NoMoreTasksError(Error):
329 """Error raised if all slaves failed, so tasks cannot be run"""
334 """Error raised if a problem occurs with the network"""
339 """Error raised if a slave has an unhandled exception"""
340 def __init__(self, exc, traceback, slave):
342 self.traceback = traceback
346 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
347 return "%s: %s from %s\nRemote traceback:\n%s" \
348 % (errstr, str(self.exc), str(self.slave), self.traceback)
350 class _Communicator(object):
351 """Simple support for sending Python pickled objects over the network"""
357 def _send(self, obj):
360 p.pack_string(pickle.dumps(obj, -1))
364 p.pack_string(pickle.dumps(obj, 0))
365 self._socket.sendall(p.get_buffer())
def get_data_pending(self):
    """Return True if buffered bytes are waiting to be unpickled."""
    pending = len(self._ibuffer)
    return pending > 0
373 obj, self._ibuffer = self._unpickle(self._ibuffer)
374 if isinstance(obj, _ErrorWrapper):
378 except (IndexError, EOFError):
380 data = self._socket.recv(4096)
381 except socket.error
as detail:
383 % (str(self), str(detail)))
385 self._ibuffer += data
def _unpickle(self, ibuffer):
    """Decode one XDR-framed pickle from *ibuffer*.

    Returns a tuple of (unpickled object, remaining unconsumed bytes).
    """
    unpacker = xdrlib.Unpacker(ibuffer)
    payload = unpacker.unpack_string()
    rest = ibuffer[unpacker.get_position():]
    return (pickle.loads(payload), rest)
396 """Representation of a single slave.
397 Each slave uses a single thread of execution (i.e. a single CPU core)
398 to run tasks sequentially.
399 Slave is an abstract class; instead of using this class directly, use
400 a subclass such as LocalSlave or SGEQsubSlaveArray."""
403 _Communicator.__init__(self)
404 self._state = slavestate.init
407 self.update_contact_time()
def _start(self, command, unique_id, output):
    """Start the slave running on the remote host; override in subclasses"""
    # Base implementation only records the state transition; subclasses
    # call this first and then actually launch the process/job.
    self._state = slavestate.started
413 def _accept_connection(self, sock):
415 self._state = slavestate.connected
416 self.update_contact_time()
def _set_python_search_path(self, path):
    """Ask the remote slave to update its Python search path."""
    # The exact path-manipulation semantics live in _SetPathAction,
    # which is executed on the slave side.
    action = _SetPathAction(path)
    self._send(action)
def update_contact_time(self):
    # Record "now" so that get_contact_timed_out() measures from the
    # most recent communication with this slave.
    self.last_contact_time = time.time()
def get_contact_timed_out(self, timeout):
    """Return True if more than *timeout* seconds elapsed since last contact."""
    elapsed = time.time() - self.last_contact_time
    return elapsed > timeout
427 def _start_task(self, task, context):
428 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
429 raise TypeError(
"%s not ready for task" % str(self))
430 if self._context != context:
431 self._context = context
432 self._send(_ContextWrapper(context._startup))
433 self._state = slavestate.running_task
435 self._send(_TaskWrapper(task))
437 def _get_finished_task(self):
440 self.update_contact_time()
441 if isinstance(r, _HeartBeat):
442 if not self.get_data_pending():
449 self._state = slavestate.connected
456 self._state = slavestate.dead
def _ready_to_start(self):
    # Only a slave still in its initial state may be started.
    return self._state == slavestate.init
def _ready_for_task(self, context):
    """True when this slave is connected and idle in *context*."""
    if self._state != slavestate.connected:
        return False
    return self._context == context
def _running_task(self, context):
    """True when this slave is currently executing a task for *context*."""
    if self._state != slavestate.running_task:
        return False
    return self._context == context
472 """Representation of an array of slaves.
473 This is similar to Slave, except that it represents a collection of
474 slaves that are controlled together, such as a batch submission system
475 array job on a compute cluster.
476 Slave is an abstract class; instead of using this class directly, use
477 a subclass such as SGEQsubSlaveArray."""
479 def _get_slaves(self):
480 """Return a list of Slave objects contained within this array"""
484 """Do any necessary startup after all contained Slaves have started"""
488 class LocalSlave(Slave):
489 """A slave running on the same machine as the master."""
def _start(self, command, unique_id, output):
    """Launch the slave as a background process on the local machine."""
    Slave._start(self, command, unique_id, output)
    # The slave learns its unique identifier from the command line.
    _run_background("%s %s" % (command, unique_id), output)
497 return "<LocalSlave>"
500 class _SGEQsubSlave(Slave):
501 def __init__(self, array):
def _start(self, command, unique_id, output):
    # Individual SGE slaves do not launch themselves; they register with
    # the parent array, which submits a single qsub job array for all
    # queued slaves.
    Slave._start(self, command, unique_id, output)
    self._array._slave_started(unique_id, output, self)
514 return "<SGE qsub slave, ID %s>" % jobid
518 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
519 To use this class, the master process must be running on a machine that
520 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
521 is termed a 'submit host' by SGE). The class starts an SGE job array
522 (every slave has the same SGE job ID, but a different task ID).
526 standard_options =
'-j y -cwd -r n -o sge-errors'
530 @param numslave The number of slaves, which corresponds to the
531 number of tasks in the SGE job.
532 @param options A string of SGE options that are passed on the 'qsub'
533 command line. This is added to standard_options.
535 self._numslave = numslave
536 self._options = options
537 self._starting_slaves = []
def _get_slaves(self):
    """Return a list of Slave objects contained within this array"""
    slaves = []
    for _ in range(self._numslave):
        slaves.append(_SGEQsubSlave(self))
    return slaves
def _slave_started(self, command, output, slave):
    # Queue the slave's details; _start() later builds one qsub job
    # array script covering every queued slave.
    self._starting_slaves.append((command, output, slave))
547 def _start(self, command):
548 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
549 (self._options, self.standard_options,
550 len(self._starting_slaves))
553 (inp, out) = (a.stdin, a.stdout)
554 slave_uid =
" ".join([repr(s[0])
for s
in self._starting_slaves])
555 slave_out =
" ".join([repr(s[1])
for s
in self._starting_slaves])
556 inp.write(
"#!/bin/sh\n")
557 inp.write(
"uid=( '' %s )\n" % slave_uid)
558 inp.write(
"out=( '' %s )\n" % slave_out)
559 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
560 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
561 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
563 outlines = out.readlines()
565 for line
in outlines:
566 print(line.rstrip(
'\r\n'))
567 a.require_clean_exit()
568 self._set_jobid(outlines)
569 self._starting_slaves = []
571 def _set_jobid(self, outlines):
572 """Try to figure out the job ID from the SGE qsub output"""
573 if len(outlines) > 0:
574 m = re.compile(
r"\d+").search(outlines[0])
576 self._jobid = int(m.group())
577 for (num, slave)
in enumerate(self._starting_slaves):
578 slave[2]._jobid =
"%d.%d" % (self._jobid, num+1)
581 class _SGEPESlave(
Slave):
def _start(self, command, unique_id, output):
    # Start the slave on its node with SGE's qrsh; -inherit reuses a
    # slot already granted to this parallel environment job, and -V
    # propagates the environment to the remote command.
    Slave._start(self, command, unique_id, output)
    cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
    _run_background(cmdline, output)
592 return "<SGE PE slave on %s>" % self._host
596 """An array of slaves in a Sun Grid Engine system parallel environment.
597 In order to use this class, the master must be run via Sun Grid Engine's
598 'qsub' command and submitted to a parallel environment using the qsub
599 -pe option. This class will start slaves on every node in the parallel
600 environment (including the node running the master). Each slave is
601 started using the 'qrsh' command with the '-inherit' option."""
603 def _get_slaves(self):
606 pe = os.environ[
'PE_HOSTFILE']
612 (node, num, queue) = line.split(
None, 2)
613 for i
in range(int(num)):
614 slaves.append(_SGEPESlave(node))
624 """A collection of tasks that run in the same environment.
625 Context objects are typically created by calling Manager::get_context().
629 self._manager = manager
630 self._startup = startup
634 """Add a task to this context.
635 Tasks are any Python callable object that can be pickled (e.g. a
636 function or a class that implements the \_\_call\_\_ method). When
637 the task is run on the slave its arguments are the return value
638 from this context's startup function."""
639 self._tasks.append(task)
642 """Run all of the tasks on available slaves, and return results.
643 If there are more tasks than slaves, subsequent tasks are
644 started only once a running task completes: each slave only runs
645 a single task at a time. As each task completes, the return value(s)
646 from the task callable are returned from this method, as a
647 Python generator. Note that the results are returned in the order
648 that tasks complete, which may not be the same as the order they
651 @exception NoMoreSlavesError there are no slaves available
652 to run the tasks (or they all failed during execution).
653 @exception RemoteError a slave encountered an unhandled exception.
654 @exception NetworkError the master lost (or was unable to
655 establish) communication with any slave.
657 return self._manager._get_results_unordered(self)
661 """Manages slaves and contexts.
664 connect_timeout = 7200
667 heartbeat_timeout = 7200
669 def __init__(self, python=None, host=None, output='slave%d.output'):
671 @param python If not None, the command to run to start a Python
672 interpreter that can import the IMP module. Otherwise,
673 the same interpreter that the master is currently using
674 is used. This is passed to the shell, so a full command
675 line (including multiple words separated by spaces) can
676 be used if necessary.
677 @param host The hostname that slaves use to connect back to the
678 master. If not specified, the master machine's primary
679 IP address is used. On multi-homed machines, such as
680 compute cluster headnodes, this may need to be changed
681 to allow all slaves to reach the master (typically the
682 name of the machine's internal network address is
683 needed). If only running local slaves, 'localhost' can
684 be used to prohibit connections across the network.
685 @param output A format string used to name slave output files. It is
686 given the numeric slave id, so for example the default
687 value 'slave\%d.output' will yield output files called
688 slave0.output, slave1.output, etc.
691 self._python = sys.executable
693 self._python = python
695 self._output = output
696 self._all_slaves = []
697 self._starting_slaves = {}
698 self._slave_arrays = []
703 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
704 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
707 """Add a Slave object."""
708 if hasattr(slave,
'_get_slaves'):
709 self._slave_arrays.append(slave)
711 self._all_slaves.append(slave)
714 """Create and return a new Context in which tasks can be run.
715 @param startup If not None, a callable (Python function or class
716 that implements the \_\_call\_\_ method) that sets up
717 the slave to run tasks. This method is only called
718 once per slave. The return values from this method
719 are passed to the task object when it runs on
721 @return A new Context object.
725 def _get_results_unordered(self, context):
726 """Run all of a context's tasks, and yield results"""
727 self._send_tasks_to_slaves(context)
730 for task
in self._get_finished_tasks(context):
731 tasks_queued = len(context._tasks)
735 if len(context._tasks) > tasks_queued:
736 self._send_tasks_to_slaves(context)
737 except _NoMoreTasksError:
740 def _start_all_slaves(self):
741 for array
in self._slave_arrays:
742 self._all_slaves.extend(array._get_slaves())
744 command = (
"%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
745 "%s %d") % (self._python, self._host, self._listen_sock.port)
747 for (num, slave)
in enumerate(self._all_slaves):
748 if slave._ready_to_start():
749 unique_id = self._get_unique_id(num)
750 self._starting_slaves[unique_id] = slave
751 slave._start(command, unique_id, self._output % num)
753 for array
in self._slave_arrays:
754 array._start(command)
755 self._slave_arrays = []
757 def _get_unique_id(self, num):
759 for i
in range(0, 8):
760 id += chr(random.randint(0, 25) + ord(
'A'))
763 def _send_tasks_to_slaves(self, context):
764 self._start_all_slaves()
766 available_slaves = [a
for a
in self._all_slaves
767 if a._ready_for_task(context)] + \
768 [a
for a
in self._all_slaves
769 if a._ready_for_task(
None)]
770 for slave
in available_slaves:
771 if len(context._tasks) == 0:
774 self._send_task_to_slave(slave, context)
776 def _send_task_to_slave(self, slave, context):
777 if len(context._tasks) == 0:
779 t = context._tasks[0]
781 slave._start_task(t, context)
782 context._tasks.pop(0)
783 except socket.error
as detail:
786 def _get_finished_tasks(self, context):
788 events = self._get_network_events(context)
790 self._kill_all_running_slaves(context)
792 task = self._process_event(event, context)
796 def _process_event(self, event, context):
797 if event == self._listen_sock:
799 (conn, addr) = self._listen_sock.accept()
800 new_slave = self._accept_slave(conn, context)
801 elif event._running_task(context):
803 task = event._get_finished_task()
805 self._send_task_to_slave(event, context)
808 self._kill_timed_out_slaves(context)
809 except NetworkError
as detail:
811 print(
"Slave %s failed (%s): rescheduling task %s" \
812 % (str(event), str(detail), str(task)))
813 context._tasks.append(task)
814 self._send_tasks_to_slaves(context)
818 def _kill_timed_out_slaves(self, context):
819 timed_out = [a
for a
in self._all_slaves
if a._running_task(context) \
820 and a.get_contact_timed_out(self.heartbeat_timeout)]
821 for slave
in timed_out:
823 print(
"Did not hear from slave %s in %d seconds; rescheduling "
824 "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
825 context._tasks.append(task)
826 if len(timed_out) > 0:
827 self._send_tasks_to_slaves(context)
829 def _kill_all_running_slaves(self, context):
830 running = [a
for a
in self._all_slaves
if a._running_task(context)]
831 for slave
in running:
833 context._tasks.append(task)
834 raise NetworkError(
"Did not hear from any running slave in "
835 "%d seconds" % self.heartbeat_timeout)
837 def _accept_slave(self, sock, context):
838 sock.setblocking(
True)
839 identifier = sock.recv(1024)
841 identifier = identifier.decode(
'ascii')
842 if identifier
and identifier
in self._starting_slaves:
843 slave = self._starting_slaves.pop(identifier)
844 slave._accept_connection(sock)
845 print(
"Identified slave %s " % str(slave))
846 self._init_slave(slave)
847 self._send_task_to_slave(slave, context)
850 print(
"Ignoring request from unknown slave")
def _init_slave(self, slave):
    # Mirror the master's Python search path on the new slave so it can
    # import the same modules: first the path recorded when this module
    # was imported, then the current sys.path[0] if non-empty and not a
    # duplicate of the first.
    if _import_time_path[0] != '':
        slave._set_python_search_path(_import_time_path[0])
    if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
        slave._set_python_search_path(sys.path[0])
858 def _get_network_events(self, context):
859 running = [a
for a
in self._all_slaves
if a._running_task(context)]
860 if len(running) == 0:
861 if len(context._tasks) == 0:
862 raise _NoMoreTasksError()
863 elif len(self._starting_slaves) == 0:
867 return util._poll_events(self._listen_sock, running,
868 self.heartbeat_timeout)
def get_module_version():
    """get_module_version() -> std::string const"""
    # SWIG wrapper around the C++ module-version query.
    return _IMP_parallel.get_module_version()
877 """get_example_path(std::string fname) -> std::string"""
878 return _IMP_parallel.get_example_path(fname)
881 """get_data_path(std::string fname) -> std::string"""
882 return _IMP_parallel.get_data_path(fname)
884 from .
import _version_check
885 _version_check.check_version(get_module_version())
886 __version__ = get_module_version()
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to one of this module's data files.
Error raised if a slave has an unhandled exception.
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
std::string get_example_path(std::string file_name)
Return the full path to one of this module's example files.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.