# SWIG-generated loader boilerplate: locate and import the C extension
# module _IMP_parallel that backs this package.
# NOTE(review): reconstructed from canonical SWIG 2.x output (the source
# span was garbled, with several lines missing); confirm against a
# pristine generated file.
from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        """Load _IMP_parallel from this package's own directory first,
        falling back to a regular import (e.g. statically linked builds)."""
        from os.path import dirname
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(
                '_IMP_parallel', [dirname(__file__)])
        except ImportError:
            # Not found next to this file; let the normal import machinery try.
            import _IMP_parallel
            return _IMP_parallel
        if fp is not None:
            try:
                _mod = imp.load_module(
                    '_IMP_parallel', fp, pathname, description)
            finally:
                fp.close()  # always release the file handle opened by find_module
            return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
34 _swig_property = property
37 def _swig_setattr_nondynamic(self,class_type,name,value,static=1):
38 if (name ==
"thisown"):
return self.this.own(value)
40 if type(value).__name__ ==
'SwigPyObject':
41 self.__dict__[name] = value
43 method = class_type.__swig_setmethods__.get(name,
None)
44 if method:
return method(self,value)
46 self.__dict__[name] = value
48 raise AttributeError(
"You cannot add attributes to %s" % self)
50 def _swig_setattr(self,class_type,name,value):
51 return _swig_setattr_nondynamic(self,class_type,name,value,0)
53 def _swig_getattr(self,class_type,name):
54 if (name ==
"thisown"):
return self.this.own()
55 method = class_type.__swig_getmethods__.get(name,
None)
56 if method:
return method(self)
57 raise AttributeError(name)
60 try: strthis =
"proxy of " + self.this.__repr__()
62 return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)
67 except AttributeError:
72 def _swig_setattr_nondynamic_method(set):
73 def set_attr(self,name,value):
74 if (name ==
"thisown"):
return self.this.own(value)
75 if hasattr(self,name)
or (name ==
"this"):
78 raise AttributeError(
"You cannot add attributes to %s" % self)
# Use real weak references when available; otherwise fall back to the
# identity function (the object is returned as-is and stays alive).
try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:  # narrowed from a bare except; only the import can fail here
    weakref_proxy = lambda x: x
class IMP_PARALLEL_SwigPyIterator(object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class.

    Abstract iterator proxy: every method forwards to the matching
    function in the _IMP_parallel C extension.
    NOTE(review): reconstructed from canonical SWIG iterator boilerplate;
    several original def lines were missing from the garbled source.
    """
    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None

    def value(self):
        """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)

    def incr(self, n=1):
        """
        incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        """
        decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
        decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)

    def distance(self, *args):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, *args)

    def equal(self, *args):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, *args)

    def copy(self):
        """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)

    def next(self):
        """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)

    def __next__(self):
        """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)

    def previous(self):
        """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)

    def advance(self, *args):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, *args)

    def __eq__(self, *args):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, *args)

    def __ne__(self, *args):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, *args)

    def __iadd__(self, *args):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, *args)

    def __isub__(self, *args):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, *args)

    def __add__(self, *args):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, *args)

    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        return self

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
# Logging-level and build-configuration constants re-exported from the
# _IMP_parallel C extension (values are defined there, not here).
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_MEMORY = _IMP_parallel.IMP_MEMORY
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_BASE_HAS_LOG4CXX = _IMP_parallel.IMP_BASE_HAS_LOG4CXX
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_BASE_HAS_BOOST_RANDOM = _IMP_parallel.IMP_BASE_HAS_BOOST_RANDOM
IMP_BASE_HAS_GPERFTOOLS = _IMP_parallel.IMP_BASE_HAS_GPERFTOOLS
IMP_BASE_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPCHECKER
IMP_BASE_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPPROFILER
IMPBASE_SHOW_WARNINGS = _IMP_parallel.IMPBASE_SHOW_WARNINGS
197 class _DirectorObjects(object):
198 """@internal Simple class to keep references to director objects
199 to prevent premature deletion."""
202 def register(self, obj):
203 """Take a reference to a director object; will only work for
204 refcounted C++ classes"""
205 if hasattr(obj,
'get_ref_count'):
206 self._objects.append(obj)
208 """Only drop our reference and allow cleanup by Python if no other
209 Python references exist (we hold 3 references: one in self._objects,
210 one in x, and one in the argument list for getrefcount) *and* no
211 other C++ references exist (the Python object always holds one)"""
212 objs = [x
for x
in self._objects
if sys.getrefcount(x) > 3 \
213 or x.get_ref_count() > 1]
217 def get_object_count(self):
218 """Get number of director objects (useful for testing only)"""
219 return len(self._objects)
220 _director_objects = _DirectorObjects()
class _ostream(object):
    """Proxy of C++ std::ostream class"""
    thisown = _swig_property(lambda x: x.this.own(),
                             lambda x, v: x.this.own(v),
                             doc='The membership flag')

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, *args):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, *args)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
# Per-module build-configuration flags re-exported from the C extension.
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_HAS_NOEXCEPT = _IMP_parallel.IMP_HAS_NOEXCEPT

IMP_CGAL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_FILESYSTEM
IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS
IMP_CGAL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_CGAL_HAS_BOOST_RANDOM
IMP_CGAL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_SYSTEM
IMPCGAL_SHOW_WARNINGS = _IMP_parallel.IMPCGAL_SHOW_WARNINGS

IMP_ALGEBRA_HAS_IMP_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_IMP_CGAL
IMP_ALGEBRA_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_FILESYSTEM
IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS
IMP_ALGEBRA_HAS_BOOST_RANDOM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_RANDOM
IMP_ALGEBRA_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_SYSTEM
IMP_ALGEBRA_HAS_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_CGAL
IMP_ALGEBRA_HAS_ANN = _IMP_parallel.IMP_ALGEBRA_HAS_ANN
IMPALGEBRA_SHOW_WARNINGS = _IMP_parallel.IMPALGEBRA_SHOW_WARNINGS

IMP_KERNEL_HAS_IMP_CGAL = _IMP_parallel.IMP_KERNEL_HAS_IMP_CGAL
IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_SYSTEM
IMP_KERNEL_HAS_CGAL = _IMP_parallel.IMP_KERNEL_HAS_CGAL
IMPKERNEL_SHOW_WARNINGS = _IMP_parallel.IMPKERNEL_SHOW_WARNINGS

IMP_PARALLEL_HAS_IMP_ALGEBRA = _IMP_parallel.IMP_PARALLEL_HAS_IMP_ALGEBRA
IMP_PARALLEL_HAS_IMP_BASE = _IMP_parallel.IMP_PARALLEL_HAS_IMP_BASE
IMP_PARALLEL_HAS_IMP_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_IMP_CGAL
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_CGAL
IMPPARALLEL_SHOW_WARNINGS = _IMP_parallel.IMPPARALLEL_SHOW_WARNINGS
278 import cPickle
as pickle
289 _import_time_path = sys.path[:]
292 """Base class for all errors specific to the parallel module"""
296 class _NoMoreTasksError(Error):
301 """Error raised if all slaves failed, so tasks cannot be run"""
306 """Error raised if a problem occurs with the network"""
311 """Error raised if a slave has an unhandled exception"""
312 def __init__(self, exc, traceback, slave):
314 self.traceback = traceback
318 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
319 return "%s: %s from %s\nRemote traceback:\n%s" \
320 % (errstr, str(self.exc), str(self.slave), self.traceback)
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""
    # NOTE(review): reconstructed from upstream IMP.parallel; the __init__
    # and parts of get_data were missing from the garbled source — confirm
    # against the original file.

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the text protocol in that case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        # True if a complete or partial message is already buffered.
        return len(self._ibuffer) > 0

    def get_data(self):
        """Read and unpickle the next object from the socket, raising
        RemoteError for wrapped remote exceptions and NetworkError on
        connection problems."""
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                # Not enough buffered data for a whole message; read more.
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        # Unpack one XDR-framed pickle; return (object, remaining buffer).
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""
    # NOTE(review): class header and several method bodies reconstructed
    # from upstream IMP.parallel; confirm against the original file.

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        if not self._ready_for_task(context) \
                and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            # Switch this slave to the new context before sending the task.
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        while True:
            r = self.get_data()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                # Heartbeats keep the connection alive; return None if
                # nothing else is queued behind them.
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        # Mark the slave dead and hand back its in-flight task (if any)
        # so it can be rescheduled.
        task = self._task
        self._task = None
        self._state = slavestate.dead
        return task

    def _ready_to_start(self):
        return self._state == slavestate.init

    def _ready_for_task(self, context):
        return self._state == slavestate.connected \
               and self._context == context

    def _running_task(self, context):
        return self._state == slavestate.running_task \
               and self._context == context
class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as SGEQsubSlaveArray."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Slaves have started"""
        # NOTE(review): subclasses override this with a `command` argument;
        # confirm the base signature against the original file.
        pass
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"
class _SGEQsubSlave(Slave):
    """@internal A single slave within an SGE qsub job array."""
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None  # filled in by the array once qsub reports the ID
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        # The array submits all of its slaves in one qsub call later.
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub slave, ID %s>" % jobid
class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
    """
    # Options always passed to qsub; user options are appended to these.
    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numslave, options):
        """Constructor.
           @param numslave The number of slaves, which correponds to the
                           number of tasks in the SGE job.
           @param options A string of SGE options that are passed on the 'qsub'
                          command line. This is added to standard_options.
        """
        self._numslave = numslave
        self._options = options
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for x in range(self._numslave)]

    def _slave_started(self, command, output, slave):
        self._starting_slaves.append((command, output, slave))

    def _start(self, command):
        # Submit one job array covering every registered slave; the wrapper
        # script below maps each SGE task ID to its unique ID and output file.
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num + 1)
class _SGEPESlave(Slave):
    """@internal A slave started with 'qrsh -inherit' on one PE node."""
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option."""
    # NOTE(review): reconstructed from upstream IMP.parallel; several
    # original lines were missing — confirm against the original file.

    def _get_slaves(self):
        slaves = []
        # $PE_HOSTFILE lists one "node count queue" record per line.
        pe = os.environ['PE_HOSTFILE']
        fh = open(pe, "r")
        while True:
            line = fh.readline()
            if line == '':
                break
            (node, num, queue) = line.split(None, 2)
            for i in range(int(num)):
                slaves.append(_SGEPESlave(node))
        # Replace the first slave with a local slave, as this is ourself,
        # and SGE won't let us start this process with qrsh (as we are
        # already in the job)
        slaves[0] = LocalSlave()
        return slaves
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor.
           @param manager The Manager that owns this context.
           @param startup An optional callable run once on each slave before
                          any task; its return values are passed to tasks.
        """
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method). When
           the task is run on the slave its arguments are the return value
           from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           \exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           \exception RemoteError a slave encountered an unhandled exception.
           \exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)
class Manager(object):
    """Manages slaves and contexts.
    """
    # NOTE(review): class header and several statements reconstructed from
    # upstream IMP.parallel; confirm against the original file.

    # Seconds to wait for a slave to connect back after being started.
    connect_timeout = 7200

    # Seconds without contact before a running slave is considered dead.
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='slave%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                         interpreter that can import the IMP module. Otherwise,
                         the same interpreter that the master is currently using
                         is used. This is passed to the shell, so a full command
                         line (including multiple words separated by spaces) can
                         be used if necessary.
           @param host   The hostname that slaves use to connect back to the
                         master. If not specified, the master machine's primary
                         IP address is used. On multi-homed machines, such as
                         compute cluster headnodes, this may need to be changed
                         to allow all slaves to reach the master (typically the
                         name of the machine's internal network address is
                         needed). If only running local slaves, 'localhost' can
                         be used to prohibit connections across the network.
           @param output A format string used to name slave output files. It is
                         given the numeric slave id, so for example the default
                         value 'slave\%d.output' will yield output files called
                         slave0.output, slave1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_slave(self, slave):
        """Add a Slave object."""
        if hasattr(slave, '_get_slaves'):
            self._slave_arrays.append(slave)
        else:
            self._all_slaves.append(slave)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the slave to run tasks. This method is only called
                          once per slave. The return values from this method
                          are passed to the task object when it runs on
                          the slave.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing a result,
                    # make sure they get sent off to the slaves.
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        # Slave index plus 8 random letters, so reconnecting slaves can be
        # matched back to their Slave objects unambiguously.
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = \
            [a for a in self._all_slaves if a._ready_for_task(context)] + \
            [a for a in self._all_slaves if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error as detail:
            # Drop the dead slave; the task stays queued for another slave.
            slave._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # New slave just connected back to us
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:  # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)

    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves
                     if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)

    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")

    def _init_slave(self, slave):
        # Give the slave the same Python search path the master had at
        # import time (and now, if it changed since).
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()


def get_example_path(*args):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(*args)


def get_data_path(*args):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(*args)
855 from .
import _version_check
856 _version_check.check_version(get_module_version())
Representation of an array of slaves.
std::string get_data_path(std::string file_name)
Return the full path to installed data.
Error raised if a slave has an unhandled exception.
Make CGAL functionality available to IMP.
Low level functionality (logging, error handling, profiling, command line flags etc) that is used by ...
An array of slaves in a Sun Grid Engine system parallel environment.
A collection of tasks that run in the same environment.
Representation of a single slave.
Base functionality and abstract base classes for representation, scoring and sampling.
std::string get_example_path(std::string file_name)
Return the path to installed example data for this module.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
General purpose algebraic and geometric methods that are expected to be used by a wide variety of IMP...
Base class for all errors specific to the parallel module.
def add_slave
Add a Slave object.
def get_context
Create and return a new Context in which tasks can be run.
Error raised if a problem occurs with the network.
Error raised if all slaves failed, so tasks cannot be run.
Distribute IMP tasks to multiple processors or machines.
An array of slaves on a Sun Grid Engine system, started with 'qsub'.
def get_results_unordered
Run all of the tasks on available slaves, and return results.
Manages slaves and contexts.
A slave running on the same machine as the master.