from __future__ import print_function, division, absolute_import

from sys import version_info as _swig_python_version_info
if _swig_python_version_info >= (2, 7, 0):
    def swig_import_helper():
        import importlib
        pkg = __name__.rpartition('.')[0]
        mname = '.'.join((pkg, '_IMP_parallel')).lstrip('.')
        try:
            return importlib.import_module(mname)
        except ImportError:
            return importlib.import_module('_IMP_parallel')
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
elif _swig_python_version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            import _IMP_parallel
            return _IMP_parallel
        try:
            _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
        finally:
            if fp is not None:
                fp.close()
        return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
del _swig_python_version_info

_swig_property = property

try:
    import builtins as __builtin__
except ImportError:
    import __builtin__

def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        object.__setattr__(self, name, value)
    else:
        raise AttributeError("You cannot add attributes to %s" % self)

def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)

def _swig_getattr(self, class_type, name):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    raise AttributeError("'%s' object has no attribute '%s'"
                         % (class_type.__name__, name))

def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except __builtin__.Exception:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__,
                             self.__class__.__name__, strthis,)

def _swig_setattr_nondynamic_method(set):
    def set_attr(self, name, value):
        if (name == "thisown"):
            return self.this.own(value)
        if hasattr(self, name) or (name == "this"):
            set(self, name, value)
        else:
            raise AttributeError("You cannot add attributes to %s" % self)
    return set_attr

try:
    import weakref
    weakref_proxy = weakref.proxy
except __builtin__.Exception:
    weakref_proxy = lambda x: x

class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, x):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, x)

    def equal(self, x):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, x)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, n):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)

IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_KERNEL_HAS_LOG4CXX = _IMP_parallel.IMP_KERNEL_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_UNIQUE_PTR = _IMP_parallel.IMP_COMPILER_HAS_UNIQUE_PTR
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_NUMPY = _IMP_parallel.IMP_KERNEL_HAS_NUMPY
IMP_KERNEL_HAS_GPERFTOOLS = _IMP_parallel.IMP_KERNEL_HAS_GPERFTOOLS
IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPCHECKER
IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_KERNEL_HAS_TCMALLOC_HEAPPROFILER
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS

class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3
                or x.get_ref_count() > 1]
        # Keep only the objects that are still referenced elsewhere
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)
_director_objects = _DirectorObjects()

class _ostream(object):
    """Proxy of C++ std::ostream class."""

    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, osa_buf):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, osa_buf)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)

IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT
IMP_C_OPEN_BINARY = _IMP_parallel.IMP_C_OPEN_BINARY
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_NUMPY = _IMP_parallel.IMP_PARALLEL_HAS_NUMPY
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS

# Standard-library modules used by the manager/slave code below, plus the
# IMP.parallel helper modules it refers to (restored here based on the names
# used in the classes that follow).
import sys
import os
import re
import socket
import time
import random
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle
from IMP.parallel import slavestate
from IMP.parallel import util
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

_import_time_path = sys.path[:]
322 """Base class for all errors specific to the parallel module"""
326 class _NoMoreTasksError(Error):
331 """Error raised if all slaves failed, so tasks cannot be run"""
336 """Error raised if a problem occurs with the network"""
341 """Error raised if a slave has an unhandled exception"""
    def __init__(self, exc, traceback, slave):
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.slave), self.traceback)

class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Older Pythons can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old pickle protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
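
# Illustrative sketch (not part of the generated API): _Communicator frames
# each message as an XDR-packed string whose payload is the pickled object.
# This helper just demonstrates that framing round trip in-process, with no
# sockets involved; the sample object is arbitrary.
def _example_xdr_pickle_roundtrip(obj=("some task", 42)):
    packer = xdrlib.Packer()
    packer.pack_string(pickle.dumps(obj, -1))
    wire_data = packer.get_buffer()       # bytes as they would go on the wire
    unpacker = xdrlib.Unpacker(wire_data)
    decoded = pickle.loads(unpacker.unpack_string())
    leftover = wire_data[unpacker.get_position():]  # unread bytes stay buffered
    return decoded, leftover
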
398 """Representation of a single slave.
399 Each slave uses a single thread of execution (i.e. a single CPU core)
400 to run tasks sequentially.
401 Slave is an abstract class; instead of using this class directly, use
402 a subclass such as LocalSlave or SGEQsubSlaveArray."""
405 _Communicator.__init__(self)
406 self._state = slavestate.init
409 self.update_contact_time()
411 def _start(self, command, unique_id, output):
412 """Start the slave running on the remote host; override in subclasses"""
413 self._state = slavestate.started
415 def _accept_connection(self, sock):
417 self._state = slavestate.connected
418 self.update_contact_time()
420 def _set_python_search_path(self, path):
421 self._send(_SetPathAction(path))
423 def update_contact_time(self):
424 self.last_contact_time = time.time()
426 def get_contact_timed_out(self, timeout):
427 return (time.time() - self.last_contact_time) > timeout
429 def _start_task(self, task, context):
430 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
431 raise TypeError(
"%s not ready for task" % str(self))
432 if self._context != context:
433 self._context = context
434 self._send(_ContextWrapper(context._startup))
435 self._state = slavestate.running_task
437 self._send(_TaskWrapper(task))
439 def _get_finished_task(self):
442 self.update_contact_time()
443 if isinstance(r, _HeartBeat):
444 if not self.get_data_pending():
451 self._state = slavestate.connected
458 self._state = slavestate.dead
461 def _ready_to_start(self):
462 return self._state == slavestate.init
464 def _ready_for_task(self, context):
465 return self._state == slavestate.connected \
466 and self._context == context
468 def _running_task(self, context):
469 return self._state == slavestate.running_task \
470 and self._context == context
474 """Representation of an array of slaves.
475 This is similar to Slave, except that it represents a collection of
476 slaves that are controlled together, such as a batch submission system
477 array job on a compute cluster.
478 Slave is an abstract class; instead of using this class directly, use
479 a subclass such as SGEQsubSlaveArray."""
481 def _get_slaves(self):
482 """Return a list of Slave objects contained within this array"""
486 """Do any necessary startup after all contained Slaves have started"""

class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"
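
# Minimal usage sketch (nothing here runs at import time): local slaves are
# simply added to a Manager, one per core to be used. The core count below is
# an arbitrary example value.
def _example_add_local_slaves(num_cores=2):
    manager = Manager()
    for _ in range(num_cores):
        manager.add_slave(LocalSlave())
    return manager
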

class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
520 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
521 To use this class, the master process must be running on a machine that
522 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
523 is termed a 'submit host' by SGE). The class starts an SGE job array
524 (every slave has the same SGE job ID, but a different task ID).
528 standard_options =
'-j y -cwd -r n -o sge-errors'
532 @param numslave The number of slaves, which corresponds to the
533 number of tasks in the SGE job.
534 @param options A string of SGE options that are passed on the 'qsub'
535 command line. This is added to standard_options.
537 self._numslave = numslave
538 self._options = options
539 self._starting_slaves = []

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for x in range(self._numslave)]

    def _slave_started(self, command, output, slave):
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))

        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()

        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num + 1)

class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" \
                  % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
598 """An array of slaves in a Sun Grid Engine system parallel environment.
599 In order to use this class, the master must be run via Sun Grid Engine's
600 'qsub' command and submitted to a parallel environment using the qsub
601 -pe option. This class will start slaves on every node in the parallel
602 environment (including the node running the master). Each slave is
603 started using the 'qrsh' command with the '-inherit' option."""
605 def _get_slaves(self):
608 pe = os.environ[
'PE_HOSTFILE']
614 (node, num, queue) = line.split(
None, 2)
615 for i
in range(int(num)):
616 slaves.append(_SGEPESlave(node))
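
# Hedged usage sketch: this only makes sense when the master script itself
# was submitted with something like 'qsub -pe <pe_name> <slots>'. The PE name
# and slot count are site-specific, so they are not shown here.
def _example_sge_pe_slaves():
    manager = Manager()
    manager.add_slave(SGEPESlaveArray())
    return manager
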
626 """A collection of tasks that run in the same environment.
627 Context objects are typically created by calling Manager::get_context().
631 self._manager = manager
632 self._startup = startup
636 """Add a task to this context.
637 Tasks are any Python callable object that can be pickled (e.g. a
638 function or a class that implements the \_\_call\_\_ method). When
639 the task is run on the slave its arguments are the return value
640 from this context's startup function."""
641 self._tasks.append(task)
644 """Run all of the tasks on available slaves, and return results.
645 If there are more tasks than slaves, subsequent tasks are
646 started only once a running task completes: each slave only runs
647 a single task at a time. As each task completes, the return value(s)
648 from the task callable are returned from this method, as a
649 Python generator. Note that the results are returned in the order
650 that tasks complete, which may not be the same as the order they
653 @exception NoMoreSlavesError there are no slaves available
654 to run the tasks (or they all failed during execution).
655 @exception RemoteError a slave encountered an unhandled exception.
656 @exception NetworkError the master lost (or was unable to
657 establish) communication with any slave.
659 return self._manager._get_results_unordered(self)
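
# Hedged sketch of the startup/task contract documented above. The names are
# illustrative only; any picklable callables work. The startup return value
# becomes the argument passed to every task when it runs on a slave.
def _example_startup():
    return "some shared setup"

def _example_task(setup):
    return len(setup)

def _example_run_context(manager, ntasks=10):
    context = manager.get_context(_example_startup)
    for _ in range(ntasks):
        context.add_task(_example_task)
    # Results arrive in completion order, as a generator
    return list(context.get_results_unordered())
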
663 """Manages slaves and contexts.
666 connect_timeout = 7200
669 heartbeat_timeout = 7200
671 def __init__(self, python=None, host=None, output='slave%d.output'):
673 @param python If not None, the command to run to start a Python
674 interpreter that can import the IMP module. Otherwise,
675 the same interpreter that the master is currently using
676 is used. This is passed to the shell, so a full command
677 line (including multiple words separated by spaces) can
678 be used if necessary.
679 @param host The hostname that slaves use to connect back to the
680 master. If not specified, the master machine's primary
681 IP address is used. On multi-homed machines, such as
682 compute cluster headnodes, this may need to be changed
683 to allow all slaves to reach the master (typically the
684 name of the machine's internal network address is
685 needed). If only running local slaves, 'localhost' can
686 be used to prohibit connections across the network.
687 @param output A format string used to name slave output files. It is
688 given the numeric slave id, so for example the default
689 value 'slave\%d.output' will yield output files called
690 slave0.output, slave1.output, etc.
693 self._python = sys.executable
695 self._python = python
697 self._output = output
698 self._all_slaves = []
699 self._starting_slaves = {}
700 self._slave_arrays = []
705 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
706 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
709 """Add a Slave object."""
710 if hasattr(slave,
'_get_slaves'):
711 self._slave_arrays.append(slave)
713 self._all_slaves.append(slave)
716 """Create and return a new Context in which tasks can be run.
717 @param startup If not None, a callable (Python function or class
718 that implements the \_\_call\_\_ method) that sets up
719 the slave to run tasks. This method is only called
720 once per slave. The return values from this method
721 are passed to the task object when it runs on
723 @return A new Context object.

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing a result,
                    # make sure they get sent off to the slaves
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = \
            [a for a in self._all_slaves if a._ready_for_task(context)] + \
            [a for a in self._all_slaves if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error as detail:
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave just connected
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:   # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back

        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
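
# End-to-end usage sketch for the Manager API above (defined but never called
# at import time). The interpreter command, host name and output pattern are
# left at their defaults; pass site-specific values when needed.
def _example_manager_workflow():
    manager = Manager(python=None, host=None, output='slave%d.output')
    # One LocalSlave per local core; cluster users would add a SlaveArray
    # subclass such as SGEQsubSlaveArray instead
    manager.add_slave(LocalSlave())
    manager.add_slave(LocalSlave())
    context = manager.get_context(_example_startup)
    for _ in range(20):
        context.add_task(_example_task)
    # Results arrive as tasks finish (completion order, not submission order)
    results = []
    for result in context.get_results_unordered():
        results.append(result)
    return results
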

def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()


def get_example_path(fname):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(fname)


def get_data_path(fname):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(fname)


from . import _version_check
_version_check.check_version(get_module_version())
__version__ = get_module_version()
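
# Hedged note: get_data_path() and get_example_path() resolve a file name to
# an absolute path inside this module's installed data/example directories,
# e.g. get_example_path("some_example.py") (the file name here is purely
# illustrative).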