IMP Reference Guide 2.14.0
The Integrative Modeling Platform
parallel/__init__.py
1 import socket
2 import time
3 import sys
4 import os
5 import re
6 import random
7 import socket
8 import xdrlib
9 try:
10  import cPickle as pickle
11 except ImportError:
12  import pickle
13 import IMP
14 from IMP.parallel import workerstate
15 from IMP.parallel.subproc import _run_background, _Popen4
16 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
17 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
18 from IMP.parallel.util import _SetPathAction
19 
20 # Save sys.path at import time, so that workers can import using the same
21 # path that works for the manager imports
22 _import_time_path = sys.path[:]
23 
24 class Error(Exception):
25  """Base class for all errors specific to the parallel module"""
26  pass
27 
28 
29 class _NoMoreTasksError(Error):
30  pass
31 
32 
33 class NoMoreWorkersError(Error):
34  """Error raised if all workers failed, so tasks cannot be run"""
35  pass
36 
37 
38 @IMP.deprecated_object("2.14", "Use NoMoreWorkersError instead")
39 class NoMoreSlavesError(NoMoreWorkersError):
40  pass
41 
42 
43 class NetworkError(Error):
44  """Error raised if a problem occurs with the network"""
45  pass
46 
47 
48 class RemoteError(Error):
49  """Error raised if a worker has an unhandled exception"""
50  def __init__(self, exc, traceback, worker):
51  self.exc = exc
52  self.traceback = traceback
53  self.worker = worker
54 
55  def __str__(self):
56  errstr = str(self.exc.__class__).replace("exceptions.", "")
57  return "%s: %s from %s\nRemote traceback:\n%s" \
58  % (errstr, str(self.exc), str(self.worker), self.traceback)
59 
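On the manager side, errors of this type propagate out of Context.get_results_unordered(). A minimal sketch of catching one and inspecting the remote traceback, assuming a Context object named context has already been set up:

import IMP.parallel

try:
    for result in context.get_results_unordered():
        print(result)
except IMP.parallel.RemoteError as err:
    # err.exc is the original exception object, err.worker identifies the
    # worker that raised it, and err.traceback is the remote traceback text
    print("Task failed on %s:\n%s" % (err.worker, err.traceback))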
60 class _Communicator(object):
61  """Simple support for sending Python pickled objects over the network"""
62 
63  def __init__(self):
64  self._socket = None
65  self._ibuffer = b''
66 
67  def _send(self, obj):
68  p = xdrlib.Packer()
69  try:
70  p.pack_string(pickle.dumps(obj, -1))
71  # Python < 2.5 can fail trying to send Inf or NaN floats in binary
72  # mode, so fall back to the old protocol in this case:
73  except SystemError:
74  p.pack_string(pickle.dumps(obj, 0))
75  self._socket.sendall(p.get_buffer())
76 
77  def get_data_pending(self):
78  return len(self._ibuffer) > 0
79 
80  def _recv(self):
81  while True:
82  try:
83  obj, self._ibuffer = self._unpickle(self._ibuffer)
84  if isinstance(obj, _ErrorWrapper):
85  raise RemoteError(obj.obj, obj.traceback, self)
86  else:
87  return obj
88  except (IndexError, EOFError):
89  try:
90  data = self._socket.recv(4096)
91  except socket.error as detail:
92  raise NetworkError("Connection lost to %s: %s" \
93  % (str(self), str(detail)))
94  if len(data) > 0:
95  self._ibuffer += data
96  else:
97  raise NetworkError("%s closed connection" % str(self))
98 
99  def _unpickle(self, ibuffer):
100  p = xdrlib.Unpacker(ibuffer)
101  obj = p.unpack_string()
102  return (pickle.loads(obj), ibuffer[p.get_position():])
103 
104 
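The wire format used by _Communicator above is just a pickle wrapped in a single XDR string. A standalone sketch of the same framing, without any sockets (note that the xdrlib module is deprecated in recent Python 3 releases):

import pickle
import xdrlib

def frame(obj):
    # mirrors _Communicator._send: pack the pickled object as one XDR string
    p = xdrlib.Packer()
    p.pack_string(pickle.dumps(obj, -1))
    return p.get_buffer()

def unframe(buf):
    # mirrors _Communicator._unpickle: unpack one string, return leftover bytes
    u = xdrlib.Unpacker(buf)
    obj = pickle.loads(u.unpack_string())
    return obj, buf[u.get_position():]

msg = frame({'answer': 42})
obj, rest = unframe(msg)
assert obj == {'answer': 42} and rest == b''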
105 class Worker(_Communicator):
106  """Representation of a single worker.
107  Each worker uses a single thread of execution (i.e. a single CPU core)
108  to run tasks sequentially.
109  Worker is an abstract class; instead of using this class directly, use
110  a subclass such as LocalWorker or SGEQsubWorkerArray."""
111 
112  def __init__(self):
113  _Communicator.__init__(self)
114  self._state = workerstate.init
115  self._context = None
116  self._task = None
117  self.update_contact_time()
118 
119  def _start(self, command, unique_id, output):
120  """Start the worker running on the remote host; override in
121  subclasses"""
122  self._state = workerstate.started
123 
124  def _accept_connection(self, sock):
125  self._socket = sock
126  self._state = workerstate.connected
127  self.update_contact_time()
128 
129  def _set_python_search_path(self, path):
130  self._send(_SetPathAction(path))
131 
132  def update_contact_time(self):
133  self.last_contact_time = time.time()
134 
135  def get_contact_timed_out(self, timeout):
136  return (time.time() - self.last_contact_time) > timeout
137 
138  def _start_task(self, task, context):
139  if not self._ready_for_task(context) and not self._ready_for_task(None):
140  raise TypeError("%s not ready for task" % str(self))
141  if self._context != context:
142  self._context = context
143  self._send(_ContextWrapper(context._startup))
144  self._state = workerstate.running_task
145  self._task = task
146  self._send(_TaskWrapper(task))
147 
148  def _get_finished_task(self):
149  while True:
150  r = self._recv()
151  self.update_contact_time()
152  if isinstance(r, _HeartBeat):
153  if not self.get_data_pending():
154  return None
155  else:
156  break
157  task = self._task
158  task._results = r
159  self._task = None
160  self._state = workerstate.connected
161  return task
162 
163  def _kill(self):
164  task = self._task
165  self._task = None
166  self._context = None
167  self._state = workerstate.dead
168  return task
169 
170  def _ready_to_start(self):
171  return self._state == workerstate.init
172 
173  def _ready_for_task(self, context):
174  return self._state == workerstate.connected \
175  and self._context == context
176 
177  def _running_task(self, context):
178  return self._state == workerstate.running_task \
179  and self._context == context
180 
181 
182 @IMP.deprecated_object("2.14", "Use Worker instead")
183 class Slave(Worker):
184  pass
185 
186 
187 class WorkerArray(object):
188  """Representation of an array of workers.
189  This is similar to Worker, except that it represents a collection of
190  workers that are controlled together, such as a batch submission system
191  array job on a compute cluster.
192  WorkerArray is an abstract class; instead of using this class directly,
193  use a subclass such as SGEQsubWorkerArray."""
194 
195  def _get_workers(self):
196  """Return a list of Worker objects contained within this array"""
197  pass
198 
199  def _start(self):
200  """Do any necessary startup after all contained Workers have started"""
201  pass
202 
203 @IMP.deprecated_object("2.14", "Use WorkerArray instead")
204 class SlaveArray(WorkerArray):
205  pass
206 
207 
208 class LocalWorker(Worker):
209  """A worker running on the same machine as the manager."""
210 
211  def _start(self, command, unique_id, output):
212  Worker._start(self, command, unique_id, output)
213  cmdline = "%s %s" % (command, unique_id)
214  _run_background(cmdline, output)
215 
216  def __repr__(self):
217  return "<LocalWorker>"
218 
219 
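A minimal sketch of running everything on the manager's machine: add one LocalWorker per processor core you want to use (the core count here is illustrative):

import IMP.parallel

manager = IMP.parallel.Manager(host='localhost')
for core in range(4):
    manager.add_worker(IMP.parallel.LocalWorker())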
220 @IMP.deprecated_object("2.14", "Use LocalWorker instead")
221 class LocalSlave(LocalWorker):
222  pass
223 
224 
225 class _SGEQsubWorker(Worker):
226  def __init__(self, array):
227  Worker.__init__(self)
228  self._jobid = None
229  self._array = array
230 
231  def _start(self, command, unique_id, output):
232  Worker._start(self, command, unique_id, output)
233  self._array._worker_started(unique_id, output, self)
234 
235  def __repr__(self):
236  jobid = self._jobid
237  if jobid is None:
238  jobid = '(unknown)'
239  return "<SGE qsub worker, ID %s>" % jobid
240 
241 
242 class SGEQsubWorkerArray(WorkerArray):
243  """An array of workers on a Sun Grid Engine system, started with 'qsub'.
244  To use this class, the manager process must be running on a machine that
245  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
246  is termed a 'submit host' by SGE). The class starts an SGE job array
247  (every worker has the same SGE job ID, but a different task ID).
248  """
249 
250 
251  standard_options = '-j y -cwd -r n -o sge-errors'
252 
253  def __init__(self, numworker, options):
254  """Constructor.
255  @param numworker The number of workers, which corresponds to the
256  number of tasks in the SGE job.
257  @param options A string of SGE options that are passed on the 'qsub'
258  command line. This is added to standard_options.
259  """
260  self._numworker = numworker
261  self._options = options
262  self._starting_workers = []
263  self._jobid = None
264 
265  def _get_workers(self):
266  """Return a list of Worker objects contained within this array"""
267  return [_SGEQsubWorker(self) for x in range(self._numworker)]
268 
269  def _worker_started(self, command, output, worker):
270  self._starting_workers.append((command, output, worker))
271 
272  def _start(self, command):
273  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
274  (self._options, self.standard_options,
275  len(self._starting_workers))
276  print(qsub)
277  a = _Popen4(qsub)
278  (inp, out) = (a.stdin, a.stdout)
279  worker_uid = " ".join([repr(s[0]) for s in self._starting_workers])
280  worker_out = " ".join([repr(s[1]) for s in self._starting_workers])
281  inp.write("#!/bin/sh\n")
282  inp.write("uid=( '' %s )\n" % worker_uid)
283  inp.write("out=( '' %s )\n" % worker_out)
284  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
285  inp.write("myout=${out[$SGE_TASK_ID]}\n")
286  inp.write("%s $myuid > $myout 2>&1\n" % command)
287  inp.close()
288  outlines = out.readlines()
289  out.close()
290  for line in outlines:
291  print(line.rstrip('\r\n'))
292  a.require_clean_exit()
293  self._set_jobid(outlines)
294  self._starting_workers = []
295 
296  def _set_jobid(self, outlines):
297  """Try to figure out the job ID from the SGE qsub output"""
298  if len(outlines) > 0:
299  m = re.compile(r"\d+").search(outlines[0])
300  if m:
301  self._jobid = int(m.group())
302  for (num, worker) in enumerate(self._starting_workers):
303  worker[2]._jobid = "%d.%d" % (self._jobid, num+1)
304 
305 
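A minimal sketch of submitting workers as an SGE array job from a submit host; the worker count and the extra qsub options are placeholders for site-specific values and are appended to standard_options:

import IMP.parallel

manager = IMP.parallel.Manager()
# 16 workers submitted as a single SGE array job; the resource request
# '-l h_rt=1:00:00' is a placeholder for whatever your site requires
workers = IMP.parallel.SGEQsubWorkerArray(16, '-l h_rt=1:00:00')
manager.add_worker(workers)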
306 @IMP.deprecated_object("2.14", "Use SGEQsubWorkerArray instead")
307 class SGEQsubSlaveArray(SGEQsubWorkerArray):
308  pass
309 
310 
311 class _SGEPEWorker(Worker):
312  def __init__(self, host):
313  Worker.__init__(self)
314  self._host = host
315 
316  def _start(self, command, unique_id, output):
317  Worker._start(self, command, unique_id, output)
318  cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command, unique_id)
319  _run_background(cmdline, output)
320 
321  def __repr__(self):
322  return "<SGE PE worker on %s>" % self._host
323 
324 
325 class SGEPEWorkerArray(WorkerArray):
326  """An array of workers in a Sun Grid Engine system parallel environment.
327  In order to use this class, the manager must be run via Sun Grid Engine's
328  'qsub' command and submitted to a parallel environment using the qsub
329  -pe option. This class will start workers on every node in the parallel
330  environment (including the node running the manager). Each worker is
331  started using the 'qrsh' command with the '-inherit' option."""
332 
333  def _get_workers(self):
334  workers = []
335 
336  pe = os.environ['PE_HOSTFILE']
337  fh = open(pe, "r")
338  while True:
339  line = fh.readline()
340  if line == '':
341  break
342  (node, num, queue) = line.split(None, 2)
343  for i in range(int(num)):
344  workers.append(_SGEPEWorker(node))
345  # Replace first worker with a local worker, as this is ourself, and SGE
346  # won't let us start this process with qrsh (as we are already
347  # occupying the slot)
348  if len(workers) > 0:
349  workers[0] = LocalWorker()
350  return workers
351 
352 
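A minimal sketch of using the parallel environment array; it only makes sense when the manager script was itself submitted with something like 'qsub -pe <pe_name> <slots>' (the PE name and slot count are site specific):

import IMP.parallel

# uses every slot listed in $PE_HOSTFILE, with one LocalWorker standing in
# for the slot occupied by the manager itself
manager = IMP.parallel.Manager()
manager.add_worker(IMP.parallel.SGEPEWorkerArray())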
353 @IMP.deprecated_object("2.14", "Use SGEPEWorkerArray instead")
354 class SGEPESlaveArray(SGEPEWorkerArray):
355  pass
356 
357 
358 class Context(object):
359  """A collection of tasks that run in the same environment.
360  Context objects are typically created by calling Manager::get_context().
361  """
362  def __init__(self, manager, startup=None):
363  """Constructor."""
364  self._manager = manager
365  self._startup = startup
366  self._tasks = []
367 
368  def add_task(self, task):
369  """Add a task to this context.
370  Tasks are any Python callable object that can be pickled (e.g. a
371  function or a class that implements the \_\_call\_\_ method). When
372  the task is run on the worker its arguments are the return value
373  from this context's startup function."""
374  self._tasks.append(task)
375 
376  def get_results_unordered(self):
377  """Run all of the tasks on available workers, and return results.
378  If there are more tasks than workers, subsequent tasks are
379  started only once a running task completes: each worker only runs
380  a single task at a time. As each task completes, the return value(s)
381  from the task callable are returned from this method, as a
382  Python generator. Note that the results are returned in the order
383  that tasks complete, which may not be the same as the order they
384  were submitted in.
386  @exception NoMoreWorkersError there are no workers available
387  to run the tasks (or they all failed during execution).
388  @exception RemoteError a worker encountered an unhandled exception.
389  @exception NetworkError the manager lost (or was unable to
390  establish) communication with any worker.
391  """
392  return self._manager._get_results_unordered(self)
393 
394 
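A minimal sketch of the startup/task pattern described above, assuming a Manager named manager as in the examples elsewhere on this page. The other names are hypothetical; both the startup callable and the tasks must be picklable, so they are defined at module level rather than as lambdas or closures, and each task is called with whatever the startup callable returned on its worker:

def setup_scoring():
    # runs once per worker; its return values become the task arguments
    model = build_model()                  # placeholder setup helper
    return (model,)

class ScoreTask(object):
    """A picklable task: each instance carries its own parameters."""
    def __init__(self, index):
        self.index = index

    def __call__(self, model):
        # 'model' is the value returned by setup_scoring() on this worker
        return self.index, score_model(model)   # placeholder scoring helper

context = manager.get_context(setup_scoring)
for i in range(20):
    context.add_task(ScoreTask(i))

# results arrive in completion order, not submission order
for index, value in context.get_results_unordered():
    print(index, value)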
395 class Manager(object):
396  """Manages workers and contexts.
397  """
398 
399  connect_timeout = 7200
400 
401  # Note: must be higher than that in worker_handler._HeartBeatThread
402  heartbeat_timeout = 7200
403 
404  def __init__(self, python=None, host=None, output='worker%d.output'):
405  """Constructor.
406  @param python If not None, the command to run to start a Python
407  interpreter that can import the IMP module. Otherwise,
408  the same interpreter that the manager is currently
409  using is used. This is passed to the shell, so a full
410  command line (including multiple words separated by
411  spaces) can be used if necessary.
412  @param host The hostname that workers use to connect back to the
413  manager. If not specified, the manager machine's
414  primary IP address is used. On multi-homed machines,
415  such as compute cluster headnodes, this may need to be
416  changed to allow all workers to reach the manager
417  (typically the name of the machine's internal network
418  address is needed). If only running local workers,
419  'localhost' can be used to prohibit connections
420  across the network.
421  @param output A format string used to name worker output files. It is
422  given the numeric worker id, so for example the default
423  value 'worker\%d.output' will yield output files called
424  worker0.output, worker1.output, etc.
425  """
426  if python is None:
427  self._python = sys.executable
428  else:
429  self._python = python
430  self._host = host
431  self._output = output
432  self._all_workers = []
433  self._starting_workers = {}
434  self._worker_arrays = []
435  if host:
436  self._host = host
437  else:
438  # Get primary IP address of this machine
439  self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
440  self._listen_sock = _ListenSocket(self._host, self.connect_timeout)
441 
442  def add_worker(self, worker):
443  """Add a Worker object."""
444  if hasattr(worker, '_get_workers'):
445  self._worker_arrays.append(worker)
446  else:
447  self._all_workers.append(worker)
448 
449  def get_context(self, startup=None):
450  """Create and return a new Context in which tasks can be run.
451  @param startup If not None, a callable (Python function or class
452  that implements the \_\_call\_\_ method) that sets up
453  the worker to run tasks. This method is only called
454  once per worker. The return values from this method
455  are passed to the task object when it runs on
456  the worker.
457  @return A new Context object.
458  """
459  return Context(self, startup)
460 
461  def _get_results_unordered(self, context):
462  """Run all of a context's tasks, and yield results"""
463  self._send_tasks_to_workers(context)
464  try:
465  while True:
466  for task in self._get_finished_tasks(context):
467  tasks_queued = len(context._tasks)
468  yield task._results
469  # If the user added more tasks while processing these
470  # results, make sure they get sent off to the workers
471  if len(context._tasks) > tasks_queued:
472  self._send_tasks_to_workers(context)
473  except _NoMoreTasksError:
474  return
475 
476  def _start_all_workers(self):
477  for array in self._worker_arrays:
478  self._all_workers.extend(array._get_workers())
479 
480  command = ("%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
481  " %s %d") % (self._python, self._host,
482  self._listen_sock.port)
483 
484  for (num, worker) in enumerate(self._all_workers):
485  if worker._ready_to_start():
486  unique_id = self._get_unique_id(num)
487  self._starting_workers[unique_id] = worker
488  worker._start(command, unique_id, self._output % num)
489 
490  for array in self._worker_arrays:
491  array._start(command)
492  self._worker_arrays = []
493 
494  def _get_unique_id(self, num):
495  id = "%d:" % num
496  for i in range(0, 8):
497  id += chr(random.randint(0, 25) + ord('A'))
498  return id
499 
500  def _send_tasks_to_workers(self, context):
501  self._start_all_workers()
502  # Prefer workers that already have the task context
503  available_workers = [a for a in self._all_workers
504  if a._ready_for_task(context)] + \
505  [a for a in self._all_workers
506  if a._ready_for_task(None)]
507  for worker in available_workers:
508  if len(context._tasks) == 0:
509  break
510  else:
511  self._send_task_to_worker(worker, context)
512 
513  def _send_task_to_worker(self, worker, context):
514  if len(context._tasks) == 0:
515  return
516  t = context._tasks[0]
517  try:
518  worker._start_task(t, context)
519  context._tasks.pop(0)
520  except socket.error as detail:
521  worker._kill()
522 
523  def _get_finished_tasks(self, context):
524  while True:
525  events = self._get_network_events(context)
526  if len(events) == 0:
527  self._kill_all_running_workers(context)
528  for event in events:
529  task = self._process_event(event, context)
530  if task:
531  yield task
532 
533  def _process_event(self, event, context):
534  if event == self._listen_sock:
535  # New worker just connected
536  (conn, addr) = self._listen_sock.accept()
537  new_worker = self._accept_worker(conn, context)
538  elif event._running_task(context):
539  try:
540  task = event._get_finished_task()
541  if task:
542  self._send_task_to_worker(event, context)
543  return task
544  else: # the worker sent back a heartbeat
545  self._kill_timed_out_workers(context)
546  except NetworkError as detail:
547  task = event._kill()
548  print("Worker %s failed (%s): rescheduling task %s" \
549  % (str(event), str(detail), str(task)))
550  context._tasks.append(task)
551  self._send_tasks_to_workers(context)
552  else:
553  pass # Worker not running a task
554 
555  def _kill_timed_out_workers(self, context):
556  timed_out = [a for a in self._all_workers if a._running_task(context) \
557  and a.get_contact_timed_out(self.heartbeat_timeout)]
558  for worker in timed_out:
559  task = worker._kill()
560  print("Did not hear from worker %s in %d seconds; rescheduling "
561  "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
562  context._tasks.append(task)
563  if len(timed_out) > 0:
564  self._send_tasks_to_workers(context)
565 
566  def _kill_all_running_workers(self, context):
567  running = [a for a in self._all_workers if a._running_task(context)]
568  for worker in running:
569  task = worker._kill()
570  context._tasks.append(task)
571  raise NetworkError("Did not hear from any running worker in "
572  "%d seconds" % self.heartbeat_timeout)
573 
574  def _accept_worker(self, sock, context):
575  sock.setblocking(True)
576  identifier = sock.recv(1024)
577  if identifier:
578  identifier = identifier.decode('ascii')
579  if identifier and identifier in self._starting_workers:
580  worker = self._starting_workers.pop(identifier)
581  worker._accept_connection(sock)
582  print("Identified worker %s " % str(worker))
583  self._init_worker(worker)
584  self._send_task_to_worker(worker, context)
585  return worker
586  else:
587  print("Ignoring request from unknown worker")
588 
589  def _init_worker(self, worker):
590  if _import_time_path[0] != '':
591  worker._set_python_search_path(_import_time_path[0])
592  if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
593  worker._set_python_search_path(sys.path[0])
594 
595  def _get_network_events(self, context):
596  running = [a for a in self._all_workers if a._running_task(context)]
597  if len(running) == 0:
598  if len(context._tasks) == 0:
599  raise _NoMoreTasksError()
600  elif len(self._starting_workers) == 0:
601  raise NoMoreWorkersError("Ran out of workers to run tasks")
602  # Otherwise, wait for starting workers to connect back and get tasks
603 
604  return util._poll_events(self._listen_sock, running,
605  self.heartbeat_timeout)
606 
607 
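Tying the pieces together, a minimal end-to-end sketch. The interpreter command, hostname, output pattern and worker counts are illustrative placeholders, and setup_scoring/ScoreTask are the hypothetical callables from the Context sketch above:

import IMP.parallel

manager = IMP.parallel.Manager(python='python3',
                               host='cluster-head.internal',
                               output='worker%d.output')
manager.add_worker(IMP.parallel.LocalWorker())
manager.add_worker(IMP.parallel.SGEQsubWorkerArray(8, '-l h_rt=1:00:00'))

context = manager.get_context(setup_scoring)
for i in range(20):
    context.add_task(ScoreTask(i))
results = list(context.get_results_unordered())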
608 __version__ = "2.14.0"
609 
610 def get_module_version():
611  '''Return the version of this module, as a string'''
612  return "2.14.0"
613 
614 def get_module_name():
615  '''Return the fully-qualified name of this module'''
616  return "IMP::parallel"
617 
618 def get_data_path(fname):
619  '''Return the full path to one of this module's data files'''
620  import IMP
621  return IMP._get_module_data_path("parallel", fname)
622 
623 def get_example_path(fname):
624  '''Return the full path to one of this module's example files'''
625  import IMP
626  return IMP._get_module_example_path("parallel", fname)