IMP Reference Guide 2.16.0
The Integrative Modeling Platform
parallel/__init__.py
import socket
import time
import sys
import os
import re
import random
import xdrlib
try:
    import cPickle as pickle
except ImportError:
    import pickle
import IMP
from IMP.parallel import workerstate
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

# Save sys.path at import time, so that workers can import using the same
# path that works for the manager imports
_import_time_path = sys.path[:]


class Error(Exception):
    """Base class for all errors specific to the parallel module"""
    pass


class _NoMoreTasksError(Error):
    pass


class NoMoreWorkersError(Error):
    """Error raised if all workers failed, so tasks cannot be run"""
    pass


@IMP.deprecated_object("2.14", "Use NoMoreWorkersError instead")
class NoMoreSlavesError(NoMoreWorkersError):
    pass


class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass


class RemoteError(Error):
    """Error raised if a worker has an unhandled exception"""
    def __init__(self, exc, traceback, worker):
        self.exc = exc
        self.traceback = traceback
        self.worker = worker

    def __str__(self):
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.worker), self.traceback)


class _Communicator(object):
    """Simple support for sending Python pickled objects over the network"""

    def __init__(self):
        self._socket = None
        self._ibuffer = b''

    def _send(self, obj):
        p = xdrlib.Packer()
        try:
            p.pack_string(pickle.dumps(obj, -1))
        # Python < 2.5 can fail trying to send Inf or NaN floats in binary
        # mode, so fall back to the old protocol in this case:
        except SystemError:
            p.pack_string(pickle.dumps(obj, 0))
        self._socket.sendall(p.get_buffer())

    def get_data_pending(self):
        return len(self._ibuffer) > 0

    def _recv(self):
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        p = xdrlib.Unpacker(ibuffer)
        obj = p.unpack_string()
        return (pickle.loads(obj), ibuffer[p.get_position():])


class Worker(_Communicator):
    """Representation of a single worker.
       Each worker uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Worker is an abstract class; instead of using this class directly, use
       a subclass such as LocalWorker or SGEQsubWorkerArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = workerstate.init
        self._context = None
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the worker running on the remote host; override in
           subclasses"""
        self._state = workerstate.started

    def _accept_connection(self, sock):
        self._socket = sock
        self._state = workerstate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        if not self._ready_for_task(context) \
                and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = workerstate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = workerstate.connected
        return task

    def _kill(self):
        task = self._task
        self._task = None
        self._context = None
        self._state = workerstate.dead
        return task

    def _ready_to_start(self):
        return self._state == workerstate.init

    def _ready_for_task(self, context):
        return self._state == workerstate.connected \
               and self._context == context

    def _running_task(self, context):
        return self._state == workerstate.running_task \
               and self._context == context


@IMP.deprecated_object("2.14", "Use Worker instead")
class Slave(Worker):
    pass


class WorkerArray(object):
    """Representation of an array of workers.
       This is similar to Worker, except that it represents a collection of
       workers that are controlled together, such as a batch submission system
       array job on a compute cluster.
       WorkerArray is an abstract class; instead of using this class directly,
       use a subclass such as SGEQsubWorkerArray."""

    def _get_workers(self):
        """Return a list of Worker objects contained within this array"""
        pass

    def _start(self):
        """Do any necessary startup after all contained Workers have started"""
        pass


@IMP.deprecated_object("2.14", "Use WorkerArray instead")
class SlaveArray(WorkerArray):
    pass


class LocalWorker(Worker):
    """A worker running on the same machine as the manager."""

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalWorker>"


@IMP.deprecated_object("2.14", "Use LocalWorker instead")
class LocalSlave(LocalWorker):
    pass


class _SGEQsubWorker(Worker):
    def __init__(self, array):
        Worker.__init__(self)
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        self._array._worker_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub worker, ID %s>" % jobid


class SGEQsubWorkerArray(WorkerArray):
    """An array of workers on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the manager process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every worker has the same SGE job ID, but a different task ID).
    """

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numworker, options):
        """Constructor.
           @param numworker The number of workers, which corresponds to the
                  number of tasks in the SGE job.
           @param options A string of SGE options that are passed on the
                  'qsub' command line. This is added to standard_options.
        """
        self._numworker = numworker
        self._options = options
        self._starting_workers = []
        self._jobid = None

    def _get_workers(self):
        """Return a list of Worker objects contained within this array"""
        return [_SGEQsubWorker(self) for x in range(self._numworker)]

    def _worker_started(self, command, output, worker):
        self._starting_workers.append((command, output, worker))

    def _start(self, command):
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_workers))
        print(qsub)
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        worker_uid = " ".join([repr(s[0]) for s in self._starting_workers])
        worker_out = " ".join([repr(s[1]) for s in self._starting_workers])
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % worker_uid)
        inp.write("out=( '' %s )\n" % worker_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        self._set_jobid(outlines)
        self._starting_workers = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, worker) in enumerate(self._starting_workers):
                    worker[2]._jobid = "%d.%d" % (self._jobid, num+1)


@IMP.deprecated_object("2.14", "Use SGEQsubWorkerArray instead")
class SGEQsubSlaveArray(SGEQsubWorkerArray):
    pass


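# Minimal usage sketch for SGEQsubWorkerArray. The worker count and the extra
# qsub option string below are illustrative placeholders, not defaults of this
# module, and _example_sge_qsub_usage is not part of the published API. It
# assumes the calling script runs on an SGE submit host (Manager is defined
# later in this module; the name resolves when the function is called).
def _example_sge_qsub_usage():
    m = Manager()
    # Request an 8-task SGE array job; the option string is appended after
    # SGEQsubWorkerArray.standard_options on the generated qsub command line
    m.add_worker(SGEQsubWorkerArray(8, '-l mem_free=1G'))
    return m

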
class _SGEPEWorker(Worker):
    def __init__(self, host):
        Worker.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE worker on %s>" % self._host


class SGEPEWorkerArray(WorkerArray):
    """An array of workers in a Sun Grid Engine system parallel environment.
       In order to use this class, the manager must be run via
       Sun Grid Engine's 'qsub' command and submitted to a parallel
       environment using the qsub -pe option. This class will start workers
       on every node in the parallel environment (including the node running
       the manager). Each worker is started using the 'qrsh' command with
       the '-inherit' option."""

    def _get_workers(self):
        workers = []

        pe = os.environ['PE_HOSTFILE']
        fh = open(pe, "r")
        while True:
            line = fh.readline()
            if line == '':
                break
            (node, num, queue) = line.split(None, 2)
            for i in range(int(num)):
                workers.append(_SGEPEWorker(node))
        # Replace first worker with a local worker, as this is ourself, and SGE
        # won't let us start this process with qrsh (as we are already
        # occupying the slot)
        if len(workers) > 0:
            workers[0] = LocalWorker()
        return workers


@IMP.deprecated_object("2.14", "Use SGEPEWorkerArray instead")
class SGEPESlaveArray(SGEPEWorkerArray):
    pass


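# Minimal usage sketch for SGEPEWorkerArray (illustrative; _example_sge_pe_usage
# is not part of the published API). It assumes the manager script itself was
# submitted with something like 'qsub -pe <pe_name> <slots>', so that
# $PE_HOSTFILE lists the slots granted to the job; one worker is started per
# slot, with the first slot reused for a LocalWorker.
def _example_sge_pe_usage():
    m = Manager()
    m.add_worker(SGEPEWorkerArray())
    return m

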
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor."""
        self._manager = manager
        self._startup = startup
        self._tasks = []

    def add_task(self, task):
        """Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method).
           When the task is run on the worker its arguments are the return
           value from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available workers, and return results.
           If there are more tasks than workers, subsequent tasks are
           started only once a running task completes: each worker only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           @exception NoMoreWorkersError there are no workers available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a worker encountered an unhandled exception.
           @exception NetworkError the manager lost (or was unable to
                      establish) communication with any worker.
        """
        return self._manager._get_results_unordered(self)


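# Illustrative sketch of the task/startup protocol described in the Context
# docstrings above. The _example_setup/_ExampleTask/_example_context_usage
# names are placeholders, not part of the published API. Tasks and startup
# callables are pickled and sent to workers, so they are defined at module
# scope here. Following the pattern of the module's shipped examples, the
# tuple returned by the startup callable is assumed to be unpacked into the
# arguments of each task's __call__.
def _example_setup():
    # Runs once per worker; its return values become each task's arguments
    return (10,)


class _ExampleTask(object):
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, base):
        # 'base' is the value returned by _example_setup() on the worker
        return base * self.factor


def _example_context_usage(manager):
    context = manager.get_context(_example_setup)
    for factor in range(4):
        context.add_task(_ExampleTask(factor))
    # Results arrive in completion order, which may differ from the order
    # in which the tasks were added
    return sorted(context.get_results_unordered())

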
class Manager(object):
    """Manages workers and contexts.
    """

    connect_timeout = 7200

    # Note: must be higher than that in worker_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='worker%d.output'):
        """Constructor.
           @param python If not None, the command to run to start a Python
                  interpreter that can import the IMP module. Otherwise,
                  the same interpreter that the manager is currently
                  using is used. This is passed to the shell, so a full
                  command line (including multiple words separated by
                  spaces) can be used if necessary.
           @param host The hostname that workers use to connect back to the
                  manager. If not specified, the manager machine's
                  primary IP address is used. On multi-homed machines,
                  such as compute cluster headnodes, this may need to be
                  changed to allow all workers to reach the manager
                  (typically the name of the machine's internal network
                  address is needed). If only running local workers,
                  'localhost' can be used to prohibit connections
                  across the network.
           @param output A format string used to name worker output files.
                  It is given the numeric worker id, so for example the default
                  value 'worker\%d.output' will yield output files called
                  worker0.output, worker1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._host = host
        self._output = output
        self._all_workers = []
        self._starting_workers = {}
        self._worker_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_worker(self, worker):
        """Add a Worker object."""
        if hasattr(worker, '_get_workers'):
            self._worker_arrays.append(worker)
        else:
            self._all_workers.append(worker)

    def get_context(self, startup=None):
        """Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                  that implements the \_\_call\_\_ method) that sets up
                  the worker to run tasks. This method is only called
                  once per worker. The return values from this method
                  are passed to the task object when it runs on
                  the worker.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_workers(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the workers
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_workers(context)
        except _NoMoreTasksError:
            return

    def _start_all_workers(self):
        for array in self._worker_arrays:
            self._all_workers.extend(array._get_workers())

        command = \
            ("%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
             " %s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, worker) in enumerate(self._all_workers):
            if worker._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_workers[unique_id] = worker
                worker._start(command, unique_id, self._output % num)

        for array in self._worker_arrays:
            array._start(command)
        self._worker_arrays = []

    def _get_unique_id(self, num):
        id = "%d:" % num
        for i in range(0, 8):
            id += chr(random.randint(0, 25) + ord('A'))
        return id

    def _send_tasks_to_workers(self, context):
        self._start_all_workers()
        # Prefer workers that already have the task context
        available_workers = \
            [a for a in self._all_workers if a._ready_for_task(context)] + \
            [a for a in self._all_workers if a._ready_for_task(None)]
        for worker in available_workers:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_worker(worker, context)

    def _send_task_to_worker(self, worker, context):
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            worker._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            worker._kill()

    def _get_finished_tasks(self, context):
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                self._kill_all_running_workers(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        if event == self._listen_sock:
            # New worker just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_worker(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_worker(event, context)
                    return task
                else:   # the worker sent back a heartbeat
                    self._kill_timed_out_workers(context)
            except NetworkError as detail:
                task = event._kill()
                print("Worker %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_workers(context)
        else:
            pass   # Worker not running a task

    def _kill_timed_out_workers(self, context):
        timed_out = [a for a in self._all_workers if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for worker in timed_out:
            task = worker._kill()
            print("Did not hear from worker %s in %d seconds; rescheduling "
                  "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_workers(context)

    def _kill_all_running_workers(self, context):
        running = [a for a in self._all_workers if a._running_task(context)]
        for worker in running:
            task = worker._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running worker in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_worker(self, sock, context):
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            identifier = identifier.decode('ascii')
        if identifier and identifier in self._starting_workers:
            worker = self._starting_workers.pop(identifier)
            worker._accept_connection(sock)
            print("Identified worker %s " % str(worker))
            self._init_worker(worker)
            self._send_task_to_worker(worker, context)
            return worker
        else:
            print("Ignoring request from unknown worker")

    def _init_worker(self, worker):
        if _import_time_path[0] != '':
            worker._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            worker._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        running = [a for a in self._all_workers if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_workers) == 0:
                raise NoMoreWorkersError("Ran out of workers to run tasks")
            # Otherwise, wait for starting workers to connect back and
            # get tasks

        return IMP.parallel.util._poll_events(
            self._listen_sock, running, self.heartbeat_timeout)


__version__ = "2.16.0"

def get_module_version():
    '''Return the version of this module, as a string'''
    return "2.16.0"

def get_module_name():
    '''Return the fully-qualified name of this module'''
    return "IMP::parallel"

def get_data_path(fname):
    '''Return the full path to one of this module's data files'''
    import IMP
    return IMP._get_module_data_path("parallel", fname)

def get_example_path(fname):
    '''Return the full path to one of this module's example files'''
    import IMP
    return IMP._get_module_example_path("parallel", fname)
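

# Illustrative end-to-end sketch (placeholder name, not part of the published
# API): run the example tasks defined above on two local CPU cores. With
# host='localhost', workers can only connect from this machine.
def _example_local_run():
    manager = Manager(host='localhost')
    manager.add_worker(LocalWorker())
    manager.add_worker(LocalWorker())
    return _example_context_usage(manager)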