from sys import version_info
if version_info >= (2, 6, 0):
    def swig_import_helper():
        from os.path import dirname
        import imp
        fp, pathname, description = imp.find_module('_IMP_parallel',
                                                     [dirname(__file__)])
        try:
            _mod = imp.load_module('_IMP_parallel', fp, pathname, description)
        finally:
            if fp is not None:
                fp.close()
        return _mod
    _IMP_parallel = swig_import_helper()
    del swig_import_helper
else:
    import _IMP_parallel
_swig_property = property

def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    if (name == "thisown"):
        return self.this.own(value)
    if (name == "this"):
        if type(value).__name__ == 'SwigPyObject':
            self.__dict__[name] = value
            return
    method = class_type.__swig_setmethods__.get(name, None)
    if method:
        return method(self, value)
    if (not static):
        self.__dict__[name] = value
    else:
        raise AttributeError("You cannot add attributes to %s" % self)

def _swig_setattr(self, class_type, name, value):
    return _swig_setattr_nondynamic(self, class_type, name, value, 0)

def _swig_getattr(self, class_type, name):
    if (name == "thisown"):
        return self.this.own()
    method = class_type.__swig_getmethods__.get(name, None)
    if method:
        return method(self)
    raise AttributeError(name)

def _swig_repr(self):
    try:
        strthis = "proxy of " + self.this.__repr__()
    except AttributeError:
        strthis = ""
    return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,)

_object = object

try:
    import weakref
    weakref_proxy = weakref.proxy
except ImportError:
    weakref_proxy = lambda x: x

class IMP_PARALLEL_SwigPyIterator(_object):
    """Proxy of C++ swig::IMP_PARALLEL_SwigPyIterator class"""
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, IMP_PARALLEL_SwigPyIterator, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, IMP_PARALLEL_SwigPyIterator, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __swig_destroy__ = _IMP_parallel.delete_IMP_PARALLEL_SwigPyIterator
    __del__ = lambda self: None
88 """value(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
89 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_value(self)
93 incr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
94 incr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
96 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_incr(self, n)
100 decr(IMP_PARALLEL_SwigPyIterator self, size_t n=1) -> IMP_PARALLEL_SwigPyIterator
101 decr(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator
103 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_decr(self, n)
    def distance(self, *args):
        """distance(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_distance(self, *args)

    def equal(self, *args):
        """equal(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_equal(self, *args)
114 """copy(IMP_PARALLEL_SwigPyIterator self) -> IMP_PARALLEL_SwigPyIterator"""
115 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_copy(self)
118 """next(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
119 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_next(self)
122 """__next__(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
123 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___next__(self)
126 """previous(IMP_PARALLEL_SwigPyIterator self) -> PyObject *"""
127 return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_previous(self)
    def advance(self, *args):
        """advance(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator_advance(self, *args)

    def __eq__(self, *args):
        """__eq__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___eq__(self, *args)

    def __ne__(self, *args):
        """__ne__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> bool"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___ne__(self, *args)

    def __iadd__(self, *args):
        """__iadd__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___iadd__(self, *args)

    def __isub__(self, *args):
        """__isub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___isub__(self, *args)

    def __add__(self, *args):
        """__add__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator"""
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___add__(self, *args)
    def __sub__(self, *args):
        """
        __sub__(IMP_PARALLEL_SwigPyIterator self, ptrdiff_t n) -> IMP_PARALLEL_SwigPyIterator
        __sub__(IMP_PARALLEL_SwigPyIterator self, IMP_PARALLEL_SwigPyIterator x) -> ptrdiff_t
        """
        return _IMP_parallel.IMP_PARALLEL_SwigPyIterator___sub__(self, *args)
    def __iter__(self):
        return self

IMP_PARALLEL_SwigPyIterator_swigregister = _IMP_parallel.IMP_PARALLEL_SwigPyIterator_swigregister
IMP_PARALLEL_SwigPyIterator_swigregister(IMP_PARALLEL_SwigPyIterator)
IMP_HAS_DEPRECATED = _IMP_parallel.IMP_HAS_DEPRECATED
IMP_DEBUG = _IMP_parallel.IMP_DEBUG
IMP_RELEASE = _IMP_parallel.IMP_RELEASE
IMP_SILENT = _IMP_parallel.IMP_SILENT
IMP_PROGRESS = _IMP_parallel.IMP_PROGRESS
IMP_TERSE = _IMP_parallel.IMP_TERSE
IMP_VERBOSE = _IMP_parallel.IMP_VERBOSE
IMP_NONE = _IMP_parallel.IMP_NONE
IMP_USAGE = _IMP_parallel.IMP_USAGE
IMP_INTERNAL = _IMP_parallel.IMP_INTERNAL
IMP_COMPILER_HAS_AUTO = _IMP_parallel.IMP_COMPILER_HAS_AUTO
IMP_COMPILER_HAS_DEBUG_VECTOR = _IMP_parallel.IMP_COMPILER_HAS_DEBUG_VECTOR
IMP_COMPILER_HAS_NULLPTR = _IMP_parallel.IMP_COMPILER_HAS_NULLPTR
IMP_BASE_HAS_BOOST_RANDOM = _IMP_parallel.IMP_BASE_HAS_BOOST_RANDOM
IMP_BASE_HAS_GPERFTOOLS = _IMP_parallel.IMP_BASE_HAS_GPERFTOOLS
IMP_BASE_HAS_LOG4CXX = _IMP_parallel.IMP_BASE_HAS_LOG4CXX
IMP_BASE_HAS_TCMALLOC_HEAPCHECKER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPCHECKER
IMP_BASE_HAS_TCMALLOC_HEAPPROFILER = _IMP_parallel.IMP_BASE_HAS_TCMALLOC_HEAPPROFILER

class _DirectorObjects(object):
    """@internal Simple class to keep references to director objects
       to prevent premature deletion."""
    def __init__(self):
        self._objects = []

    def register(self, obj):
        """Take a reference to a director object; will only work for
           refcounted C++ classes"""
        if hasattr(obj, 'get_ref_count'):
            self._objects.append(obj)

    def cleanup(self):
        """Only drop our reference and allow cleanup by Python if no other
           Python references exist (we hold 3 references: one in self._objects,
           one in x, and one in the argument list for getrefcount) *and* no
           other C++ references exist (the Python object always holds one)"""
        objs = [x for x in self._objects if sys.getrefcount(x) > 3
                or x.get_ref_count() > 1]
        self._objects = objs

    def get_object_count(self):
        """Get number of director objects (useful for testing only)"""
        return len(self._objects)

_director_objects = _DirectorObjects()
DEFAULT_CHECK = _IMP_parallel.DEFAULT_CHECK
NONE = _IMP_parallel.NONE
USAGE = _IMP_parallel.USAGE
USAGE_AND_INTERNAL = _IMP_parallel.USAGE_AND_INTERNAL
219 """set_check_level(IMP::base::CheckLevel tf)"""
220 return _IMP_parallel.set_check_level(*args)
223 """get_check_level() -> IMP::base::CheckLevel"""
224 return _IMP_parallel.get_check_level()

class _ostream(_object):
    """Proxy of C++ std::ostream class"""
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, _ostream, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, _ostream, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    def write(self, *args):
        """write(_ostream self, char const * osa_buf)"""
        return _IMP_parallel._ostream_write(self, *args)

_ostream_swigregister = _IMP_parallel._ostream_swigregister
_ostream_swigregister(_ostream)
IMP_COMPILER_HAS_OVERRIDE = _IMP_parallel.IMP_COMPILER_HAS_OVERRIDE
IMP_COMPILER_HAS_FINAL = _IMP_parallel.IMP_COMPILER_HAS_FINAL
IMP_CGAL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_FILESYSTEM
IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_CGAL_HAS_BOOST_PROGRAMOPTIONS
IMP_CGAL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_CGAL_HAS_BOOST_RANDOM
IMP_CGAL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_CGAL_HAS_BOOST_SYSTEM
IMP_ALGEBRA_HAS_IMP_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_IMP_CGAL
IMP_ALGEBRA_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_FILESYSTEM
IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_PROGRAMOPTIONS
IMP_ALGEBRA_HAS_BOOST_RANDOM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_RANDOM
IMP_ALGEBRA_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_ALGEBRA_HAS_BOOST_SYSTEM
IMP_ALGEBRA_HAS_CGAL = _IMP_parallel.IMP_ALGEBRA_HAS_CGAL
IMP_ALGEBRA_HAS_ANN = _IMP_parallel.IMP_ALGEBRA_HAS_ANN
IMP_KERNEL_HAS_IMP_CGAL = _IMP_parallel.IMP_KERNEL_HAS_IMP_CGAL
IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_KERNEL_HAS_BOOST_PROGRAMOPTIONS
IMP_KERNEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_RANDOM
IMP_KERNEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_KERNEL_HAS_BOOST_SYSTEM
IMP_KERNEL_HAS_CGAL = _IMP_parallel.IMP_KERNEL_HAS_CGAL
IMP_PARALLEL_HAS_IMP_ALGEBRA = _IMP_parallel.IMP_PARALLEL_HAS_IMP_ALGEBRA
IMP_PARALLEL_HAS_IMP_BASE = _IMP_parallel.IMP_PARALLEL_HAS_IMP_BASE
IMP_PARALLEL_HAS_IMP_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_IMP_CGAL
IMP_PARALLEL_HAS_BOOST_FILESYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_FILESYSTEM
IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_PROGRAMOPTIONS
IMP_PARALLEL_HAS_BOOST_RANDOM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_RANDOM
IMP_PARALLEL_HAS_BOOST_SYSTEM = _IMP_parallel.IMP_PARALLEL_HAS_BOOST_SYSTEM
IMP_PARALLEL_HAS_CGAL = _IMP_parallel.IMP_PARALLEL_HAS_CGAL
import sys, os, re, random, socket, time, xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle
# Helper modules shipped with IMP.parallel
from IMP.parallel import slavestate, util
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper, _TaskWrapper
from IMP.parallel.util import _HeartBeat, _ContextWrapper, _SetPathAction

# Save the Python search path at import time, so that slaves can be given
# the same path that worked for the master's imports
_import_time_path = sys.path[:]

class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass

class _NoMoreTasksError(Error):
    pass

class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run"""
    pass

class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass

class RemoteError(Error):
    """Error raised if a slave has an unhandled exception"""
    def __init__(self, exc, traceback, slave):
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.slave), self.traceback)
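
# Illustrative sketch of handling a RemoteError on the master ("context" is
# a Context whose tasks are running; exc.slave and exc.traceback are the
# attributes set in RemoteError.__init__ above):
#
#   try:
#       for result in context.get_results_unordered():
#           print result
#   except IMP.parallel.RemoteError, exc:
#       print "Task failed on %s\n%s" % (exc.slave, exc.traceback)
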
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = ''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error, detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])

class Slave(_Communicator):
    """Representation of a single slave.
       Each slave uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Slave is an abstract class; instead of using this class directly, use
       a subclass such as LocalSlave or SGEQsubSlaveArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = slavestate.init
        self._context = None
        self._task = None
        self.update_contact_time()
    def _start(self, command, unique_id, output):
        """Start the slave running on the remote host; override in subclasses"""
        self._state = slavestate.started

    def _accept_connection(self, sock):
        self._socket = sock
        self._state = slavestate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        return (time.time() - self.last_contact_time) > timeout
    def _start_task(self, task, context):
        if not self.ready_for_task(context) and not self.ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = slavestate.running_task
        self._task = task
        self._send(_TaskWrapper(task))
    def _get_finished_task(self):
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = slavestate.connected
        return task

    def _kill(self):
        task = self._task
        self._task = None
        self._state = slavestate.dead
        return task
    def ready_to_start(self):
        return self._state == slavestate.init

    def ready_for_task(self, context):
        return self._state == slavestate.connected \
               and self._context == context

    def running_task(self, context):
        return self._state == slavestate.running_task \
               and self._context == context

class SlaveArray(object):
    """Representation of an array of slaves.
       This is similar to Slave, except that it represents a collection of
       slaves that are controlled together, such as a batch submission system
       array job on a compute cluster."""

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        pass

    def _start(self, command):
        """Do any necessary startup after all contained Slaves have started"""
        pass

class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"

class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        return "<SGE qsub slave, ID %s>" % (self._jobid or '(unknown)')

class SGEQsubSlaveArray(SlaveArray):
    """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the master process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every slave has the same SGE job ID, but a different task ID).
       A brief usage sketch follows this class definition.
       @param numslave The number of slaves, which corresponds to the number
              of tasks in the SGE job.
       @param options A string of SGE options that are passed on the 'qsub'
              command line. This is added to standard_options.
    """

    standard_options = '-j y -cwd -r n -o sge-errors'
    def __init__(self, numslave, options):
        self._numslave = numslave
        self._options = options
        self._starting_slaves = []
        self._jobid = None

    def _get_slaves(self):
        """Return a list of Slave objects contained within this array"""
        return [_SGEQsubSlave(self) for x in range(self._numslave)]
    def _slave_started(self, command, output, slave):
        self._starting_slaves.append((command, output, slave))
    def _start(self, command):
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_slaves))
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        slave_uid = " ".join([repr(s[0]) for s in self._starting_slaves])
        slave_out = " ".join([repr(s[1]) for s in self._starting_slaves])
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % slave_uid)
        inp.write("out=( '' %s )\n" % slave_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        for line in outlines:
            print line.rstrip('\r\n')
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_slaves = []
    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host

class SGEPESlaveArray(SlaveArray):
    """An array of slaves in a Sun Grid Engine system parallel environment.
       In order to use this class, the master must be run via Sun Grid Engine's
       'qsub' command and submitted to a parallel environment using the qsub
       -pe option. This class will start slaves on every node in the parallel
       environment (including the node running the master). Each slave is
       started using the 'qrsh' command with the '-inherit' option.
       A brief usage sketch follows this class definition."""

    def _get_slaves(self):
        slaves = []
        pe = os.environ['PE_HOSTFILE']
        for line in open(pe):
            (node, num, queue) = line.split(None, 2)
            for i in range(int(num)):
                slaves.append(_SGEPESlave(node))
        # The first slot belongs to the master process itself, so run a
        # LocalSlave there rather than starting a remote slave with qrsh
        if len(slaves) > 0:
            slaves[0] = LocalSlave()
        return slaves
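
# Illustrative sketch (assumes the master script itself was submitted with
# something like "qsub -pe <pe_name> <slots> master.py", so that SGE sets
# $PE_HOSTFILE before this class is used):
#
#   m = IMP.parallel.Manager()
#   m.add_slave(IMP.parallel.SGEPESlaveArray())
#   c = m.get_context()
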
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        self._manager = manager
        self._startup = startup
        self._tasks = []
    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the __call__ method). When the
           task is run on the slave its arguments are the return value from
           this context's startup function."""
        self._tasks.append(task)
    def get_results_unordered(self):
        """Run all of the tasks on available slaves, and return results.
           If there are more tasks than slaves, subsequent tasks are
           started only once a running task completes: each slave only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted by add_task().

           \exception NoMoreSlavesError there are no slaves available
                      to run the tasks (or they all failed during execution).
           \exception RemoteError a slave encountered an unhandled exception.
           \exception NetworkError the master lost (or was unable to
                      establish) communication with any slave.
        """
        return self._manager._get_results_unordered(self)

class Manager(object):
    """Manages slaves and contexts.
       A brief usage sketch follows this class definition.
       @param python If not None, the command to run to start a Python
              interpreter that can import the IMP module. Otherwise,
              the same interpreter that the master is currently using
              is used. This is passed to the shell, so a full command
              line (including multiple words separated by spaces) can
              be used if necessary.
       @param host The hostname that slaves use to connect back to the master.
              If not specified, the master machine's primary IP address
              is used. On multi-homed machines, such as compute cluster
              headnodes, this may need to be changed to allow all
              slaves to reach the master (typically the name of the
              machine's internal network address is needed). If only
              running local slaves, 'localhost' can be used to prohibit
              connections across the network.
       @param output A format string used to name slave output files. It is
              given the numeric slave id, so for example the default
              value 'slave%d.output' will yield output files called
              slave0.output, slave1.output, etc.
    """
    connect_timeout = 7200

    heartbeat_timeout = 7200
    def __init__(self, python=None, host=None, output='slave%d.output'):
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_slaves = []
        self._starting_slaves = {}
        self._slave_arrays = []
        if host:
            self._host = host
        else:
            # Get the primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
673 """Add a Slave object."""
674 if hasattr(slave,
'_get_slaves'):
675 self._slave_arrays.append(slave)
677 self._all_slaves.append(slave)
680 """Create and return a new Context in which tasks can be run.
681 @param startup If not None, a callable (Python function or class
682 that implements the __call__ method) that sets up
683 the slave to run tasks. This method is only called
684 once per slave. The return values from this method
685 are passed to the task object when it runs on
688 return Context(self, startup)
    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If more tasks were added while processing results,
                    # send them off to the slaves as well
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return
    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)
        for (num, slave) in enumerate(self._all_slaves):
            if slave.ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []
    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id
    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that already have the task context
        available_slaves = [a for a in self._all_slaves
                            if a.ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a.ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_slave(slave, context)
    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error, detail:
            slave._kill()
    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task
    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave has just connected back to the master
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event.running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:   # the slave sent back a heartbeat
                    self._kill_timed_out_slaves(context)
            except NetworkError, detail:
                task = event._kill()
                print "Slave %s failed (%s): rescheduling task %s" \
                      % (str(event), str(detail), str(task))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a.running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)
    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a.running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)
    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print "Identified slave %s " % str(slave)
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print "Ignoring request from unknown slave"
    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])
    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a.running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
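
# A minimal end-to-end usage sketch (illustrative only; SquareTask and the
# slave counts are hypothetical, and it assumes a Context created without a
# startup function calls each task with no arguments):
#
#   import IMP.parallel
#
#   class SquareTask(object):
#       """A picklable callable that runs on a slave"""
#       def __init__(self, x):
#           self.x = x
#       def __call__(self):
#           return self.x * self.x
#
#   m = IMP.parallel.Manager()
#   m.add_slave(IMP.parallel.LocalSlave())
#   m.add_slave(IMP.parallel.LocalSlave())
#   c = m.get_context()
#   for i in range(10):
#       c.add_task(SquareTask(i))
#   for result in c.get_results_unordered():
#       print result
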
def get_module_version():
    """get_module_version() -> std::string const"""
    return _IMP_parallel.get_module_version()

def get_example_path(*args):
    """get_example_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_example_path(*args)

def get_data_path(*args):
    """get_data_path(std::string fname) -> std::string"""
    return _IMP_parallel.get_data_path(*args)

import _version_check
_version_check.check_version(get_module_version())