from __future__ import print_function, division, absolute_import


from sys import version_info as _swig_python_version_info
if _swig_python_version_info >= (2, 7, 0):
    def swig_import_helper():
        import importlib
        pkg = __name__.rpartition('.')[0]
        mname = '.'.join((pkg, '_IMP_parallel')).lstrip('.')
        try:
            return importlib.import_module(mname)
        except ImportError:
            return importlib.import_module('_IMP_parallel')
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
elif _swig_python_version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        try:
            _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
        finally:
            if fp is not None:
                fp.close()
        return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del _swig_python_version_info

try:
    _swig_property = property
except NameError:
    pass  # Python < 2.2 doesn't have 'property'.

try:
    import builtins as __builtin__
except ImportError:
    import __builtin__
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)


def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)
def _swig_getattr(self, class_type, name):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    raise AttributeError("'%s' object has no attribute '%s'" % (class_type.__name__, name))


def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except __builtin__.Exception:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
def _swig_setattr_nondynamic_method(set):
    def set_attr(self, name, value):
        if (name == "thisown"):
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr


try:
    import weakref
    weakref_proxy = weakref.proxy
except __builtin__.Exception:
    weakref_proxy = lambda x: x
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self
IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS
class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects
                if sys.getrefcount(x) > 3 or x.get_ref_count() > 1]
        # Do in two steps so the kept objects stay alive until the end of
        # the function
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)


_director_objects = _DirectorObjects()
class _ostream(object):
    """Proxy of C++ std::ostream class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)
_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT

IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
"""@namespace IMP.parallel
   Distribute IMP tasks to multiple processors or machines."""

import socket
import time
import sys
import os
import re
import random
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle

# Internal helpers used below (the slavestate and util submodules,
# _run_background, _Popen4, _ListenSocket and the _*Wrapper/_HeartBeat
# message classes) are provided by IMP.parallel's private support code.

# Remember the search path in effect at import time, so that slaves can be
# told to use the same path for their own imports
_import_time_path = sys.path[:]
319 """Base class for all errors specific to the parallel module"""
323 class _NoMoreTasksError(Error):
328 """Error raised if all slaves failed, so tasks cannot be run"""
333 """Error raised if a problem occurs with the network"""
338 """Error raised if a slave has an unhandled exception"""
339 def __init__(self, exc, traceback, slave):
341 self.traceback = traceback
345 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
346 return "%s: %s from %s\nRemote traceback:\n%s" \
347 % (errstr, str(self.exc), str(self.slave), self.traceback)
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Fall back to the oldest pickle protocol if the binary protocol
        # fails (e.g. for Inf or NaN floats on some Python versions):
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def get_data(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Communication with %s failed: %s"
                                       % (str(self), str(detail)))
                self._ibuffer += data

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
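# Illustrative sketch (not part of the generated API): _Communicator._send
# frames each pickled object as a single XDR "string" (a 4-byte length plus
# padded payload), and _unpickle reverses the framing, returning any unread
# trailing bytes so that several objects can share one receive buffer:
#
#   p = xdrlib.Packer()
#   p.pack_string(pickle.dumps({'answer': 42}, -1))
#   wire = p.get_buffer()
#   u = xdrlib.Unpacker(wire)
#   obj = pickle.loads(u.unpack_string())   # -> {'answer': 42}
#   rest = wire[u.get_position():]          # leftover bytes, if any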
395 """Representation of a single slave.
396 Each slave uses a single thread of execution (i.e. a single CPU core)
397 to run tasks sequentially.
398 Slave is an abstract class; instead of using this class directly, use
399 a subclass such as LocalSlave or SGEQsubSlaveArray."""
402 _Communicator.__init__(self)
403 self._state = slavestate.init
406 self.update_contact_time()
408 def _start(self, command, unique_id, output):
409 """Start the slave running on the remote host; override in subclasses"""
410 self._state = slavestate.started
412 def _accept_connection(self, sock):
414 self._state = slavestate.connected
415 self.update_contact_time()
417 def _set_python_search_path(self, path):
418 self._send(_SetPathAction(path))
420 def update_contact_time(self):
421 self.last_contact_time = time.time()
423 def get_contact_timed_out(self, timeout):
424 return (time.time() - self.last_contact_time) > timeout
426 def _start_task(self, task, context):
427 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
428 raise TypeError(
"%s not ready for task" % str(self))
429 if self._context != context:
430 self._context = context
431 self._send(_ContextWrapper(context._startup))
432 self._state = slavestate.running_task
434 self._send(_TaskWrapper(task))
436 def _get_finished_task(self):
439 self.update_contact_time()
440 if isinstance(r, _HeartBeat):
441 if not self.get_data_pending():
448 self._state = slavestate.connected
455 self._state = slavestate.dead
458 def _ready_to_start(self):
459 return self._state == slavestate.init
461 def _ready_for_task(self, context):
462 return self._state == slavestate.connected \
463 and self._context == context
465 def _running_task(self, context):
466 return self._state == slavestate.running_task \
467 and self._context == context
471 """Representation of an array of slaves.
472 This is similar to Slave, except that it represents a collection of
473 slaves that are controlled together, such as a batch submission system
474 array job on a compute cluster.
475 Slave is an abstract class; instead of using this class directly, use
476 a subclass such as SGEQsubSlaveArray."""
478 def _get_slaves(self):
479 """Return a list of Slave objects contained within this array"""
483 """Do any necessary startup after all contained Slaves have started"""
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"
class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid if self._jobid is not None else '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
517 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
518 To use this class, the master process must be running on a machine that
519 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
520 is termed a 'submit host' by SGE). The class starts an SGE job array
521 (every slave has the same SGE job ID, but a different task ID).
525 standard_options =
'-j y -cwd -r n -o sge-errors'
529 @param numslave The number of slaves, which corresponds to the
530 number of tasks in the SGE job.
531 @param options A string of SGE options that are passed on the 'qsub'
532 command line. This is added to standard_options.
534 self._numslave = numslave
535 self._options = options
536 self._starting_slaves = []
539 def _get_slaves(self):
540 """Return a list of Slave objects contained within this array"""
541 return [_SGEQsubSlave(self)
for x
in range(self._numslave)]
543 def _slave_started(self, command, output, slave):
544 self._starting_slaves.append((command, output, slave))
546 def _start(self, command):
547 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
548 (self._options, self.standard_options,
549 len(self._starting_slaves))
552 (inp, out) = (a.stdin, a.stdout)
553 slave_uid =
" ".join([repr(s[0])
for s
in self._starting_slaves])
554 slave_out =
" ".join([repr(s[1])
for s
in self._starting_slaves])
555 inp.write(
"#!/bin/sh\n")
556 inp.write(
"uid=( '' %s )\n" % slave_uid)
557 inp.write(
"out=( '' %s )\n" % slave_out)
558 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
559 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
560 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
562 outlines = out.readlines()
564 for line
in outlines:
565 print(line.rstrip(
'\r\n'))
566 a.require_clean_exit()
567 self._set_jobid(outlines)
568 self._starting_slaves = []
570 def _set_jobid(self, outlines):
571 """Try to figure out the job ID from the SGE qsub output"""
572 if len(outlines) > 0:
573 m = re.compile(
r"\d+").search(outlines[0])
575 self._jobid = int(m.group())
576 for (num, slave)
in enumerate(self._starting_slaves):
577 slave[2]._jobid =
"%d.%d" % (self._jobid, num+1)
class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
595 """An array of slaves in a Sun Grid Engine system parallel environment.
596 In order to use this class, the master must be run via Sun Grid Engine's
597 'qsub' command and submitted to a parallel environment using the qsub
598 -pe option. This class will start slaves on every node in the parallel
599 environment (including the node running the master). Each slave is
600 started using the 'qrsh' command with the '-inherit' option."""
602 def _get_slaves(self):
605 pe = os.environ[
'PE_HOSTFILE']
611 (node, num, queue) = line.split(
None, 2)
612 for i
in range(int(num)):
613 slaves.append(_SGEPESlave(node))
623 """A collection of tasks that run in the same environment.
624 Context objects are typically created by calling Manager::get_context().
628 self._manager = manager
629 self._startup = startup
633 """Add a task to this context.
634 Tasks are any Python callable object that can be pickled (e.g. a
635 function or a class that implements the \_\_call\_\_ method). When
636 the task is run on the slave its arguments are the return value
637 from this context's startup function."""
638 self._tasks.append(task)
641 """Run all of the tasks on available slaves, and return results.
642 If there are more tasks than slaves, subsequent tasks are
643 started only once a running task completes: each slave only runs
644 a single task at a time. As each task completes, the return value(s)
645 from the task callable are returned from this method, as a
646 Python generator. Note that the results are returned in the order
647 that tasks complete, which may not be the same as the order they
650 \exception NoMoreSlavesError there are no slaves available
651 to run the tasks (or they all failed during execution).
652 \exception RemoteError a slave encountered an unhandled exception.
653 \exception NetworkError the master lost (or was unable to
654 establish) communication with any slave.
656 return self._manager._get_results_unordered(self)
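# Usage sketch (the function names here are hypothetical): tasks are plain
# picklable callables, and the return value(s) of the context's startup
# callable become each task's arguments when it runs on the slave:
#
#   def setup():                      # startup callable for get_context()
#       return "config", 42           # return values become task arguments
#
#   def my_task(config, seed):        # task callable added with add_task()
#       return (config, seed)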
660 """Manages slaves and contexts.
663 connect_timeout = 7200
666 heartbeat_timeout = 7200
668 def __init__(self, python=None, host=None, output='slave%d.output'):
670 @param python If not None, the command to run to start a Python
671 interpreter that can import the IMP module. Otherwise,
672 the same interpreter that the master is currently using
673 is used. This is passed to the shell, so a full command
674 line (including multiple words separated by spaces) can
675 be used if necessary.
676 @param host The hostname that slaves use to connect back to the
677 master. If not specified, the master machine's primary
678 IP address is used. On multi-homed machines, such as
679 compute cluster headnodes, this may need to be changed
680 to allow all slaves to reach the master (typically the
681 name of the machine's internal network address is
682 needed). If only running local slaves, 'localhost' can
683 be used to prohibit connections across the network.
684 @param output A format string used to name slave output files. It is
685 given the numeric slave id, so for example the default
686 value 'slave\%d.output' will yield output files called
687 slave0.output, slave1.output, etc.
690 self._python = sys.executable
692 self._python = python
694 self._output = output
695 self._all_slaves = []
696 self._starting_slaves = {}
697 self._slave_arrays = []
702 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
703 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
706 """Add a Slave object."""
707 if hasattr(slave,
'_get_slaves'):
708 self._slave_arrays.append(slave)
710 self._all_slaves.append(slave)
713 """Create and return a new Context in which tasks can be run.
714 @param startup If not None, a callable (Python function or class
715 that implements the \_\_call\_\_ method) that sets up
716 the slave to run tasks. This method is only called
717 once per slave. The return values from this method
718 are passed to the task object when it runs on
720 @return A new Context object.
724 def _get_results_unordered(self, context):
725 """Run all of a context's tasks, and yield results"""
726 self._send_tasks_to_slaves(context)
729 for task
in self._get_finished_tasks(context):
730 tasks_queued = len(context._tasks)
734 if len(context._tasks) > tasks_queued:
735 self._send_tasks_to_slaves(context)
736 except _NoMoreTasksError:
    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id
    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = \
            [a for a in self._all_slaves if a._ready_for_task(context)] + \
            [a for a in self._all_slaves if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error as detail:
            slave._kill()
    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave has just connected back to the master
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave only sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)
    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])
    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and
            # be given tasks
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
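# Putting it together (sketch; 'setup' and 'my_task' are the hypothetical
# callables from the earlier sketches): a master script builds a Manager,
# registers slaves, queues tasks in a context, and iterates over results in
# completion order:
#
#   m = Manager()
#   m.add_slave(LocalSlave())
#   m.add_slave(LocalSlave())
#   c = m.get_context(setup)
#   for i in range(10):
#       c.add_task(my_task)           # any picklable callable
#   for result in c.get_results_unordered():
#       print(result)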
def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()


def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(fname)


def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(fname)


from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()