"""Distribute IMP tasks to multiple processors or machines."""

import socket
import time
import sys
import os
import re
import random
import xdrlib
try:
    import cPickle as pickle   # fast pickler on Python 2
except ImportError:
    import pickle              # Python 3
import IMP
# Private helpers used below (slavestate, util, _ListenSocket, _run_background,
# _Popen4 and the wrapper classes) come from this package's support modules.

# Remember the Python search path at import time so it can be passed to slaves
_import_time_path = sys.path[:]


class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass


class _NoMoreTasksError(Error):
    pass


class NoMoreSlavesError(Error):
    """Error raised if all slaves failed, so tasks cannot be run"""
    pass


class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass


class RemoteError(Error):
    """Error raised if a slave has an unhandled exception"""
    def __init__(self, exc, traceback, slave):
        self.exc = exc
        self.traceback = traceback
        self.slave = slave

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.slave), self.traceback)
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            # Try the most efficient (binary) pickle protocol first
            p.pack_string(pickle.dumps(obj, -1))
        except Exception:
            # Binary pickling can fail for some objects (e.g. NaN/Inf floats
            # on older Pythons); fall back to the text protocol
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        """Read and unpickle the next object sent by the remote end"""
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    # The remote side sent a wrapped exception; surface it
                    raise RemoteError(obj.obj, obj.traceback, self)
                return obj
            except (IndexError, EOFError):
                # Not enough data buffered yet; read more from the socket
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if not data:
                    raise NetworkError("%s closed connection" % str(self))
                self._ibuffer += data

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])
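# Illustrative sketch (not part of the original module): the wire format used
# by _Communicator is simply an XDR length-prefixed string whose payload is a
# pickle. The helper name below is hypothetical, for demonstration only.
def _demo_pickle_roundtrip():
    a, b = socket.socketpair()
    try:
        # Sender side: pickle the object, then frame it as an XDR string
        p = xdrlib.Packer()
        p.pack_string(pickle.dumps({'answer': 42}, -1))
        a.sendall(p.get_buffer())
        # Receiver side: unpack the XDR string, then unpickle the payload
        u = xdrlib.Unpacker(b.recv(4096))
        return pickle.loads(u.unpack_string())   # -> {'answer': 42}
    finally:
        a.close()
        b.close()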
100 """Representation of a single slave.
101 Each slave uses a single thread of execution (i.e. a single CPU core)
102 to run tasks sequentially.
103 Slave is an abstract class; instead of using this class directly, use
104 a subclass such as LocalSlave or SGEQsubSlaveArray."""
107 _Communicator.__init__(self)
108 self._state = slavestate.init
111 self.update_contact_time()
113 def _start(self, command, unique_id, output):
114 """Start the slave running on the remote host; override in subclasses"""
115 self._state = slavestate.started
117 def _accept_connection(self, sock):
119 self._state = slavestate.connected
120 self.update_contact_time()
122 def _set_python_search_path(self, path):
123 self._send(_SetPathAction(path))
125 def update_contact_time(self):
126 self.last_contact_time = time.time()
128 def get_contact_timed_out(self, timeout):
129 return (time.time() - self.last_contact_time) > timeout
131 def _start_task(self, task, context):
132 if not self._ready_for_task(context)
and not self._ready_for_task(
None):
133 raise TypeError(
"%s not ready for task" % str(self))
134 if self._context != context:
135 self._context = context
136 self._send(_ContextWrapper(context._startup))
137 self._state = slavestate.running_task
139 self._send(_TaskWrapper(task))
141 def _get_finished_task(self):
144 self.update_contact_time()
145 if isinstance(r, _HeartBeat):
146 if not self.get_data_pending():
153 self._state = slavestate.connected
160 self._state = slavestate.dead
163 def _ready_to_start(self):
164 return self._state == slavestate.init
166 def _ready_for_task(self, context):
167 return self._state == slavestate.connected \
168 and self._context == context
170 def _running_task(self, context):
171 return self._state == slavestate.running_task \
172 and self._context == context
176 """Representation of an array of slaves.
177 This is similar to Slave, except that it represents a collection of
178 slaves that are controlled together, such as a batch submission system
179 array job on a compute cluster.
180 Slave is an abstract class; instead of using this class directly, use
181 a subclass such as SGEQsubSlaveArray."""
183 def _get_slaves(self):
184 """Return a list of Slave objects contained within this array"""
188 """Do any necessary startup after all contained Slaves have started"""
class LocalSlave(Slave):
    """A slave running on the same machine as the master."""

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalSlave>"
class _SGEQsubSlave(Slave):
    def __init__(self, array):
        Slave.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        self._array._slave_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        return "<SGE qsub slave, ID %s>" % jobid
222 """An array of slaves on a Sun Grid Engine system, started with 'qsub'.
223 To use this class, the master process must be running on a machine that
224 can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
225 is termed a 'submit host' by SGE). The class starts an SGE job array
226 (every slave has the same SGE job ID, but a different task ID).
230 standard_options =
'-j y -cwd -r n -o sge-errors'
234 @param numslave The number of slaves, which corresponds to the
235 number of tasks in the SGE job.
236 @param options A string of SGE options that are passed on the 'qsub'
237 command line. This is added to standard_options.
239 self._numslave = numslave
240 self._options = options
241 self._starting_slaves = []
244 def _get_slaves(self):
245 """Return a list of Slave objects contained within this array"""
246 return [_SGEQsubSlave(self)
for x
in range(self._numslave)]
248 def _slave_started(self, command, output, slave):
249 self._starting_slaves.append((command, output, slave))
251 def _start(self, command):
252 qsub =
"qsub -S /bin/sh %s %s -t 1-%d" % \
253 (self._options, self.standard_options,
254 len(self._starting_slaves))
257 (inp, out) = (a.stdin, a.stdout)
258 slave_uid =
" ".join([repr(s[0])
for s
in self._starting_slaves])
259 slave_out =
" ".join([repr(s[1])
for s
in self._starting_slaves])
260 inp.write(
"#!/bin/sh\n")
261 inp.write(
"uid=( '' %s )\n" % slave_uid)
262 inp.write(
"out=( '' %s )\n" % slave_out)
263 inp.write(
"myuid=${uid[$SGE_TASK_ID]}\n")
264 inp.write(
"myout=${out[$SGE_TASK_ID]}\n")
265 inp.write(
"%s $myuid > $myout 2>&1\n" % command)
267 outlines = out.readlines()
269 for line
in outlines:
270 print(line.rstrip(
'\r\n'))
271 a.require_clean_exit()
272 self._set_jobid(outlines)
273 self._starting_slaves = []
    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, slave) in enumerate(self._starting_slaves):
                    slave[2]._jobid = "%d.%d" % (self._jobid, num+1)
class _SGEPESlave(Slave):
    def __init__(self, host):
        Slave.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Slave._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE slave on %s>" % self._host
300 """An array of slaves in a Sun Grid Engine system parallel environment.
301 In order to use this class, the master must be run via Sun Grid Engine's
302 'qsub' command and submitted to a parallel environment using the qsub
303 -pe option. This class will start slaves on every node in the parallel
304 environment (including the node running the master). Each slave is
305 started using the 'qrsh' command with the '-inherit' option."""
307 def _get_slaves(self):
310 pe = os.environ[
'PE_HOSTFILE']
316 (node, num, queue) = line.split(
None, 2)
317 for i
in range(int(num)):
318 slaves.append(_SGEPESlave(node))
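# Illustrative sketch (not part of the original module): the master script
# itself must be submitted to an SGE parallel environment, e.g. with
# "qsub -pe mpi 8 master_script.sh" (the PE name 'mpi' is just an example);
# inside that job the array can then be added to a Manager:
def _demo_add_sge_pe_slaves(manager):
    manager.add_slave(SGEPESlaveArray())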
328 """A collection of tasks that run in the same environment.
329 Context objects are typically created by calling Manager::get_context().
333 self._manager = manager
334 self._startup = startup
338 """Add a task to this context.
339 Tasks are any Python callable object that can be pickled (e.g. a
340 function or a class that implements the \_\_call\_\_ method). When
341 the task is run on the slave its arguments are the return value
342 from this context's startup function."""
343 self._tasks.append(task)
346 """Run all of the tasks on available slaves, and return results.
347 If there are more tasks than slaves, subsequent tasks are
348 started only once a running task completes: each slave only runs
349 a single task at a time. As each task completes, the return value(s)
350 from the task callable are returned from this method, as a
351 Python generator. Note that the results are returned in the order
352 that tasks complete, which may not be the same as the order they
355 @exception NoMoreSlavesError there are no slaves available
356 to run the tasks (or they all failed during execution).
357 @exception RemoteError a slave encountered an unhandled exception.
358 @exception NetworkError the master lost (or was unable to
359 establish) communication with any slave.
361 return self._manager._get_results_unordered(self)
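# Illustrative sketch (not part of the original module): a minimal end-to-end
# use of Manager/Context. Startup and task callables must be picklable, so
# they are defined at module scope here; all '_demo_*' names are hypothetical.
def _demo_startup():
    # Runs once per slave; its return values become each task's arguments
    return ("shared data",)


def _demo_task(shared):
    return len(shared)


def _demo_run_tasks():
    m = Manager()
    m.add_slave(LocalSlave())
    m.add_slave(LocalSlave())
    c = m.get_context(_demo_startup)
    for _ in range(10):
        c.add_task(_demo_task)
    for result in c.get_results_unordered():
        print(result)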
365 """Manages slaves and contexts.
368 connect_timeout = 7200
371 heartbeat_timeout = 7200
373 def __init__(self, python=None, host=None, output='slave%d.output'):
375 @param python If not None, the command to run to start a Python
376 interpreter that can import the IMP module. Otherwise,
377 the same interpreter that the master is currently using
378 is used. This is passed to the shell, so a full command
379 line (including multiple words separated by spaces) can
380 be used if necessary.
381 @param host The hostname that slaves use to connect back to the
382 master. If not specified, the master machine's primary
383 IP address is used. On multi-homed machines, such as
384 compute cluster headnodes, this may need to be changed
385 to allow all slaves to reach the master (typically the
386 name of the machine's internal network address is
387 needed). If only running local slaves, 'localhost' can
388 be used to prohibit connections across the network.
389 @param output A format string used to name slave output files. It is
390 given the numeric slave id, so for example the default
391 value 'slave\%d.output' will yield output files called
392 slave0.output, slave1.output, etc.
395 self._python = sys.executable
397 self._python = python
399 self._output = output
400 self._all_slaves = []
401 self._starting_slaves = {}
402 self._slave_arrays = []
407 self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
408 self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
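# Illustrative sketch (not part of the original module): on a multi-homed
# cluster headnode the master may need to advertise its internal interface,
# and a different Python command can be given for the slaves. The hostname,
# interpreter path and output pattern below are made-up examples.
def _demo_custom_manager():
    return Manager(python="/usr/bin/python2.7",
                   host="headnode-internal",
                   output="job-%d.out")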
411 """Add a Slave object."""
412 if hasattr(slave,
'_get_slaves'):
413 self._slave_arrays.append(slave)
415 self._all_slaves.append(slave)
418 """Create and return a new Context in which tasks can be run.
419 @param startup If not None, a callable (Python function or class
420 that implements the \_\_call\_\_ method) that sets up
421 the slave to run tasks. This method is only called
422 once per slave. The return values from this method
423 are passed to the task object when it runs on
425 @return A new Context object.
    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_slaves(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the caller queued more tasks while consuming results,
                    # send them out to the slaves as well
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_slaves(context)
        except _NoMoreTasksError:
            return

    def _start_all_slaves(self):
        for array in self._slave_arrays:
            self._all_slaves.extend(array._get_slaves())

        command = ("%s -c \"import IMP.parallel.slave_handler as s; s.main()\" "
                   "%s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, slave) in enumerate(self._all_slaves):
            if slave._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_slaves[unique_id] = slave
                slave._start(command, unique_id, self._output % num)

        for array in self._slave_arrays:
            array._start(command)
        self._slave_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id
    def _send_tasks_to_slaves(self, context):
        self._start_all_slaves()
        # Prefer slaves that are already set up for this context; fall back
        # to any other idle slave
        available_slaves = [a for a in self._all_slaves
                            if a._ready_for_task(context)] + \
                           [a for a in self._all_slaves
                            if a._ready_for_task(None)]
        for slave in available_slaves:
            if len(context._tasks) == 0:
                break
            self._send_task_to_slave(slave, context)

    def _send_task_to_slave(self, slave, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            slave._start_task(t, context)
            context._tasks.pop(0)
        except socket.error as detail:
            # Could not reach this slave; leave the task queued so that it
            # can be sent to another slave
            pass
    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_slaves(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # A new slave has just connected back to the master
            (conn, addr) = self._listen_sock.accept()
            new_slave = self._accept_slave(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_slave(event, context)
                    return task
                else:
                    # The slave sent a heartbeat; check the other slaves
                    self._kill_timed_out_slaves(context)
            except NetworkError as detail:
                task = event._kill()
                print("Slave %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_slaves(context)
    def _kill_timed_out_slaves(self, context):
        timed_out = [a for a in self._all_slaves if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for slave in timed_out:
            task = slave._kill()
            print("Did not hear from slave %s in %d seconds; rescheduling "
                  "task %s" % (str(slave), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_slaves(context)
    def _kill_all_running_slaves(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        for slave in running:
            task = slave._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running slave in "
                           "%d seconds" % self.heartbeat_timeout)
    def _accept_slave(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_slaves:
            slave = self._starting_slaves.pop(identifier)
            slave._accept_connection(sock)
            print("Identified slave %s " % str(slave))
            self._init_slave(slave)
            self._send_task_to_slave(slave, context)
            return slave
        else:
            print("Ignoring request from unknown slave")
    def _init_slave(self, slave):
        if _import_time_path[0] != '':
            slave._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            slave._set_python_search_path(sys.path[0])
    def _get_network_events(self, context):
        running = [a for a in self._all_slaves if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_slaves) == 0:
                raise NoMoreSlavesError("Ran out of slaves to run tasks")
            # Otherwise, wait for starting slaves to connect back and get tasks

        return util._poll_events(self._listen_sock, running,
                                 self.heartbeat_timeout)
def get_module_version():
    '''Return the version of this module, as a string'''


def get_module_name():
    '''Return the fully-qualified name of this module'''
    return "IMP::parallel"


def get_data_path(fname):
    '''Return the full path to one of this module's data files'''
    return IMP._get_module_data_path("parallel", fname)


def get_example_path(fname):
    '''Return the full path to one of this module's example files'''
    return IMP._get_module_example_path("parallel", fname)