18 _import_time_path = sys.path[:]
22 """Base class for all errors specific to the parallel module"""
26 class _NoMoreTasksError(
Error):
31 """Error raised if all workers failed, so tasks cannot be run"""
36 """Error raised if a problem occurs with the network"""
41 """Error raised if a worker has an unhandled exception"""
42 def __init__(self, exc, traceback, worker):
44 self.traceback = traceback
48 errstr = str(self.exc.__class__).replace(
"exceptions.",
"")
49 return "%s: %s from %s\nRemote traceback:\n%s" \
50 % (errstr, str(self.exc), str(self.worker), self.traceback)
54 """Simple support for sending Python pickled objects over the network"""
57 _slen_size = struct.calcsize(
'!i')
64 s = pickle.dumps(obj, -1)
65 self._socket.sendall(struct.pack(
'!i', len(s)) + s)
67 def get_data_pending(self):
68 return len(self._ibuffer) > 0
73 obj, self._ibuffer = self._unpickle(self._ibuffer)
74 if isinstance(obj, _ErrorWrapper):
78 except (IndexError, EOFError):
80 data = self._socket.recv(4096)
81 except socket.error
as detail:
83 % (str(self), str(detail)))
89 def _unpickle(self, ibuffer):
90 if len(ibuffer) < self._slen_size:
92 slen, = struct.unpack_from(
'!i', ibuffer)
93 if self._slen_size + slen > len(ibuffer):
95 return (pickle.loads(ibuffer[self._slen_size: self._slen_size + slen]),
96 ibuffer[self._slen_size + slen:])
100 """Representation of a single worker.
101 Each worker uses a single thread of execution (i.e. a single CPU core)
102 to run tasks sequentially.
103 Worker is an abstract class; instead of using this class directly, use
104 a subclass such as LocalWorker or SGEQsubWorkerArray."""
107 _Communicator.__init__(self)
108 self._state = workerstate.init
111 self.update_contact_time()
113 def _start(self, command, unique_id, output):
114 """Start the worker running on the remote host; override in
116 self._state = workerstate.started
118 def _accept_connection(self, sock):
120 self._state = workerstate.connected
121 self.update_contact_time()
123 def _set_python_search_path(self, path):
124 self._send(_SetPathAction(path))
126 def update_contact_time(self):
127 self.last_contact_time = time.time()
129 def get_contact_timed_out(self, timeout):
130 return (time.time() - self.last_contact_time) > timeout
132 def _start_task(self, task, context):
133 if not self._ready_for_task(context) \
134 and not self._ready_for_task(
None):
135 raise TypeError(
"%s not ready for task" % str(self))
136 if self._context != context:
137 self._context = context
138 self._send(_ContextWrapper(context._startup))
139 self._state = workerstate.running_task
141 self._send(_TaskWrapper(task))
143 def _get_finished_task(self):
146 self.update_contact_time()
147 if isinstance(r, _HeartBeat):
148 if not self.get_data_pending():
155 self._state = workerstate.connected
162 self._state = workerstate.dead
165 def _ready_to_start(self):
166 return self._state == workerstate.init
168 def _ready_for_task(self, context):
169 return self._state == workerstate.connected \
170 and self._context == context
172 def _running_task(self, context):
173 return self._state == workerstate.running_task \
174 and self._context == context
178 """Representation of an array of workers.
179 This is similar to Worker, except that it represents a collection of
180 workers that are controlled together, such as a batch submission system
181 array job on a compute cluster.
182 Worker is an abstract class; instead of using this class directly, use
183 a subclass such as SGEQsubWorkerArray."""
185 def _get_workers(self):
186 """Return a list of Worker objects contained within this array"""
190 """Do any necessary startup after all contained Workers have started"""
194 class LocalWorker(Worker):
195 """A worker running on the same machine as the manager."""
197 def _start(self, command, unique_id, output):
198 Worker._start(self, command, unique_id, output)
199 cmdline =
"%s %s" % (command, unique_id)
200 _run_background(cmdline, output)
203 return "<LocalWorker>"
206 class _SGEQsubWorker(Worker):
207 def __init__(self, array):
208 Worker.__init__(self)
212 def _start(self, command, unique_id, output):
213 Worker._start(self, command, unique_id, output)
214 self._array._worker_started(unique_id, output, self)
220 return "<SGE qsub worker, ID %s>" % jobid
224 """An array of workers on a Sun Grid Engine system, started with 'qsub'.
225 To use this class, the manager process must be running on a machine that
226 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
227 is termed a 'submit host' by SGE). The class starts an SGE job array
228 (every worker has the same SGE job ID, but a different task ID).
231 standard_options =
'-j y -cwd -r n -o sge-errors'
235 @param numworker The number of workers, which corresponds to the
236 number of tasks in the SGE job.
237 @param options A string of SGE options that are passed on the
238 'qsub' command line. This is added to standard_options.
240 self._numworker = numworker
241 self._options = options
242 self._starting_workers = []
245 def _get_workers(self):
246 """Return a list of Worker objects contained within this array"""
247 return [_SGEQsubWorker(self)
for x
in range(self._numworker)]
249 def _worker_started(self, command, output, worker):
250 self._starting_workers.append((command, output, worker))
252 def _start(self, command):
253 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
254 (self._options, self.standard_options,
255 len(self._starting_workers))
258 (inp, out) = (a.stdin, a.stdout)
259 worker_uid =
" ".join([repr(s[0])
for s
in self._starting_workers])
260 worker_out =
" ".join([repr(s[1])
for s
in self._starting_workers])
261 inp.write(
"#!/bin/sh\n")
262 inp.write(
"uid=( '' %s )\n" % worker_uid)
263 inp.write(
"out=( '' %s )\n" % worker_out)
264 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
265 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
266 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
268 outlines = out.readlines()
270 for line
in outlines:
271 print(line.rstrip(
'\r\n'))
272 a.require_clean_exit()
273 self._set_jobid(outlines)
274 self._starting_workers = []
276 def _set_jobid(self, outlines):
277 """Try to figure out the job ID from the SGE qsub output"""
278 if len(outlines) > 0:
279 m = re.compile(
r"\d+").search(outlines[0])
281 self._jobid = int(m.group())
282 for (num, worker)
in enumerate(self._starting_workers):
283 worker[2]._jobid =
"%d.%d" % (self._jobid, num+1)
286 class _SGEPEWorker(
Worker):
288 Worker.__init__(self)
291 def _start(self, command, unique_id, output):
292 Worker._start(self, command, unique_id, output)
293 cmdline =
"qrsh -inherit -V %s %s %s" % (self._host, command,
295 _run_background(cmdline, output)
298 return "<SGE PE worker on %s>" % self._host
302 """An array of workers in a Sun Grid Engine system parallel environment.
303 In order to use this class, the manager must be run via
304 Sun Grid Engine's 'qsub' command and submitted to a parallel
305 environment using the qsub -pe option. This class will start workers
306 on every node in the parallel environment (including the node running
307 the manager). Each worker is started using the 'qrsh' command with
308 the '-inherit' option."""
310 def _get_workers(self):
313 pe = os.environ[
'PE_HOSTFILE']
319 (node, num, queue) = line.split(
None, 2)
320 for i
in range(int(num)):
321 workers.append(_SGEPEWorker(node))
331 """A collection of tasks that run in the same environment.
332 Context objects are typically created by calling Manager::get_context().
336 self._manager = manager
337 self._startup = startup
341 """Add a task to this context.
342 Tasks are any Python callable object that can be pickled (e.g. a
343 function or a class that implements the \_\_call\_\_ method).
344 When the task is run on the worker its arguments are the return
345 value from this context's startup function."""
346 self._tasks.append(task)
349 """Run all of the tasks on available workers, and return results.
350 If there are more tasks than workers, subsequent tasks are
351 started only once a running task completes: each worker only runs
352 a single task at a time. As each task completes, the return value(s)
353 from the task callable are returned from this method, as a
354 Python generator. Note that the results are returned in the order
355 that tasks complete, which may not be the same as the order they
358 @exception NoMoreWorkersError there are no workers available
359 to run the tasks (or they all failed during execution).
360 @exception RemoteError a worker encountered an unhandled exception.
361 @exception NetworkError the manager lost (or was unable to
362 establish) communication with any worker.
364 return self._manager._get_results_unordered(self)
368 """Manages workers and contexts.
371 connect_timeout = 7200
374 heartbeat_timeout = 7200
376 def __init__(self, python=None, host=None, output='worker%d.output'):
378 @param python If not None, the command to run to start a Python
379 interpreter that can import the IMP module. Otherwise,
380 the same interpreter that the manager is currently
381 using is used. This is passed to the shell, so a full
382 command line (including multiple words separated by
383 spaces) can be used if necessary.
384 @param host The hostname that workers use to connect back to the
385 manager. If not specified, the manager machine's
386 primary IP address is used. On multi-homed machines,
387 such as compute cluster headnodes, this may need to be
388 changed to allow all workers to reach the manager
389 (typically the name of the machine's internal network
390 address is needed). If only running local workers,
391 'localhost' can be used to prohibit connections
393 @param output A format string used to name worker output files.
394 It is given the numeric worker id, so for example the default
395 value 'worker\%d.output' will yield output files called
396 worker0.output, worker1.output, etc.
399 self._python = sys.executable
401 self._python = python
403 self._output = output
404 self._all_workers = []
405 self._starting_workers = {}
406 self._worker_arrays = []
411 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
412 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
415 """Add a Worker object."""
416 if hasattr(worker,
'_get_workers'):
417 self._worker_arrays.append(worker)
419 self._all_workers.append(worker)
422 """Create and return a new Context in which tasks can be run.
423 @param startup If not None, a callable (Python function or class
424 that implements the \_\_call\_\_ method) that sets up
425 the worker to run tasks. This method is only called
426 once per worker. The return values from this method
427 are passed to the task object when it runs on
429 @return A new Context object.
433 def _get_results_unordered(self, context):
434 """Run all of a context's tasks, and yield results"""
435 self._send_tasks_to_workers(context)
438 for task
in self._get_finished_tasks(context):
439 tasks_queued = len(context._tasks)
443 if len(context._tasks) > tasks_queued:
444 self._send_tasks_to_workers(context)
445 except _NoMoreTasksError:
448 def _start_all_workers(self):
449 for array
in self._worker_arrays:
450 self._all_workers.extend(array._get_workers())
453 (
"%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
454 " %s %d") % (self._python, self._host, self._listen_sock.port)
456 for (num, worker)
in enumerate(self._all_workers):
457 if worker._ready_to_start():
458 unique_id = self._get_unique_id(num)
459 self._starting_workers[unique_id] = worker
460 worker._start(command, unique_id, self._output % num)
462 for array
in self._worker_arrays:
463 array._start(command)
464 self._worker_arrays = []
466 def _get_unique_id(self, num):
468 for i
in range(0, 8):
469 id += chr(random.randint(0, 25) + ord(
'A'))
472 def _send_tasks_to_workers(self, context):
473 self._start_all_workers()
475 available_workers = \
476 [a
for a
in self._all_workers
if a._ready_for_task(context)] + \
477 [a
for a
in self._all_workers
if a._ready_for_task(
None)]
478 for worker
in available_workers:
479 if len(context._tasks) == 0:
482 self._send_task_to_worker(worker, context)
484 def _send_task_to_worker(self, worker, context):
485 if len(context._tasks) == 0:
487 t = context._tasks[0]
489 worker._start_task(t, context)
490 context._tasks.pop(0)
494 def _get_finished_tasks(self, context):
496 events = self._get_network_events(context)
498 self._kill_all_running_workers(context)
500 task = self._process_event(event, context)
504 def _process_event(self, event, context):
505 if event == self._listen_sock:
507 (conn, addr) = self._listen_sock.accept()
508 self._accept_worker(conn, context)
509 elif event._running_task(context):
511 task = event._get_finished_task()
513 self._send_task_to_worker(event, context)
516 self._kill_timed_out_workers(context)
517 except NetworkError
as detail:
519 print(
"Worker %s failed (%s): rescheduling task %s"
520 % (str(event), str(detail), str(task)))
521 context._tasks.append(task)
522 self._send_tasks_to_workers(context)
526 def _kill_timed_out_workers(self, context):
527 timed_out = [a
for a
in self._all_workers
if a._running_task(context)
528 and a.get_contact_timed_out(self.heartbeat_timeout)]
529 for worker
in timed_out:
530 task = worker._kill()
531 print(
"Did not hear from worker %s in %d seconds; rescheduling "
532 "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
533 context._tasks.append(task)
534 if len(timed_out) > 0:
535 self._send_tasks_to_workers(context)
537 def _kill_all_running_workers(self, context):
538 running = [a
for a
in self._all_workers
if a._running_task(context)]
539 for worker
in running:
540 task = worker._kill()
541 context._tasks.append(task)
542 raise NetworkError(
"Did not hear from any running worker in "
543 "%d seconds" % self.heartbeat_timeout)
545 def _accept_worker(self, sock, context):
546 sock.setblocking(
True)
547 identifier = sock.recv(1024)
549 identifier = identifier.decode(
'ascii')
550 if identifier
and identifier
in self._starting_workers:
551 worker = self._starting_workers.pop(identifier)
552 worker._accept_connection(sock)
553 print(
"Identified worker %s " % str(worker))
554 self._init_worker(worker)
555 self._send_task_to_worker(worker, context)
558 print(
"Ignoring request from unknown worker")
560 def _init_worker(self, worker):
561 if _import_time_path[0] !=
'':
562 worker._set_python_search_path(_import_time_path[0])
563 if sys.path[0] !=
'' and sys.path[0] != _import_time_path[0]:
564 worker._set_python_search_path(sys.path[0])
566 def _get_network_events(self, context):
567 running = [a
for a
in self._all_workers
if a._running_task(context)]
568 if len(running) == 0:
569 if len(context._tasks) == 0:
570 raise _NoMoreTasksError()
571 elif len(self._starting_workers) == 0:
576 return IMP.parallel.util._poll_events(
577 self._listen_sock, running, self.heartbeat_timeout)
580 __version__ =
"20241121.develop.d97d4ead1f"
583 '''Return the version of this module, as a string'''
584 return "20241121.develop.d97d4ead1f"
587 '''Return the fully-qualified name of this module'''
588 return "IMP::parallel"
591 '''Return the full path to one of this module's data files'''
593 return IMP._get_module_data_path(
"parallel", fname)
596 '''Return the full path to one of this module's example files'''
598 return IMP._get_module_example_path(
"parallel", fname)
Representation of a single worker.
Error raised if a worker has an unhandled exception.
An array of workers in a Sun Grid Engine system parallel environment.
def get_module_name
Return the fully-qualified name of this module.
Representation of an array of workers.
A collection of tasks that run in the same environment.
A worker running on the same machine as the manager.
Error raised if all workers failed, so tasks cannot be run.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
def add_worker
Add a Worker object.
An array of workers on a Sun Grid Engine system, started with 'qsub'.
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
def get_module_version
Return the version of this module, as a string.
def get_example_path
Return the full path to one of this module's example files.
def get_context
Create and return a new Context in which tasks can be run.
def get_data_path
Return the full path to one of this module's data files.
Error raised if a problem occurs with the network.
Distribute IMP tasks to multiple processors or machines.
def get_results_unordered
Run all of the tasks on available workers, and return results.
Manages workers and contexts.