"""Distribute IMP tasks to multiple processors or machines."""

import socket
import time
import sys
import os
import re
import random
import struct
try:
    import cPickle as pickle
except ImportError:
    import pickle
import IMP
from IMP.parallel import workerstate
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

# Save the Python search path at import time, so that workers can import
# modules using the same path that worked for the manager
_import_time_path = sys.path[:]


class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass


class _NoMoreTasksError(Error):
    pass


class NoMoreWorkersError(Error):
    """Error raised if all workers failed, so tasks cannot be run"""
    pass


class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass


class RemoteError(Error):
    """Error raised if a worker has an unhandled exception"""
    def __init__(self, exc, traceback, worker):
        self.exc = exc
        self.traceback = traceback
        self.worker = worker

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.worker), self.traceback)


class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    # Number of bytes used to encode each message's length
    _slen_size = struct.calcsize('!i')

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        s = pickle.dumps(obj, -1)
        # Send the length of the pickled object (as a network-order 32-bit
        # integer) followed by the object itself
        self._socket.sendall(struct.pack('!i', len(s)) + s)

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        if len(ibuffer) < self._slen_size:
            raise IndexError()
        slen, = struct.unpack_from('!i', ibuffer)
        if self._slen_size + slen > len(ibuffer):
            raise IndexError()
        return (pickle.loads(ibuffer[self._slen_size:self._slen_size + slen]),
                ibuffer[self._slen_size + slen:])
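
# A minimal sketch (not part of this module's API) of the framing that
# _Communicator uses on the wire: a 4-byte network-order length prefix,
# then the pickle payload. 'payload', 'frame' and 'slen' are illustrative
# names only.
#
#   payload = pickle.dumps(obj, -1)
#   frame = struct.pack('!i', len(payload)) + payload
#   slen, = struct.unpack_from('!i', frame)    # read the length prefix
#   obj = pickle.loads(frame[4:4 + slen])      # recover the object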


class Worker(_Communicator):
    """Representation of a single worker.
       Each worker uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Worker is an abstract class; instead of using this class directly,
       use a subclass such as LocalWorker, or a WorkerArray subclass such
       as SGEQsubWorkerArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = workerstate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the worker running on the remote host; override in
           subclasses"""
        self._state = workerstate.started

    def _accept_connection(self, sock):
        self._socket = sock
        self._state = workerstate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        if not self._ready_for_task(context) \
                and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = workerstate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = workerstate.connected
        return task

    def _kill(self):
        task = self._task
        self._task = None
        self._context = None
        self._state = workerstate.dead
        return task

    def _ready_to_start(self):
        return self._state == workerstate.init

    def _ready_for_task(self, context):
        return self._state == workerstate.connected \
            and self._context == context

    def _running_task(self, context):
        return self._state == workerstate.running_task \
            and self._context == context


class WorkerArray(object):
    """Representation of an array of workers.
       This is similar to Worker, except that it represents a collection of
       workers that are controlled together, such as a batch submission
       system array job on a compute cluster.
       WorkerArray is an abstract class; instead of using this class
       directly, use a subclass such as SGEQsubWorkerArray."""

    def _get_workers(self):
        """Return a list of Worker objects contained within this array"""
        pass

    def _start(self, command):
        """Do any necessary startup after all contained Workers have
           started"""
        pass


class LocalWorker(Worker):
    """A worker running on the same machine as the manager."""

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalWorker>"
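
# Example (a minimal sketch; the Manager class is defined later in this
# module): run tasks on two cores of the local machine.
#
#   m = Manager()
#   m.add_worker(LocalWorker())
#   m.add_worker(LocalWorker())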


class _SGEQsubWorker(Worker):
    def __init__(self, array):
        Worker.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        self._array._worker_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub worker, ID %s>" % jobid


class SGEQsubWorkerArray(WorkerArray):
    """An array of workers on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the manager process must be running on a machine
       that can submit Sun Grid Engine (SGE) jobs using the 'qsub' command
       (this is termed a 'submit host' by SGE). The class starts an SGE job
       array (every worker has the same SGE job ID, but a different task
       ID)."""

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numworker, options):
        """Constructor.
           @param numworker The number of workers, which corresponds to the
                            number of tasks in the SGE job.
           @param options A string of SGE options that are passed on the
                          'qsub' command line. This is added to
                          standard_options.
        """
        self._numworker = numworker
        self._options = options
        self._starting_workers = []
        self._jobid = None

    def _get_workers(self):
        """Return a list of Worker objects contained within this array"""
        return [_SGEQsubWorker(self) for x in range(self._numworker)]

    def _worker_started(self, command, output, worker):
        self._starting_workers.append((command, output, worker))

    def _start(self, command):
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_workers))

        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        worker_uid = " ".join([repr(s[0]) for s in self._starting_workers])
        worker_out = " ".join([repr(s[1]) for s in self._starting_workers])
        # Generate a job script that maps each SGE task ID to the unique ID
        # and output file of the corresponding worker
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % worker_uid)
        inp.write("out=( '' %s )\n" % worker_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_workers = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, worker) in enumerate(self._starting_workers):
                    worker[2]._jobid = "%d.%d" % (self._jobid, num + 1)


class _SGEPEWorker(Worker):
    def __init__(self, host):
        Worker.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE worker on %s>" % self._host


class SGEPEWorkerArray(WorkerArray):
    """An array of workers in a Sun Grid Engine system parallel environment.
       In order to use this class, the manager must be run via
       Sun Grid Engine's 'qsub' command and submitted to a parallel
       environment using the qsub -pe option. This class will start workers
       on every node in the parallel environment (including the node running
       the manager). Each worker is started using the 'qrsh' command with
       the '-inherit' option."""

    def _get_workers(self):
        workers = []

        pe = os.environ['PE_HOSTFILE']
        fh = open(pe, "r")
        while True:
            line = fh.readline()
            if line == '':
                break
            (node, num, queue) = line.split(None, 2)
            for i in range(int(num)):
                workers.append(_SGEPEWorker(node))
        # The first worker runs on this node, which SGE will not let us
        # start with qrsh (since we already occupy its slot), so replace
        # it with a LocalWorker
        if len(workers) > 0:
            workers[0] = LocalWorker()
        return workers
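
# Example (a minimal sketch; the parallel environment name 'mpi' and the
# slot count are site-specific): submit the manager script itself with
# 'qsub -pe mpi 8 manager_script.py', then inside that script:
#
#   m = Manager()
#   m.add_worker(SGEPEWorkerArray())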


class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling
       Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method).
           When the task is run on the worker its arguments are the return
           value from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available workers, and return results.
           If there are more tasks than workers, subsequent tasks are
           started only once a running task completes: each worker only runs
           a single task at a time. As each task completes, the return
           value(s) from the task callable are returned from this method, as
           a Python generator. Note that the results are returned in the
           order that tasks complete, which may not be the same as the order
           in which they were submitted.

           @exception NoMoreWorkersError there are no workers available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a worker encountered an unhandled
                      exception.
           @exception NetworkError the manager lost (or was unable to
                      establish) communication with any worker.
        """
        return self._manager._get_results_unordered(self)
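
# Example (a minimal sketch, continuing the sketches above; 'my_task'
# stands for any picklable callable you have defined, and catching
# RemoteError is optional):
#
#   c = m.get_context()
#   for i in range(10):
#       c.add_task(my_task)
#   try:
#       for result in c.get_results_unordered():
#           print(result)
#   except RemoteError as err:
#       print("A task failed on a worker: %s" % err)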


class Manager(object):
    """Manages workers and contexts."""

    connect_timeout = 7200

    heartbeat_timeout = 7200
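
    # Both timeouts above are in seconds and are class attributes, so they
    # can also be overridden per instance, e.g. (a sketch):
    #
    #   m = Manager()
    #   m.heartbeat_timeout = 600  # give up on unresponsive workers sooner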

    def __init__(self, python=None, host=None, output='worker%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                         interpreter that can import the IMP module.
                         Otherwise, the same interpreter that the manager is
                         currently using is used. This is passed to the
                         shell, so a full command line (including multiple
                         words separated by spaces) can be used if
                         necessary.
           @param host The hostname that workers use to connect back to the
                       manager. If not specified, the manager machine's
                       primary IP address is used. On multi-homed machines,
                       such as compute cluster headnodes, this may need to
                       be changed to allow all workers to reach the manager
                       (typically the name of the machine's internal network
                       address is needed). If only running local workers,
                       'localhost' can be used to prohibit connections
                       across the network.
           @param output A format string used to name worker output files.
                         It is given the numeric worker id, so for example
                         the default value 'worker\%d.output' will yield
                         output files called worker0.output, worker1.output,
                         etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_workers = []
        self._starting_workers = {}
        self._worker_arrays = []
        if host:
            self._host = host
        else:
            # Get the primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_worker(self, worker):
        """Add a Worker object."""
        if hasattr(worker, '_get_workers'):
            self._worker_arrays.append(worker)
        else:
            self._all_workers.append(worker)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets
                          up the worker to run tasks. This method is only
                          called once per worker. The return values from
                          this method are passed to the task object when it
                          runs on the worker.
           @return A new Context object.
        """
        return Context(self, startup)
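
    # Example (a minimal sketch; 'setup_model' and 'my_task' are
    # hypothetical names): each worker calls setup_model() exactly once,
    # and its return value becomes the argument of every task it runs.
    #
    #   def setup_model():
    #       return IMP.Model()   # built once per worker
    #
    #   c = m.get_context(startup=setup_model)
    #   c.add_task(my_task)      # the worker calls my_task(model)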

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_workers(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while handling this
                    # result, send them to the workers too
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_workers(context)
        except _NoMoreTasksError:
            return

    def _start_all_workers(self):
        for array in self._worker_arrays:
            self._all_workers.extend(array._get_workers())

        command = \
            ("%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
             " %s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, worker) in enumerate(self._all_workers):
            if worker._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_workers[unique_id] = worker
                worker._start(command, unique_id, self._output % num)

        for array in self._worker_arrays:
            array._start(command)
        self._worker_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id
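
    # For example (illustrative value only), worker number 3 might get the
    # unique ID "3:KQZXWABJ": the worker's index, a colon, and eight
    # random uppercase letters.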

    def _send_tasks_to_workers(self, context):
        self._start_all_workers()
        # Prefer workers that already have this context set up
        available_workers = \
            [a for a in self._all_workers if a._ready_for_task(context)] + \
            [a for a in self._all_workers if a._ready_for_task(None)]
        for worker in available_workers:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_worker(worker, context)

    def _send_task_to_worker(self, worker, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            worker._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            worker._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_workers(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new worker has just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_worker(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_worker(event, context)
                    return task
                else:   # the worker sent back a heartbeat
                    self._kill_timed_out_workers(context)
            except NetworkError as detail:
                task = event._kill()
                print("Worker %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_workers(context)

    def _kill_timed_out_workers(self, context):
        timed_out = [a for a in self._all_workers
                     if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for worker in timed_out:
            task = worker._kill()
            print("Did not hear from worker %s in %d seconds; rescheduling "
                  "task %s" % (str(worker), self.heartbeat_timeout,
                               str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_workers(context)

    def _kill_all_running_workers(self, context):
        running = [a for a in self._all_workers if a._running_task(context)]
        for worker in running:
            task = worker._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running worker in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_worker(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_workers:
            worker = self._starting_workers.pop(identifier)
            worker._accept_connection(sock)
            print("Identified worker %s " % str(worker))
            self._init_worker(worker)
            self._send_task_to_worker(worker, context)
        else:
            print("Ignoring request from unknown worker")
            sock.close()

    def _init_worker(self, worker):
        if _import_time_path[0] != '':
            worker._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            worker._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_workers if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_workers) == 0:
                raise NoMoreWorkersError("Ran out of workers to run tasks")
            # Otherwise, wait for one of the starting workers to connect
            # back to us
        return IMP.parallel.util._poll_events(
            self._listen_sock, running, self.heartbeat_timeout)


__version__ = "2.20.0"


def get_module_version():
    '''Return the version of this module, as a string'''
    return __version__


def get_module_name():
    '''Return the fully-qualified name of this module'''
    return "IMP::parallel"


def get_data_path(fname):
    '''Return the full path to one of this module's data files'''
    return IMP._get_module_data_path("parallel", fname)


def get_example_path(fname):
    '''Return the full path to one of this module's example files'''
    return IMP._get_module_example_path("parallel", fname)