9 import cPickle
as pickle
21 _import_time_path = sys.path[:]
25 """Base class for all errors specific to the parallel module"""
29 class _NoMoreTasksError(
Error):
34 """Error raised if all workers failed, so tasks cannot be run"""
43 class NetworkError(Error):
44 """Error raised if a problem occurs with the network"""
49 """Error raised if a worker has an unhandled exception"""
50 def __init__(self, exc, traceback, worker):
52 self.traceback = traceback
56 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
57 return "%s: %s from %s\nRemote traceback:\n%s" \
58 % (errstr, str(self.exc), str(self.worker), self.traceback)
61 class _Communicator(object):
62 """Simple support for sending Python pickled objects over the network"""
71 p.pack_string(pickle.dumps(obj, -1))
75 p.pack_string(pickle.dumps(obj, 0))
76 self._socket.sendall(p.get_buffer())
def get_data_pending(self):
    """Report whether unread bytes remain in the input buffer.

    Returns True when self._ibuffer is non-empty, False otherwise.
    """
    buffered = len(self._ibuffer)
    return buffered > 0
84 obj, self._ibuffer = self._unpickle(self._ibuffer)
85 if isinstance(obj, _ErrorWrapper):
89 except (IndexError, EOFError):
91 data = self._socket.recv(4096)
92 except socket.error
as detail:
94 % (str(self), str(detail)))
100 def _unpickle(self, ibuffer):
101 p = xdrlib.Unpacker(ibuffer)
102 obj = p.unpack_string()
103 return (pickle.loads(obj), ibuffer[p.get_position():])
107 """Representation of a single worker.
108 Each worker uses a single thread of execution (i.e. a single CPU core)
109 to run tasks sequentially.
110 Worker is an abstract class; instead of using this class directly, use
111 a subclass such as LocalWorker or SGEQsubWorkerArray."""
114 _Communicator.__init__(self)
115 self._state = workerstate.init
118 self.update_contact_time()
120 def _start(self, command, unique_id, output):
121 """Start the worker running on the remote host; override in
123 self._state = workerstate.started
125 def _accept_connection(self, sock):
127 self._state = workerstate.connected
128 self.update_contact_time()
def _set_python_search_path(self, path):
    """Send a _SetPathAction carrying `path` over this worker's connection."""
    action = _SetPathAction(path)
    self._send(action)
def update_contact_time(self):
    """Record the current time (time.time()) as the last contact time."""
    now = time.time()
    self.last_contact_time = now
def get_contact_timed_out(self, timeout):
    """Return True if more than `timeout` seconds have elapsed since the
    time stored in self.last_contact_time."""
    elapsed = time.time() - self.last_contact_time
    return elapsed > timeout
139 def _start_task(self, task, context):
140 if not self._ready_for_task(context) \
141 and not self._ready_for_task(
None):
142 raise TypeError(
"%s not ready for task" % str(self))
143 if self._context != context:
144 self._context = context
145 self._send(_ContextWrapper(context._startup))
146 self._state = workerstate.running_task
148 self._send(_TaskWrapper(task))
150 def _get_finished_task(self):
153 self.update_contact_time()
154 if isinstance(r, _HeartBeat):
155 if not self.get_data_pending():
162 self._state = workerstate.connected
169 self._state = workerstate.dead
def _ready_to_start(self):
    """Return True while this worker is still in the workerstate.init state."""
    current = self._state
    return current == workerstate.init
def _ready_for_task(self, context):
    """Return a true value if this worker can accept a task for `context`.

    That requires the worker to be in the workerstate.connected state and
    its current context to compare equal to `context`.
    """
    if self._state != workerstate.connected:
        return False
    return self._context == context
def _running_task(self, context):
    """Return a true value if this worker is currently running a task
    (state is workerstate.running_task) belonging to `context`."""
    busy = self._state == workerstate.running_task
    return busy and self._context == context
189 class WorkerArray(object):
190 """Representation of an array of workers.
191 This is similar to Worker, except that it represents a collection of
192 workers that are controlled together, such as a batch submission system
193 array job on a compute cluster.
194 WorkerArray is an abstract class; instead of using this class directly, use
195 a subclass such as SGEQsubWorkerArray."""
197 def _get_workers(self):
198 """Return a list of Worker objects contained within this array"""
202 """Do any necessary startup after all contained Workers have started"""
207 class SlaveArray(WorkerArray):
211 class LocalWorker(Worker):
212 """A worker running on the same machine as the manager."""
def _start(self, command, unique_id, output):
    """Start this worker as a background process.

    Runs the base Worker._start first, then passes the command followed by
    the unique ID, together with `output`, to _run_background.
    """
    Worker._start(self, command, unique_id, output)
    launch_line = "%s %s" % (command, unique_id)
    _run_background(launch_line, output)
220 return "<LocalWorker>"
224 class LocalSlave(LocalWorker):
228 class _SGEQsubWorker(Worker):
229 def __init__(self, array):
230 Worker.__init__(self)
def _start(self, command, unique_id, output):
    """Run the base Worker._start, then hand (unique_id, output, self) to
    the owning array's _worker_started."""
    Worker._start(self, command, unique_id, output)
    owner = self._array
    owner._worker_started(unique_id, output, self)
242 return "<SGE qsub worker, ID %s>" % jobid
246 """An array of workers on a Sun Grid Engine system, started with 'qsub'.
247 To use this class, the manager process must be running on a machine that
248 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
249 is termed a 'submit host' by SGE). The class starts an SGE job array
250 (every worker has the same SGE job ID, but a different task ID).
253 standard_options =
'-j y -cwd -r n -o sge-errors'
257 @param numworker The number of workers, which corresponds to the
258 number of tasks in the SGE job.
259 @param options A string of SGE options that are passed on the
260 'qsub' command line. This is added to standard_options.
262 self._numworker = numworker
263 self._options = options
264 self._starting_workers = []
def _get_workers(self):
    """Return a list of Worker objects contained within this array.

    One new _SGEQsubWorker, bound to this array, is created per requested
    worker (self._numworker in total).
    """
    workers = []
    for _ in range(self._numworker):
        workers.append(_SGEQsubWorker(self))
    return workers
271 def _worker_started(self, command, output, worker):
272 self._starting_workers.append((command, output, worker))
274 def _start(self, command):
275 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
276 (self._options, self.standard_options,
277 len(self._starting_workers))
280 (inp, out) = (a.stdin, a.stdout)
281 worker_uid =
" ".join([repr(s[0])
for s
in self._starting_workers])
282 worker_out =
" ".join([repr(s[1])
for s
in self._starting_workers])
283 inp.write(
"#!/bin/sh\n")
284 inp.write(
"uid=( '' %s )\n" % worker_uid)
285 inp.write(
"out=( '' %s )\n" % worker_out)
286 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
287 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
288 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
290 outlines = out.readlines()
292 for line
in outlines:
293 print(line.rstrip(
'\r\n'))
294 a.require_clean_exit()
295 self._set_jobid(outlines)
296 self._starting_workers = []
298 def _set_jobid(self, outlines):
299 """Try to figure out the job ID from the SGE qsub output"""
300 if len(outlines) > 0:
301 m = re.compile(
r"\d+").search(outlines[0])
303 self._jobid = int(m.group())
304 for (num, worker)
in enumerate(self._starting_workers):
305 worker[2]._jobid =
"%d.%d" % (self._jobid, num+1)
313 class _SGEPEWorker(Worker):
315 Worker.__init__(self)
318 def _start(self, command, unique_id, output):
319 Worker._start(self, command, unique_id, output)
320 cmdline =
"qrsh -inherit -V %s %s %s" % (self._host, command,
322 _run_background(cmdline, output)
325 return "<SGE PE worker on %s>" % self._host
329 """An array of workers in a Sun Grid Engine system parallel environment.
330 In order to use this class, the manager must be run via
331 Sun Grid Engine's 'qsub' command and submitted to a parallel
332 environment using the qsub -pe option. This class will start workers
333 on every node in the parallel environment (including the node running
334 the manager). Each worker is started using the 'qrsh' command with
335 the '-inherit' option."""
337 def _get_workers(self):
340 pe = os.environ[
'PE_HOSTFILE']
346 (node, num, queue) = line.split(
None, 2)
347 for i
in range(int(num)):
348 workers.append(_SGEPEWorker(node))
362 class Context(object):
363 """A collection of tasks that run in the same environment.
364 Context objects are typically created by calling Manager::get_context().
368 self._manager = manager
369 self._startup = startup
373 """Add a task to this context.
374 Tasks are any Python callable object that can be pickled (e.g. a
375 function or a class that implements the \_\_call\_\_ method).
376 When the task is run on the worker its arguments are the return
377 value from this context's startup function."""
378 self._tasks.append(task)
381 """Run all of the tasks on available workers, and return results.
382 If there are more tasks than workers, subsequent tasks are
383 started only once a running task completes: each worker only runs
384 a single task at a time. As each task completes, the return value(s)
385 from the task callable are returned from this method, as a
386 Python generator. Note that the results are returned in the order
387 that tasks complete, which may not be the same as the order they
390 @exception NoMoreWorkersError there are no workers available
391 to run the tasks (or they all failed during execution).
392 @exception RemoteError a worker encountered an unhandled exception.
393 @exception NetworkError the manager lost (or was unable to
394 establish) communication with any worker.
396 return self._manager._get_results_unordered(self)
400 """Manages workers and contexts.
403 connect_timeout = 7200
406 heartbeat_timeout = 7200
408 def __init__(self, python=None, host=None, output='worker%d.output'):
410 @param python If not None, the command to run to start a Python
411 interpreter that can import the IMP module. Otherwise,
412 the same interpreter that the manager is currently
413 using is used. This is passed to the shell, so a full
414 command line (including multiple words separated by
415 spaces) can be used if necessary.
416 @param host The hostname that workers use to connect back to the
417 manager. If not specified, the manager machine's
418 primary IP address is used. On multi-homed machines,
419 such as compute cluster headnodes, this may need to be
420 changed to allow all workers to reach the manager
421 (typically the name of the machine's internal network
422 address is needed). If only running local workers,
423 'localhost' can be used to prohibit connections
425 @param output A format string used to name worker output files.
426 It is given the numeric worker id, so for example the default
427 value 'worker\%d.output' will yield output files called
428 worker0.output, worker1.output, etc.
431 self._python = sys.executable
433 self._python = python
435 self._output = output
436 self._all_workers = []
437 self._starting_workers = {}
438 self._worker_arrays = []
443 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
444 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
447 """Add a Worker object."""
448 if hasattr(worker,
'_get_workers'):
449 self._worker_arrays.append(worker)
451 self._all_workers.append(worker)
454 """Create and return a new Context in which tasks can be run.
455 @param startup If not None, a callable (Python function or class
456 that implements the \_\_call\_\_ method) that sets up
457 the worker to run tasks. This method is only called
458 once per worker. The return values from this method
459 are passed to the task object when it runs on
461 @return A new Context object.
465 def _get_results_unordered(self, context):
466 """Run all of a context's tasks, and yield results"""
467 self._send_tasks_to_workers(context)
470 for task
in self._get_finished_tasks(context):
471 tasks_queued = len(context._tasks)
475 if len(context._tasks) > tasks_queued:
476 self._send_tasks_to_workers(context)
477 except _NoMoreTasksError:
480 def _start_all_workers(self):
481 for array
in self._worker_arrays:
482 self._all_workers.extend(array._get_workers())
485 (
"%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
486 " %s %d") % (self._python, self._host, self._listen_sock.port)
488 for (num, worker)
in enumerate(self._all_workers):
489 if worker._ready_to_start():
490 unique_id = self._get_unique_id(num)
491 self._starting_workers[unique_id] = worker
492 worker._start(command, unique_id, self._output % num)
494 for array
in self._worker_arrays:
495 array._start(command)
496 self._worker_arrays = []
498 def _get_unique_id(self, num):
500 for i
in range(0, 8):
501 id += chr(random.randint(0, 25) + ord(
'A'))
504 def _send_tasks_to_workers(self, context):
505 self._start_all_workers()
507 available_workers = \
508 [a
for a
in self._all_workers
if a._ready_for_task(context)] + \
509 [a
for a
in self._all_workers
if a._ready_for_task(
None)]
510 for worker
in available_workers:
511 if len(context._tasks) == 0:
514 self._send_task_to_worker(worker, context)
516 def _send_task_to_worker(self, worker, context):
517 if len(context._tasks) == 0:
519 t = context._tasks[0]
521 worker._start_task(t, context)
522 context._tasks.pop(0)
526 def _get_finished_tasks(self, context):
528 events = self._get_network_events(context)
530 self._kill_all_running_workers(context)
532 task = self._process_event(event, context)
536 def _process_event(self, event, context):
537 if event == self._listen_sock:
539 (conn, addr) = self._listen_sock.accept()
540 self._accept_worker(conn, context)
541 elif event._running_task(context):
543 task = event._get_finished_task()
545 self._send_task_to_worker(event, context)
548 self._kill_timed_out_workers(context)
549 except NetworkError
as detail:
551 print(
"Worker %s failed (%s): rescheduling task %s"
552 % (str(event), str(detail), str(task)))
553 context._tasks.append(task)
554 self._send_tasks_to_workers(context)
558 def _kill_timed_out_workers(self, context):
559 timed_out = [a
for a
in self._all_workers
if a._running_task(context)
560 and a.get_contact_timed_out(self.heartbeat_timeout)]
561 for worker
in timed_out:
562 task = worker._kill()
563 print(
"Did not hear from worker %s in %d seconds; rescheduling "
564 "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
565 context._tasks.append(task)
566 if len(timed_out) > 0:
567 self._send_tasks_to_workers(context)
def _kill_all_running_workers(self, context):
    """Kill every worker running a task for `context`, then raise.

    The task returned by each worker's _kill() is put back on
    context._tasks before the exception is raised.

    @exception NetworkError always raised once the workers are killed.
    """
    active = [w for w in self._all_workers if w._running_task(context)]
    for active_worker in active:
        context._tasks.append(active_worker._kill())
    raise NetworkError("Did not hear from any running worker in "
                       "%d seconds" % self.heartbeat_timeout)
577 def _accept_worker(self, sock, context):
578 sock.setblocking(
True)
579 identifier = sock.recv(1024)
581 identifier = identifier.decode(
'ascii')
582 if identifier
and identifier
in self._starting_workers:
583 worker = self._starting_workers.pop(identifier)
584 worker._accept_connection(sock)
585 print(
"Identified worker %s " % str(worker))
586 self._init_worker(worker)
587 self._send_task_to_worker(worker, context)
590 print(
"Ignoring request from unknown worker")
def _init_worker(self, worker):
    """Send the manager's leading search-path entries to `worker`.

    The first entry of the import-time path snapshot is sent if it is not
    the empty string. The current sys.path[0] is then also sent if it is
    non-empty and differs from that import-time entry.
    """
    original_head = _import_time_path[0]
    if original_head != '':
        worker._set_python_search_path(original_head)
    current_head = sys.path[0]
    if current_head != '' and current_head != original_head:
        worker._set_python_search_path(current_head)
598 def _get_network_events(self, context):
599 running = [a
for a
in self._all_workers
if a._running_task(context)]
600 if len(running) == 0:
601 if len(context._tasks) == 0:
602 raise _NoMoreTasksError()
603 elif len(self._starting_workers) == 0:
608 return IMP.parallel.util._poll_events(
609 self._listen_sock, running, self.heartbeat_timeout)
612 __version__ =
"2.16.0"
615 '''Return the version of this module, as a string'''
619 '''Return the fully-qualified name of this module'''
620 return "IMP::parallel"
623 '''Return the full path to one of this module's data files'''
625 return IMP._get_module_data_path(
"parallel", fname)
628 '''Return the full path to one of this module's example files'''
630 return IMP._get_module_example_path(
"parallel", fname)
Representation of a single worker.
Error raised if a worker has an unhandled exception.
An array of workers in a Sun Grid Engine system parallel environment.
def get_module_name
Return the fully-qualified name of this module.
Representation of an array of workers.
A collection of tasks that run in the same environment.
A worker running on the same machine as the manager.
Error raised if all workers failed, so tasks cannot be run.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
def add_worker
Add a Worker object.
def deprecated_object
Python decorator to mark a class as deprecated.
An array of workers on a Sun Grid Engine system, started with 'qsub'.
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
def get_module_version
Return the version of this module, as a string.
def get_example_path
Return the full path to one of this module's example files.
def get_context
Create and return a new Context in which tasks can be run.
def get_data_path
Return the full path to one of this module's data files.
Error raised if a problem occurs with the network.
Distribute IMP tasks to multiple processors or machines.
def get_results_unordered
Run all of the tasks on available workers, and return results.
Manages workers and contexts.