10 import cPickle
as pickle
22 _import_time_path = sys.path[:]
25 """Base class for all errors specific to the parallel module"""
29 class _NoMoreTasksError(
Error):
34 """Error raised if all workers failed, so tasks cannot be run"""
class NetworkError(Error):
    """Raised when a problem occurs while communicating over the network.

    Derives from Error, so handlers written for the parallel module's
    base error class also catch it."""
49 """Error raised if a worker has an unhandled exception"""
50 def __init__(self, exc, traceback, worker):
52 self.traceback = traceback
56 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
57 return "%s: %s from %s\nRemote traceback:\n%s" \
58 % (errstr, str(self.exc), str(self.worker), self.traceback)
60 class _Communicator(object):
61 """Simple support for sending Python pickled objects over the network"""
70 p.pack_string(pickle.dumps(obj, -1))
74 p.pack_string(pickle.dumps(obj, 0))
75 self._socket.sendall(p.get_buffer())
def get_data_pending(self):
    """Report whether received data is still waiting in the input buffer.

    @return True if the input buffer is non-empty, False otherwise."""
    buffered = len(self._ibuffer)
    return buffered != 0
83 obj, self._ibuffer = self._unpickle(self._ibuffer)
84 if isinstance(obj, _ErrorWrapper):
88 except (IndexError, EOFError):
90 data = self._socket.recv(4096)
91 except socket.error
as detail:
93 % (str(self), str(detail)))
99 def _unpickle(self, ibuffer):
100 p = xdrlib.Unpacker(ibuffer)
101 obj = p.unpack_string()
102 return (pickle.loads(obj), ibuffer[p.get_position():])
106 """Representation of a single worker.
107 Each worker uses a single thread of execution (i.e. a single CPU core)
108 to run tasks sequentially.
109 Worker is an abstract class; instead of using this class directly, use
110 a subclass such as LocalWorker or SGEQsubWorkerArray."""
113 _Communicator.__init__(self)
114 self._state = workerstate.init
117 self.update_contact_time()
119 def _start(self, command, unique_id, output):
120 """Start the worker running on the remote host; override in
122 self._state = workerstate.started
124 def _accept_connection(self, sock):
126 self._state = workerstate.connected
127 self.update_contact_time()
def _set_python_search_path(self, path):
    """Send the worker a request concerning its Python search path.

    @param path The path, wrapped in a _SetPathAction and passed
                to _send()."""
    action = _SetPathAction(path)
    self._send(action)
def update_contact_time(self):
    """Record the current time as the moment of last contact."""
    now = time.time()
    self.last_contact_time = now
def get_contact_timed_out(self, timeout):
    """Tell whether too long has passed since the last recorded contact.

    @param timeout The allowed gap, in seconds.
    @return True if more than timeout seconds have elapsed since
            last_contact_time was set."""
    elapsed = time.time() - self.last_contact_time
    return elapsed > timeout
138 def _start_task(self, task, context):
139 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
140 raise TypeError(
"%s not ready for task" % str(self))
141 if self._context != context:
142 self._context = context
143 self._send(_ContextWrapper(context._startup))
144 self._state = workerstate.running_task
146 self._send(_TaskWrapper(task))
148 def _get_finished_task(self):
151 self.update_contact_time()
152 if isinstance(r, _HeartBeat):
153 if not self.get_data_pending():
160 self._state = workerstate.connected
167 self._state = workerstate.dead
def _ready_to_start(self):
    """Tell whether this worker is still in its initial state.

    @return True only if the worker's state is workerstate.init."""
    current = self._state
    return current == workerstate.init
def _ready_for_task(self, context):
    """Tell whether this worker can accept a task from the given context.

    @param context The context to compare with the worker's own.
    @return True only if the worker is in the connected state and its
            current context equals the given one."""
    if self._state != workerstate.connected:
        return False
    return self._context == context
def _running_task(self, context):
    """Tell whether this worker is running a task from the given context.

    @param context The context to compare with the worker's own.
    @return True only if the worker is in the running_task state and its
            current context equals the given one."""
    if self._state != workerstate.running_task:
        return False
    return self._context == context
187 class WorkerArray(object):
188 """Representation of an array of workers.
189 This is similar to Worker, except that it represents a collection of
190 workers that are controlled together, such as a batch submission system
191 array job on a compute cluster.
192 WorkerArray is an abstract class; instead of using this class directly, use
193 a subclass such as SGEQsubWorkerArray."""
195 def _get_workers(self):
196 """Return a list of Worker objects contained within this array"""
200 """Do any necessary startup after all contained Workers have started"""
204 class SlaveArray(WorkerArray):
208 class LocalWorker(Worker):
209 """A worker running on the same machine as the manager."""
def _start(self, command, unique_id, output):
    """Start this worker through _run_background().

    @param command The command that starts a worker.
    @param unique_id Identifier appended to the command, separated by
                     a space.
    @param output Passed to _run_background() with the command line."""
    Worker._start(self, command, unique_id, output)
    _run_background("%s %s" % (command, unique_id), output)
217 return "<LocalWorker>"
221 class LocalSlave(LocalWorker):
225 class _SGEQsubWorker(Worker):
226 def __init__(self, array):
227 Worker.__init__(self)
def _start(self, command, unique_id, output):
    """Run the base-class start, then register with the owning array.

    Nothing is launched here; the worker's unique ID and output are
    handed to the array's _worker_started() together with the worker.

    @param command The command that starts a worker (used only by the
                   base-class call).
    @param unique_id Identifier for this worker.
    @param output Output setting for this worker."""
    Worker._start(self, command, unique_id, output)
    owner = self._array
    owner._worker_started(unique_id, output, self)
239 return "<SGE qsub worker, ID %s>" % jobid
243 """An array of workers on a Sun Grid Engine system, started with 'qsub'.
244 To use this class, the manager process must be running on a machine that
245 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
246 is termed a 'submit host' by SGE). The class starts an SGE job array
247 (every worker has the same SGE job ID, but a different task ID).
251 standard_options =
'-j y -cwd -r n -o sge-errors'
255 @param numworker The number of workers, which corresponds to the
256 number of tasks in the SGE job.
257 @param options A string of SGE options that are passed on the 'qsub'
258 command line. This is added to standard_options.
260 self._numworker = numworker
261 self._options = options
262 self._starting_workers = []
def _get_workers(self):
    """Create the workers that make up this array.

    @return A list of self._numworker new _SGEQsubWorker objects, each
            constructed with this array as its argument."""
    workers = []
    for _ in range(self._numworker):
        workers.append(_SGEQsubWorker(self))
    return workers
269 def _worker_started(self, command, output, worker):
270 self._starting_workers.append((command, output, worker))
272 def _start(self, command):
273 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
274 (self._options, self.standard_options,
275 len(self._starting_workers))
278 (inp, out) = (a.stdin, a.stdout)
279 worker_uid =
" ".join([repr(s[0])
for s
in self._starting_workers])
280 worker_out =
" ".join([repr(s[1])
for s
in self._starting_workers])
281 inp.write(
"#!/bin/sh\n")
282 inp.write(
"uid=( '' %s )\n" % worker_uid)
283 inp.write(
"out=( '' %s )\n" % worker_out)
284 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
285 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
286 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
288 outlines = out.readlines()
290 for line
in outlines:
291 print(line.rstrip(
'\r\n'))
292 a.require_clean_exit()
293 self._set_jobid(outlines)
294 self._starting_workers = []
296 def _set_jobid(self, outlines):
297 """Try to figure out the job ID from the SGE qsub output"""
298 if len(outlines) > 0:
299 m = re.compile(
r"\d+").search(outlines[0])
301 self._jobid = int(m.group())
302 for (num, worker)
in enumerate(self._starting_workers):
303 worker[2]._jobid =
"%d.%d" % (self._jobid, num+1)
311 class _SGEPEWorker(Worker):
313 Worker.__init__(self)
def _start(self, command, unique_id, output):
    """Start this worker on its host with 'qrsh -inherit'.

    @param command The command that starts a worker.
    @param unique_id Identifier appended after the command.
    @param output Passed to _run_background() with the command line."""
    Worker._start(self, command, unique_id, output)
    qrsh_args = (self._host, command, unique_id)
    _run_background("qrsh -inherit -V %s %s %s" % qrsh_args, output)
322 return "<SGE PE worker on %s>" % self._host
326 """An array of workers in a Sun Grid Engine system parallel environment.
327 In order to use this class, the manager must be run via Sun Grid Engine's
328 'qsub' command and submitted to a parallel environment using the qsub
329 -pe option. This class will start workers on every node in the parallel
330 environment (including the node running the manager). Each worker is
331 started using the 'qrsh' command with the '-inherit' option."""
333 def _get_workers(self):
336 pe = os.environ[
'PE_HOSTFILE']
342 (node, num, queue) = line.split(
None, 2)
343 for i
in range(int(num)):
344 workers.append(_SGEPEWorker(node))
358 class Context(object):
359 """A collection of tasks that run in the same environment.
360 Context objects are typically created by calling Manager::get_context().
364 self._manager = manager
365 self._startup = startup
369 """Add a task to this context.
370 Tasks are any Python callable object that can be pickled (e.g. a
371 function or a class that implements the \_\_call\_\_ method). When
372 the task is run on the worker its arguments are the return value
373 from this context's startup function."""
374 self._tasks.append(task)
377 """Run all of the tasks on available workers, and return results.
378 If there are more tasks than workers, subsequent tasks are
379 started only once a running task completes: each worker only runs
380 a single task at a time. As each task completes, the return value(s)
381 from the task callable are returned from this method, as a
382 Python generator. Note that the results are returned in the order
383 that tasks complete, which may not be the same as the order they
386 @exception NoMoreWorkersError there are no workers available
387 to run the tasks (or they all failed during execution).
388 @exception RemoteError a worker encountered an unhandled exception.
389 @exception NetworkError the manager lost (or was unable to
390 establish) communication with any worker.
392 return self._manager._get_results_unordered(self)
396 """Manages workers and contexts.
399 connect_timeout = 7200
402 heartbeat_timeout = 7200
404 def __init__(self, python=None, host=None, output='worker%d.output'):
406 @param python If not None, the command to run to start a Python
407 interpreter that can import the IMP module. Otherwise,
408 the same interpreter that the manager is currently
409 using is used. This is passed to the shell, so a full
410 command line (including multiple words separated by
411 spaces) can be used if necessary.
412 @param host The hostname that workers use to connect back to the
413 manager. If not specified, the manager machine's
414 primary IP address is used. On multi-homed machines,
415 such as compute cluster headnodes, this may need to be
416 changed to allow all workers to reach the manager
417 (typically the name of the machine's internal network
418 address is needed). If only running local workers,
419 'localhost' can be used to prohibit connections
421 @param output A format string used to name worker output files. It is
422 given the numeric worker id, so for example the default
423 value 'worker\%d.output' will yield output files called
424 worker0.output, worker1.output, etc.
427 self._python = sys.executable
429 self._python = python
431 self._output = output
432 self._all_workers = []
433 self._starting_workers = {}
434 self._worker_arrays = []
439 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
440 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
443 """Add a Worker object."""
444 if hasattr(worker,
'_get_workers'):
445 self._worker_arrays.append(worker)
447 self._all_workers.append(worker)
450 """Create and return a new Context in which tasks can be run.
451 @param startup If not None, a callable (Python function or class
452 that implements the \_\_call\_\_ method) that sets up
453 the worker to run tasks. This method is only called
454 once per worker. The return values from this method
455 are passed to the task object when it runs on
457 @return A new Context object.
461 def _get_results_unordered(self, context):
462 """Run all of a context's tasks, and yield results"""
463 self._send_tasks_to_workers(context)
466 for task
in self._get_finished_tasks(context):
467 tasks_queued = len(context._tasks)
471 if len(context._tasks) > tasks_queued:
472 self._send_tasks_to_workers(context)
473 except _NoMoreTasksError:
476 def _start_all_workers(self):
477 for array
in self._worker_arrays:
478 self._all_workers.extend(array._get_workers())
480 command = (
"%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
481 " %s %d") % (self._python, self._host,
482 self._listen_sock.port)
484 for (num, worker)
in enumerate(self._all_workers):
485 if worker._ready_to_start():
486 unique_id = self._get_unique_id(num)
487 self._starting_workers[unique_id] = worker
488 worker._start(command, unique_id, self._output % num)
490 for array
in self._worker_arrays:
491 array._start(command)
492 self._worker_arrays = []
494 def _get_unique_id(self, num):
496 for i
in range(0, 8):
497 id += chr(random.randint(0, 25) + ord(
'A'))
500 def _send_tasks_to_workers(self, context):
501 self._start_all_workers()
503 available_workers = [a
for a
in self._all_workers
504 if a._ready_for_task(context)] + \
505 [a
for a
in self._all_workers
506 if a._ready_for_task(
None)]
507 for worker
in available_workers:
508 if len(context._tasks) == 0:
511 self._send_task_to_worker(worker, context)
513 def _send_task_to_worker(self, worker, context):
514 if len(context._tasks) == 0:
516 t = context._tasks[0]
518 worker._start_task(t, context)
519 context._tasks.pop(0)
520 except socket.error
as detail:
523 def _get_finished_tasks(self, context):
525 events = self._get_network_events(context)
527 self._kill_all_running_workers(context)
529 task = self._process_event(event, context)
533 def _process_event(self, event, context):
534 if event == self._listen_sock:
536 (conn, addr) = self._listen_sock.accept()
537 new_worker = self._accept_worker(conn, context)
538 elif event._running_task(context):
540 task = event._get_finished_task()
542 self._send_task_to_worker(event, context)
545 self._kill_timed_out_workers(context)
546 except NetworkError
as detail:
548 print(
"Worker %s failed (%s): rescheduling task %s" \
549 % (str(event), str(detail), str(task)))
550 context._tasks.append(task)
551 self._send_tasks_to_workers(context)
555 def _kill_timed_out_workers(self, context):
556 timed_out = [a
for a
in self._all_workers
if a._running_task(context) \
557 and a.get_contact_timed_out(self.heartbeat_timeout)]
558 for worker
in timed_out:
559 task = worker._kill()
560 print(
"Did not hear from worker %s in %d seconds; rescheduling "
561 "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
562 context._tasks.append(task)
563 if len(timed_out) > 0:
564 self._send_tasks_to_workers(context)
def _kill_all_running_workers(self, context):
    """Kill every worker running a task from this context, then fail.

    The task returned by each killed worker is appended to the
    context's task list before the error is raised.

    @param context The context whose running workers are killed.
    @exception NetworkError always raised once the workers are killed."""
    still_running = []
    for candidate in self._all_workers:
        if candidate._running_task(context):
            still_running.append(candidate)
    for busy in still_running:
        context._tasks.append(busy._kill())
    message = ("Did not hear from any running worker in "
               "%d seconds" % self.heartbeat_timeout)
    raise NetworkError(message)
574 def _accept_worker(self, sock, context):
575 sock.setblocking(
True)
576 identifier = sock.recv(1024)
578 identifier = identifier.decode(
'ascii')
579 if identifier
and identifier
in self._starting_workers:
580 worker = self._starting_workers.pop(identifier)
581 worker._accept_connection(sock)
582 print(
"Identified worker %s " % str(worker))
583 self._init_worker(worker)
584 self._send_task_to_worker(worker, context)
587 print(
"Ignoring request from unknown worker")
def _init_worker(self, worker):
    """Send search-path entries to a worker.

    The first entry of sys.path as copied when this module was imported
    is sent if it is not empty. The current first entry of sys.path is
    then sent as well, if it is not empty and differs from the first.

    @param worker The worker to receive the path(s)."""
    startup_entry = _import_time_path[0]
    if startup_entry != '':
        worker._set_python_search_path(startup_entry)
    current_entry = sys.path[0]
    if current_entry != '' and current_entry != startup_entry:
        worker._set_python_search_path(current_entry)
595 def _get_network_events(self, context):
596 running = [a
for a
in self._all_workers
if a._running_task(context)]
597 if len(running) == 0:
598 if len(context._tasks) == 0:
599 raise _NoMoreTasksError()
600 elif len(self._starting_workers) == 0:
604 return util._poll_events(self._listen_sock, running,
605 self.heartbeat_timeout)
608 __version__ =
"2.14.0"
611 '''Return the version of this module, as a string'''
615 '''Return the fully-qualified name of this module'''
616 return "IMP::parallel"
619 '''Return the full path to one of this module's data files'''
621 return IMP._get_module_data_path(
"parallel", fname)
624 '''Return the full path to one of this module's example files'''
626 return IMP._get_module_example_path(
"parallel", fname)
Representation of a single worker.
Error raised if a worker has an unhandled exception.
An array of workers in a Sun Grid Engine system parallel environment.
def get_module_name
Return the fully-qualified name of this module.
Representation of an array of workers.
A collection of tasks that run in the same environment.
A worker running on the same machine as the manager.
Error raised if all workers failed, so tasks cannot be run.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
def add_worker
Add a Worker object.
def deprecated_object
Python decorator to mark a class as deprecated.
An array of workers on a Sun Grid Engine system, started with 'qsub'.
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
def get_module_version
Return the version of this module, as a string.
def get_example_path
Return the full path to one of this module's example files.
def get_context
Create and return a new Context in which tasks can be run.
def get_data_path
Return the full path to one of this module's data files.
Error raised if a problem occurs with the network.
Distribute IMP tasks to multiple processors or machines.
def get_results_unordered
Run all of the tasks on available workers, and return results.
Manages workers and contexts.