IMP logo
IMP Reference Guide  develop.e004443c3b,2024/04/25
The Integrative Modeling Platform
parallel/__init__.py
import socket
import time
import sys
import os
import re
import random
import struct
try:
    # Python 2 ships a faster C implementation under this name
    import cPickle as pickle
except ImportError:
    import pickle
import IMP
from IMP.parallel import workerstate
from IMP.parallel.subproc import _run_background, _Popen4
from IMP.parallel.util import _ListenSocket, _ErrorWrapper
from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
from IMP.parallel.util import _SetPathAction

# Save sys.path at import time, so that workers can import using the same
# path that works for the manager imports
_import_time_path = sys.path[:]
22 
23 
# NOTE(review): the `class` statement was missing from this listing; the
# base class is restored as IMP.Exception -- confirm against upstream.
class Error(IMP.Exception):
    """Base class for all errors specific to the parallel module"""
    pass
27 
28 
class _NoMoreTasksError(Error):
    """Internal signal that no tasks are running and none are queued.

    Raised by Manager._get_network_events() and caught by
    Manager._get_results_unordered() to end the results generator."""
    pass
31 
32 
# NOTE(review): the `class` statement was missing from this listing; the
# name matches the `raise` in Manager._get_network_events(), and the base
# class is assumed to be Error -- confirm against upstream.
class NoMoreWorkersError(Error):
    """Error raised if all workers failed, so tasks cannot be run"""
    pass
36 
37 
# NOTE(review): the `class` statement was missing from this listing; the
# name matches the `raise NetworkError(...)` sites in this module, and the
# base class is assumed to be Error -- confirm against upstream.
class NetworkError(Error):
    """Error raised if a problem occurs with the network"""
    pass
41 
42 
# NOTE(review): the `class` statement was missing from this listing; the
# name matches the `raise RemoteError(...)` in _Communicator._recv(), and
# the base class is assumed to be Error -- confirm against upstream.
class RemoteError(Error):
    """Error raised if a worker has an unhandled exception"""

    def __init__(self, exc, traceback, worker):
        """Store the remote failure.
           @param exc The exception object raised on the worker.
           @param traceback The worker's traceback, inserted verbatim
                  into the string form of this error.
           @param worker The object the failure was received from.
        """
        self.exc = exc
        self.traceback = traceback
        self.worker = worker

    def __str__(self):
        # Drop the Python 2 "exceptions." module prefix from the class name
        errstr = str(self.exc.__class__).replace("exceptions.", "")
        return "%s: %s from %s\nRemote traceback:\n%s" \
               % (errstr, str(self.exc), str(self.worker), self.traceback)
54 
55 
class _Communicator(object):
    """Simple support for sending Python pickled objects over the network.

    Each message on the wire is a network-order ('!i') length prefix
    followed by that many bytes of pickle data."""

    # Number of bytes used to pack the length of a string
    _slen_size = struct.calcsize('!i')

    def __init__(self):
        self._socket = None
        # Bytes received from the socket but not yet decoded
        self._ibuffer = b''

    def _send(self, obj):
        """Pickle `obj` (highest protocol) and send it, length-prefixed."""
        s = pickle.dumps(obj, -1)
        self._socket.sendall(struct.pack('!i', len(s)) + s)

    def get_data_pending(self):
        """Return True if received-but-undecoded bytes are buffered."""
        return len(self._ibuffer) > 0

    def _recv(self):
        """Return the next object sent by the peer, blocking if needed.

        @exception RemoteError the peer sent an _ErrorWrapper.
        @exception NetworkError the socket failed or the peer closed it.
        """
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
                if isinstance(obj, _ErrorWrapper):
                    raise RemoteError(obj.obj, obj.traceback, self)
                else:
                    return obj
            except (IndexError, EOFError):
                # Not enough buffered data for a whole message; read more
                try:
                    data = self._socket.recv(4096)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail)))
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    # A zero-length read means the peer closed the socket
                    raise NetworkError("%s closed connection" % str(self))

    def _unpickle(self, ibuffer):
        """Decode one message from the front of `ibuffer`.

        @return an (object, remaining bytes) tuple.
        @exception IndexError `ibuffer` does not yet hold a complete
                   message.
        """
        if len(ibuffer) < self._slen_size:
            raise IndexError()
        slen, = struct.unpack_from('!i', ibuffer)
        if self._slen_size + slen > len(ibuffer):
            raise IndexError()
        # NOTE(review): pickle.loads() runs on bytes read from the network;
        # this is only safe if every peer is trusted.
        return (pickle.loads(ibuffer[self._slen_size: self._slen_size + slen]),
                ibuffer[self._slen_size + slen:])
100 
101 
class Worker(_Communicator):
    """Representation of a single worker.
       Each worker uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Worker is an abstract class; instead of using this class directly, use
       a subclass such as LocalWorker or SGEQsubWorkerArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = workerstate.init
        # Context whose startup has been sent to this worker, if any
        self._context = None
        # Task currently running on this worker, if any
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the worker running on the remote host; override in
           subclasses"""
        self._state = workerstate.started

    def _accept_connection(self, sock):
        """Adopt `sock` as the connection to this worker."""
        self._socket = sock
        self._state = workerstate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        """Send a _SetPathAction for `path` to the worker."""
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record the current time as the last contact with the worker."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if the last contact was over `timeout` seconds ago."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        """Send `task` to the worker, preceded by the context's startup if
           the worker does not already have that context.

           @exception TypeError the worker is not connected and idle with
                      either this context or no context.
        """
        if not self._ready_for_task(context) \
           and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = workerstate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Read from the worker until a result (or only heartbeats) arrive.

           @return the finished task with its `_results` attribute set, or
                   None if only heartbeats were received and no further
                   data is buffered.
        """
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                # Keep reading if more data is already buffered behind
                # the heartbeat; otherwise report "nothing finished"
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = workerstate.connected
        return task

    def _kill(self):
        """Mark the worker dead and return the task it held (may be None)."""
        task = self._task
        self._task = None
        self._context = None
        self._state = workerstate.dead
        return task

    def _ready_to_start(self):
        """Return True if the worker has not been started yet."""
        return self._state == workerstate.init

    def _ready_for_task(self, context):
        """Return True if the worker is connected, idle, and holds
           exactly `context`."""
        return self._state == workerstate.connected \
            and self._context == context

    def _running_task(self, context):
        """Return True if the worker is running a task in `context`."""
        return self._state == workerstate.running_task \
            and self._context == context
178 
179 
class WorkerArray(object):
    """Representation of an array of workers.
       This is similar to Worker, except that it represents a collection of
       workers that are controlled together, such as a batch submission system
       array job on a compute cluster.
       WorkerArray is an abstract class; instead of using this class
       directly, use a subclass such as SGEQsubWorkerArray."""

    def _get_workers(self):
        """Return a list of Worker objects contained within this array"""
        pass

    def _start(self, command=None):
        """Do any necessary startup after all contained Workers have started.

           @param command The worker command line. The manager calls
                  `array._start(command)`, so the base implementation must
                  accept it; it is ignored here.
        """
        pass
195 
196 
class LocalWorker(Worker):
    """A worker running on the same machine as the manager."""

    def _start(self, command, unique_id, output):
        """Run `command unique_id` in the background via _run_background,
           passing `output` through to it."""
        Worker._start(self, command, unique_id, output)
        cmdline = "%s %s" % (command, unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<LocalWorker>"
207 
208 
class _SGEQsubWorker(Worker):
    """A single worker belonging to an array that starts its workers
       together (see the `_worker_started` call in `_start`)."""

    def __init__(self, array):
        Worker.__init__(self)
        # Filled in by the owning array once the job ID is known
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        """Register with the owning array rather than starting anything;
           the array launches all of its workers in its own _start()."""
        Worker._start(self, command, unique_id, output)
        self._array._worker_started(unique_id, output, self)

    def __repr__(self):
        jobid = self._jobid
        if jobid is None:
            jobid = '(unknown)'
        return "<SGE qsub worker, ID %s>" % jobid
224 
225 
# NOTE(review): the `class` statement was missing from this listing; the
# name comes from the Worker/WorkerArray docstrings, and the base class is
# assumed to be WorkerArray -- confirm against upstream.
class SGEQsubWorkerArray(WorkerArray):
    """An array of workers on a Sun Grid Engine system, started with 'qsub'.
       To use this class, the manager process must be running on a machine that
       can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
       is termed a 'submit host' by SGE). The class starts an SGE job array
       (every worker has the same SGE job ID, but a different task ID).
    """

    standard_options = '-j y -cwd -r n -o sge-errors'

    def __init__(self, numworker, options):
        """Constructor.
           @param numworker The number of workers, which corresponds to the
                  number of tasks in the SGE job.
           @param options A string of SGE options that are passed on the
                  'qsub' command line. This is added to standard_options.
        """
        self._numworker = numworker
        self._options = options
        # (unique id, output file, worker) for each worker awaiting qsub
        self._starting_workers = []
        self._jobid = None

    def _get_workers(self):
        """Return a list of Worker objects contained within this array"""
        return [_SGEQsubWorker(self) for x in range(self._numworker)]

    def _worker_started(self, command, output, worker):
        """Queue `worker` for the next qsub submission.

           Note that _SGEQsubWorker._start() passes the worker's unique id
           as the first argument (despite the parameter name); _start()
           below uses it as such.
        """
        self._starting_workers.append((command, output, worker))

    def _start(self, command):
        """Submit one SGE array job that runs `command` once per queued
           worker, then record the job ID on each worker."""
        if not self._starting_workers:
            # Nothing was queued; '-t 1-0' is not a valid task range
            return
        qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
               (self._options, self.standard_options,
                len(self._starting_workers))
        print(qsub)
        a = _Popen4(qsub)
        (inp, out) = (a.stdin, a.stdout)
        worker_uid = " ".join([repr(s[0]) for s in self._starting_workers])
        worker_out = " ".join([repr(s[1]) for s in self._starting_workers])
        # The job script is fed to qsub on stdin. SGE task IDs start at 1,
        # so element 0 of each shell array is an empty placeholder.
        inp.write("#!/bin/sh\n")
        inp.write("uid=( '' %s )\n" % worker_uid)
        inp.write("out=( '' %s )\n" % worker_out)
        inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
        inp.write("myout=${out[$SGE_TASK_ID]}\n")
        inp.write("%s $myuid > $myout 2>&1\n" % command)
        inp.close()
        outlines = out.readlines()
        out.close()
        for line in outlines:
            print(line.rstrip('\r\n'))
        a.require_clean_exit()
        # Must run before the queue is cleared: it labels the queued workers
        self._set_jobid(outlines)
        self._starting_workers = []

    def _set_jobid(self, outlines):
        """Try to figure out the job ID from the SGE qsub output"""
        if len(outlines) > 0:
            # The first run of digits on the first output line is taken
            # to be the job ID
            m = re.compile(r"\d+").search(outlines[0])
            if m:
                self._jobid = int(m.group())
                for (num, worker) in enumerate(self._starting_workers):
                    worker[2]._jobid = "%d.%d" % (self._jobid, num+1)
288 
class _SGEPEWorker(Worker):
    """A worker started on a given host with 'qrsh -inherit'."""

    def __init__(self, host):
        Worker.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        """Run `command unique_id` on this worker's host via qrsh, in the
           background, passing `output` through to _run_background."""
        Worker._start(self, command, unique_id, output)
        cmdline = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                 unique_id)
        _run_background(cmdline, output)

    def __repr__(self):
        return "<SGE PE worker on %s>" % self._host
302 
303 
# NOTE(review): the `class` statement was missing from this listing; the
# name comes from the module's class index, and the base class is assumed
# to be WorkerArray -- confirm against upstream.
class SGEPEWorkerArray(WorkerArray):
    """An array of workers in a Sun Grid Engine system parallel environment.
       In order to use this class, the manager must be run via
       Sun Grid Engine's 'qsub' command and submitted to a parallel
       environment using the qsub -pe option. This class will start workers
       on every node in the parallel environment (including the node running
       the manager). Each worker is started using the 'qrsh' command with
       the '-inherit' option."""

    def _get_workers(self):
        """Return one worker per slot listed in the file named by the
           PE_HOSTFILE environment variable.

           @exception KeyError PE_HOSTFILE is not set in the environment.
        """
        workers = []

        pe = os.environ['PE_HOSTFILE']
        # Use a context manager so the file is closed even on a parse error
        with open(pe, "r") as fh:
            for line in fh:
                if not line.strip():
                    continue  # tolerate blank lines
                # First two fields: node name and slot count
                (node, num, queue) = line.split(None, 2)
                for i in range(int(num)):
                    workers.append(_SGEPEWorker(node))
        # Replace first worker with a local worker, as this is ourself, and SGE
        # won't let us start this process with qrsh (as we are already
        # occupying the slot)
        if len(workers) > 0:
            workers[0] = LocalWorker()
        return workers
331 
332 
class Context(object):
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor.
           @param manager The object that runs this context's tasks; its
                  _get_results_unordered() method is called with this
                  context.
           @param startup Optional startup callable, stored for the
                  manager to send to workers.
        """
        self._manager = manager
        self._startup = startup
        # Tasks queued but not yet handed to a worker
        self._tasks = []

    def add_task(self, task):
        # Raw docstring: the backslashes are Doxygen escapes, not Python ones
        r"""Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method).
           When the task is run on the worker its arguments are the return
           value from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        """Run all of the tasks on available workers, and return results.
           If there are more tasks than workers, subsequent tasks are
           started only once a running task completes: each worker only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           @exception NoMoreWorkersError there are no workers available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a worker encountered an unhandled exception.
           @exception NetworkError the manager lost (or was unable to
                      establish) communication with any worker.
        """
        return self._manager._get_results_unordered(self)
368 
369 
class Manager(object):
    """Manages workers and contexts.
    """

    # Passed to the listening socket as its timeout
    connect_timeout = 7200

    # Note: must be higher than that in worker_handler._HeartBeatThread
    heartbeat_timeout = 7200

    def __init__(self, python=None, host=None, output='worker%d.output'):
        # Raw docstring: the backslash is a Doxygen escape, not a Python one
        r"""Constructor.
           @param python If not None, the command to run to start a Python
                  interpreter that can import the IMP module. Otherwise,
                  the same interpreter that the manager is currently
                  using is used. This is passed to the shell, so a full
                  command line (including multiple words separated by
                  spaces) can be used if necessary.
           @param host The hostname that workers use to connect back to the
                  manager. If not specified, the manager machine's
                  primary IP address is used. On multi-homed machines,
                  such as compute cluster headnodes, this may need to be
                  changed to allow all workers to reach the manager
                  (typically the name of the machine's internal network
                  address is needed). If only running local workers,
                  'localhost' can be used to prohibit connections
                  across the network.
           @param output A format string used to name worker output files.
                  It is given the numeric worker id, so for example the default
                  value 'worker\%d.output' will yield output files called
                  worker0.output, worker1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_workers = []
        # Maps unique id -> worker, for workers started but not connected
        self._starting_workers = {}
        self._worker_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_worker(self, worker):
        """Add a Worker object."""
        # Anything exposing _get_workers is treated as a worker array
        if hasattr(worker, '_get_workers'):
            self._worker_arrays.append(worker)
        else:
            self._all_workers.append(worker)

    def get_context(self, startup=None):
        # Raw docstring: the backslashes are Doxygen escapes, not Python ones
        r"""Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                  that implements the \_\_call\_\_ method) that sets up
                  the worker to run tasks. This method is only called
                  once per worker. The return values from this method
                  are passed to the task object when it runs on
                  the worker.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_workers(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the workers
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_workers(context)
        except _NoMoreTasksError:
            return

    def _start_all_workers(self):
        """Start every worker that has not been started yet, then start
           (and forget) all registered worker arrays."""
        for array in self._worker_arrays:
            self._all_workers.extend(array._get_workers())

        command = \
            ("%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
             " %s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, worker) in enumerate(self._all_workers):
            if worker._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_workers[unique_id] = worker
                worker._start(command, unique_id, self._output % num)

        for array in self._worker_arrays:
            array._start(command)
        # Arrays are only expanded and started once
        self._worker_arrays = []

    def _get_unique_id(self, num):
        """Return '<num>:' followed by 8 random uppercase ASCII letters."""
        # NOTE(review): this id is the only check made on a connecting
        # worker (see _accept_worker) and comes from `random`, which is
        # not a cryptographic source.
        suffix = "".join(chr(random.randint(0, 25) + ord('A'))
                         for i in range(0, 8))
        return "%d:%s" % (num, suffix)

    def _send_tasks_to_workers(self, context):
        """Start any unstarted workers, then hand queued tasks to idle
           connected workers until either runs out."""
        self._start_all_workers()
        # Prefer workers that already have the task context
        available_workers = \
            [a for a in self._all_workers if a._ready_for_task(context)] + \
            [a for a in self._all_workers if a._ready_for_task(None)]
        for worker in available_workers:
            if len(context._tasks) == 0:
                break
            else:
                self._send_task_to_worker(worker, context)

    def _send_task_to_worker(self, worker, context):
        """Send the first queued task to `worker`; on a socket error the
           worker is killed and the task stays queued."""
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            worker._start_task(t, context)
            # Only dequeue once the task has actually been sent
            context._tasks.pop(0)
        except socket.error:
            worker._kill()

    def _get_finished_tasks(self, context):
        """Yield tasks as workers finish them.

           This generator does not end by itself; it stops when a callee
           raises (e.g. _NoMoreTasksError from _get_network_events).
        """
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # The poll timed out with nothing to report; this raises
                self._kill_all_running_workers(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one event from _get_network_events().

           @return the finished task, if the event was a worker returning
                   results; otherwise None.
        """
        if event == self._listen_sock:
            # New worker just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_worker(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_worker(event, context)
                    return task
                else:  # the worker sent back a heartbeat
                    self._kill_timed_out_workers(context)
            except NetworkError as detail:
                task = event._kill()
                print("Worker %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_workers(context)
        else:
            pass  # Worker not running a task

    def _kill_timed_out_workers(self, context):
        """Kill workers not heard from within heartbeat_timeout and
           requeue their tasks."""
        timed_out = [a for a in self._all_workers if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for worker in timed_out:
            task = worker._kill()
            print("Did not hear from worker %s in %d seconds; rescheduling "
                  "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_workers(context)

    def _kill_all_running_workers(self, context):
        """Kill every worker running a task in `context`, requeue the
           tasks, and raise.

           @exception NetworkError always raised.
        """
        running = [a for a in self._all_workers if a._running_task(context)]
        for worker in running:
            task = worker._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running worker in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_worker(self, sock, context):
        """Identify a newly connected worker by the id it sends first.

           @return the matched worker, or None if the connection was
                   rejected (in which case `sock` is closed).
        """
        sock.setblocking(True)
        identifier = sock.recv(1024)
        if identifier:
            try:
                identifier = identifier.decode('ascii')
            except UnicodeDecodeError:
                # Non-ASCII bytes cannot match an id we issued; treat the
                # peer as unknown rather than crashing the manager
                identifier = None
        if identifier and identifier in self._starting_workers:
            worker = self._starting_workers.pop(identifier)
            worker._accept_connection(sock)
            print("Identified worker %s " % str(worker))
            self._init_worker(worker)
            self._send_task_to_worker(worker, context)
            return worker
        else:
            print("Ignoring request from unknown worker")
            # Do not leave the rejected connection open
            sock.close()

    def _init_worker(self, worker):
        """Send the manager's script directory (sys.path[0], both as saved
           at import time and as it is now) to the worker, skipping empty
           entries."""
        if _import_time_path[0] != '':
            worker._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            worker._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listening socket or a running worker.

           @exception _NoMoreTasksError nothing is running or queued.
           @exception NoMoreWorkersError tasks are queued, but no worker is
                      running or still starting up.
        """
        running = [a for a in self._all_workers if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_workers) == 0:
                raise NoMoreWorkersError("Ran out of workers to run tasks")
            # Otherwise, wait for starting workers to connect back and
            # get tasks

        return IMP.parallel.util._poll_events(
            self._listen_sock, running, self.heartbeat_timeout)
581 
582 
__version__ = "20240425.develop.e004443c3b"


def get_module_version():
    '''Return the version of this module, as a string'''
    return "20240425.develop.e004443c3b"
588 
def get_module_name():
    '''Return the fully-qualified name of this module'''
    return "IMP::parallel"
592 
def get_data_path(fname):
    '''Return the full path to one of this module's data files'''
    import IMP
    return IMP._get_module_data_path("parallel", fname)
597 
def get_example_path(fname):
    '''Return the full path to one of this module's example files'''
    import IMP
    return IMP._get_module_example_path("parallel", fname)
Representation of a single worker.
Error raised if a worker has an unhandled exception.
An array of workers in a Sun Grid Engine system parallel environment.
def get_module_name
Return the fully-qualified name of this module.
Representation of an array of workers.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
A collection of tasks that run in the same environment.
A worker running on the same machine as the manager.
Error raised if all workers failed, so tasks cannot be run.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
def add_worker
Add a Worker object.
An array of workers on a Sun Grid Engine system, started with 'qsub'.
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:48
def get_module_version
Return the version of this module, as a string.
def get_example_path
Return the full path to one of this module's example files.
def get_context
Create and return a new Context in which tasks can be run.
def get_data_path
Return the full path to one of this module's data files.
Error raised if a problem occurs with the network.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
def get_results_unordered
Run all of the tasks on available workers, and return results.
Manages workers and contexts.