IMP logo
IMP Reference Guide  develop.330bebda01,2025/01/21
The Integrative Modeling Platform
parallel/__init__.py
1 import socket
2 import time
3 import sys
4 import os
5 import re
6 import random
7 import struct
8 import pickle
9 import IMP
10 from IMP.parallel import workerstate
11 from IMP.parallel.subproc import _run_background, _Popen4
12 from IMP.parallel.util import _ListenSocket, _ErrorWrapper
13 from IMP.parallel.util import _TaskWrapper, _HeartBeat, _ContextWrapper
14 from IMP.parallel.util import _SetPathAction
15 
# Snapshot of the module search path taken when this module is first
# imported. Manager._init_worker sends its first entry to each worker so
# that workers can import modules the same way the manager did.
_import_time_path = list(sys.path)
19 
20 
22  """Base class for all errors specific to the parallel module"""
23  pass
24 
25 
class _NoMoreTasksError(Error):
    """Internal signal raised when a context has no running or queued
       tasks left; it ends the results generator."""
28 
29 
31  """Error raised if all workers failed, so tasks cannot be run"""
32  pass
33 
34 
36  """Error raised if a problem occurs with the network"""
37  pass
38 
39 
41  """Error raised if a worker has an unhandled exception"""
42  def __init__(self, exc, traceback, worker):
43  self.exc = exc
44  self.traceback = traceback
45  self.worker = worker
46 
47  def __str__(self):
48  errstr = str(self.exc.__class__).replace("exceptions.", "")
49  return "%s: %s from %s\nRemote traceback:\n%s" \
50  % (errstr, str(self.exc), str(self.worker), self.traceback)
51 
52 
class _Communicator:
    """Simple support for sending Python pickled objects over the network.

       Each message is framed as a 4-byte big-endian signed length (struct
       format '!i') followed by that many bytes of pickle data. Received
       bytes are buffered until at least one complete frame is available.
    """

    # Number of bytes used to pack the length of a string
    _slen_size = struct.calcsize('!i')

    # Maximum number of bytes requested from the socket per recv() call
    _recv_size = 65536

    def __init__(self):
        self._socket = None
        # A bytearray (not bytes) so that appending each received chunk is
        # amortized O(1); growing an immutable bytes object with += copies
        # the whole buffer every time, which is quadratic for large messages.
        self._ibuffer = bytearray()

    def _send(self, obj):
        """Pickle `obj` and send it, length-prefixed, over the socket."""
        s = pickle.dumps(obj, -1)
        self._socket.sendall(struct.pack('!i', len(s)) + s)

    def get_data_pending(self):
        """Return True if received bytes are buffered but not yet decoded."""
        return len(self._ibuffer) > 0

    def _recv(self):
        """Return the next object received over the socket.

           Blocks in socket.recv() until a complete message is buffered.
           @exception RemoteError the peer sent back an _ErrorWrapper.
           @exception NetworkError the connection failed or was closed.
        """
        while True:
            try:
                obj, self._ibuffer = self._unpickle(self._ibuffer)
            except (IndexError, EOFError):
                # Not enough data buffered for a complete message yet
                try:
                    data = self._socket.recv(self._recv_size)
                except socket.error as detail:
                    raise NetworkError("Connection lost to %s: %s"
                                       % (str(self), str(detail))) from detail
                if len(data) > 0:
                    self._ibuffer += data
                else:
                    raise NetworkError("%s closed connection" % str(self))
                continue
            if isinstance(obj, _ErrorWrapper):
                raise RemoteError(obj.obj, obj.traceback, self)
            return obj

    def _unpickle(self, ibuffer):
        """Decode one framed message from the front of `ibuffer`.

           @return a tuple (object, remaining bytes of the buffer).
           @exception IndexError `ibuffer` does not yet hold a complete frame.
        """
        if len(ibuffer) < self._slen_size:
            raise IndexError()
        slen, = struct.unpack_from('!i', ibuffer)
        end = self._slen_size + slen
        if end > len(ibuffer):
            raise IndexError()
        # NOTE(review): this unpickles data received from the network, which
        # can execute arbitrary code; only trusted peers should ever be able
        # to connect.
        return (pickle.loads(ibuffer[self._slen_size:end]), ibuffer[end:])
97 
98 
class Worker(_Communicator):
    """Representation of a single worker.
       Each worker uses a single thread of execution (i.e. a single CPU core)
       to run tasks sequentially.
       Worker is an abstract class; instead of using this class directly, use
       a subclass such as LocalWorker or SGEQsubWorkerArray."""

    def __init__(self):
        _Communicator.__init__(self)
        self._state = workerstate.init
        # Context whose startup callable was last sent to this worker
        self._context = None
        # Task currently running on this worker, if any
        self._task = None
        self.update_contact_time()

    def _start(self, command, unique_id, output):
        """Start the worker running on the remote host; override in
           subclasses"""
        self._state = workerstate.started

    def _accept_connection(self, sock):
        """Attach the socket on which the worker connected back."""
        self._socket = sock
        self._state = workerstate.connected
        self.update_contact_time()

    def _set_python_search_path(self, path):
        """Send a _SetPathAction for `path` to the worker."""
        self._send(_SetPathAction(path))

    def update_contact_time(self):
        """Record the current time as the last contact with this worker."""
        self.last_contact_time = time.time()

    def get_contact_timed_out(self, timeout):
        """Return True if more than `timeout` seconds have elapsed since
           the last recorded contact with this worker."""
        return (time.time() - self.last_contact_time) > timeout

    def _start_task(self, task, context):
        """Send `task` to the worker to run within `context`.

           If the worker was last set up for a different context, the
           context's startup callable is sent before the task.
           @exception TypeError the worker is not ready for a task.
        """
        if not self._ready_for_task(context) \
                and not self._ready_for_task(None):
            raise TypeError("%s not ready for task" % str(self))
        if self._context != context:
            self._context = context
            self._send(_ContextWrapper(context._startup))
        self._state = workerstate.running_task
        self._task = task
        self._send(_TaskWrapper(task))

    def _get_finished_task(self):
        """Read messages from the worker until a task result arrives.

           @return the finished task with its `_results` set, or None if
                   only heartbeat messages were available.
        """
        while True:
            r = self._recv()
            self.update_contact_time()
            if isinstance(r, _HeartBeat):
                if not self.get_data_pending():
                    return None
            else:
                break
        task = self._task
        task._results = r
        self._task = None
        self._state = workerstate.connected
        return task

    def _kill(self):
        """Mark this worker as dead and return the task it was running.

           The worker's socket (if it ever connected) is closed so that its
           file descriptor is released rather than leaked.
           @return the task that was running, or None.
        """
        task = self._task
        self._task = None
        self._context = None
        self._state = workerstate.dead
        if self._socket is not None:
            try:
                self._socket.close()
            except socket.error:
                pass  # best effort: the worker is being discarded anyway
        return task

    def _ready_to_start(self):
        """Return True if the worker has not been started yet."""
        return self._state == workerstate.init

    def _ready_for_task(self, context):
        """Return True if the worker is connected, idle, and was last set
           up for `context`."""
        return self._state == workerstate.connected \
            and self._context == context

    def _running_task(self, context):
        """Return True if the worker is running a task from `context`."""
        return self._state == workerstate.running_task \
            and self._context == context
175 
176 
178  """Representation of an array of workers.
179  This is similar to Worker, except that it represents a collection of
180  workers that are controlled together, such as a batch submission system
181  array job on a compute cluster.
182  Worker is an abstract class; instead of using this class directly, use
183  a subclass such as SGEQsubWorkerArray."""
184 
185  def _get_workers(self):
186  """Return a list of Worker objects contained within this array"""
187  pass
188 
189  def _start(self):
190  """Do any necessary startup after all contained Workers have started"""
191  pass
192 
193 
class LocalWorker(Worker):
    """A worker running on the same machine as the manager."""

    def _start(self, command, unique_id, output):
        # Generic state bookkeeping first, then launch the worker command
        # (with its unique ID appended) via _run_background, passing the
        # output file name through.
        Worker._start(self, command, unique_id, output)
        _run_background("%s %s" % (command, unique_id), output)

    def __repr__(self):
        return "<LocalWorker>"
204 
205 
class _SGEQsubWorker(Worker):
    """One member of a qsub-submitted SGE job array."""

    def __init__(self, array):
        Worker.__init__(self)
        # Set by the owning array once qsub reports the job ID
        self._jobid = None
        self._array = array

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        # Submission is deferred: the owning array queues every worker
        # and submits them together.
        self._array._worker_started(unique_id, output, self)

    def __repr__(self):
        shown = '(unknown)' if self._jobid is None else self._jobid
        return "<SGE qsub worker, ID %s>" % shown
221 
222 
224  """An array of workers on a Sun Grid Engine system, started with 'qsub'.
225  To use this class, the manager process must be running on a machine that
226  can submit Sun Grid Engine (SGE) jobs using the 'qsub' command (this
227  is termed a 'submit host' by SGE). The class starts an SGE job array
228  (every worker has the same SGE job ID, but a different task ID).
229  """
230 
231  standard_options = '-j y -cwd -r n -o sge-errors'
232 
233  def __init__(self, numworker, options):
234  """Constructor.
235  @param numworker The number of workers, which corresponds to the
236  number of tasks in the SGE job.
237  @param options A string of SGE options that are passed on the
238  'qsub' command line. This is added to standard_options.
239  """
240  self._numworker = numworker
241  self._options = options
242  self._starting_workers = []
243  self._jobid = None
244 
245  def _get_workers(self):
246  """Return a list of Worker objects contained within this array"""
247  return [_SGEQsubWorker(self) for x in range(self._numworker)]
248 
249  def _worker_started(self, command, output, worker):
250  self._starting_workers.append((command, output, worker))
251 
252  def _start(self, command):
253  qsub = "qsub -S /bin/sh %s %s -t 1-%d" % \
254  (self._options, self.standard_options,
255  len(self._starting_workers))
256  print(qsub)
257  a = _Popen4(qsub)
258  (inp, out) = (a.stdin, a.stdout)
259  worker_uid = " ".join([repr(s[0]) for s in self._starting_workers])
260  worker_out = " ".join([repr(s[1]) for s in self._starting_workers])
261  inp.write("#!/bin/sh\n")
262  inp.write("uid=( '' %s )\n" % worker_uid)
263  inp.write("out=( '' %s )\n" % worker_out)
264  inp.write("myuid=${uid[$SGE_TASK_ID]}\n")
265  inp.write("myout=${out[$SGE_TASK_ID]}\n")
266  inp.write("%s $myuid > $myout 2>&1\n" % command)
267  inp.close()
268  outlines = out.readlines()
269  out.close()
270  for line in outlines:
271  print(line.rstrip('\r\n'))
272  a.require_clean_exit()
273  self._set_jobid(outlines)
274  self._starting_workers = []
275 
276  def _set_jobid(self, outlines):
277  """Try to figure out the job ID from the SGE qsub output"""
278  if len(outlines) > 0:
279  m = re.compile(r"\d+").search(outlines[0])
280  if m:
281  self._jobid = int(m.group())
282  for (num, worker) in enumerate(self._starting_workers):
283  worker[2]._jobid = "%d.%d" % (self._jobid, num+1)
284 
285 
class _SGEPEWorker(Worker):
    """A worker launched on one host of an SGE parallel environment."""

    def __init__(self, host):
        Worker.__init__(self)
        self._host = host

    def _start(self, command, unique_id, output):
        Worker._start(self, command, unique_id, output)
        # Run the worker command on self._host via SGE's 'qrsh -inherit'
        launch = "qrsh -inherit -V %s %s %s" % (self._host, command,
                                                unique_id)
        _run_background(launch, output)

    def __repr__(self):
        return "<SGE PE worker on {}>".format(self._host)
299 
300 
302  """An array of workers in a Sun Grid Engine system parallel environment.
303  In order to use this class, the manager must be run via
304  Sun Grid Engine's 'qsub' command and submitted to a parallel
305  environment using the qsub -pe option. This class will start workers
306  on every node in the parallel environment (including the node running
307  the manager). Each worker is started using the 'qrsh' command with
308  the '-inherit' option."""
309 
310  def _get_workers(self):
311  workers = []
312 
313  pe = os.environ['PE_HOSTFILE']
314  fh = open(pe, "r")
315  while True:
316  line = fh.readline()
317  if line == '':
318  break
319  (node, num, queue) = line.split(None, 2)
320  for i in range(int(num)):
321  workers.append(_SGEPEWorker(node))
322  # Replace first worker with a local worker, as this is ourself, and SGE
323  # won't let us start this process with qrsh (as we are already
324  # occupying the slot)
325  if len(workers) > 0:
326  workers[0] = LocalWorker()
327  return workers
328 
329 
class Context:
    """A collection of tasks that run in the same environment.
       Context objects are typically created by calling Manager::get_context().
    """
    def __init__(self, manager, startup=None):
        """Constructor."""
        self._manager = manager
        self._startup = startup
        # Tasks not yet handed to a worker; the Manager takes them from the
        # front of this list.
        self._tasks = []

    # Raw docstrings: the Doxygen escapes below ("\_") are invalid Python
    # escape sequences in ordinary strings (SyntaxWarning on 3.12+). The
    # resulting __doc__ text is unchanged.
    def add_task(self, task):
        r"""Add a task to this context.
           Tasks are any Python callable object that can be pickled (e.g. a
           function or a class that implements the \_\_call\_\_ method).
           When the task is run on the worker its arguments are the return
           value from this context's startup function."""
        self._tasks.append(task)

    def get_results_unordered(self):
        r"""Run all of the tasks on available workers, and return results.
           If there are more tasks than workers, subsequent tasks are
           started only once a running task completes: each worker only runs
           a single task at a time. As each task completes, the return value(s)
           from the task callable are returned from this method, as a
           Python generator. Note that the results are returned in the order
           that tasks complete, which may not be the same as the order they
           were submitted in.

           @exception NoMoreWorkersError there are no workers available
                      to run the tasks (or they all failed during execution).
           @exception RemoteError a worker encountered an unhandled exception.
           @exception NetworkError the manager lost (or was unable to
                      establish) communication with any worker.
        """
        return self._manager._get_results_unordered(self)
365 
366 
class Manager:
    """Manages workers and contexts.
    """

    connect_timeout = 7200

    # Note: must be higher than that in worker_handler._HeartBeatThread
    heartbeat_timeout = 7200

    # Source of randomness for worker IDs. A private SystemRandom does not
    # consume (and so perturb) the global `random` state that user code may
    # have seeded for reproducibility, and its output is not predictable.
    _id_random = random.SystemRandom()

    def __init__(self, python=None, host=None, output='worker%d.output'):
        r"""Constructor.
           @param python If not None, the command to run to start a Python
                         interpreter that can import the IMP module. Otherwise,
                         the same interpreter that the manager is currently
                         using is used. This is passed to the shell, so a full
                         command line (including multiple words separated by
                         spaces) can be used if necessary.
           @param host   The hostname that workers use to connect back to the
                         manager. If not specified, the manager machine's
                         primary IP address is used. On multi-homed machines,
                         such as compute cluster headnodes, this may need to be
                         changed to allow all workers to reach the manager
                         (typically the name of the machine's internal network
                         address is needed). If only running local workers,
                         'localhost' can be used to prohibit connections
                         across the network.
           @param output A format string used to name worker output files.
                         It is given the numeric worker id, so for example the
                         default value 'worker\%d.output' will yield output
                         files called worker0.output, worker1.output, etc.
        """
        if python is None:
            self._python = sys.executable
        else:
            self._python = python
        self._output = output
        self._all_workers = []
        # unique_id -> Worker, for workers started but not yet connected
        self._starting_workers = {}
        self._worker_arrays = []
        if host:
            self._host = host
        else:
            # Get primary IP address of this machine
            self._host = socket.gethostbyname_ex(socket.gethostname())[-1][0]
        self._listen_sock = _ListenSocket(self._host, self.connect_timeout)

    def add_worker(self, worker):
        """Add a Worker object."""
        # Worker arrays are recognized by their _get_workers method
        if hasattr(worker, '_get_workers'):
            self._worker_arrays.append(worker)
        else:
            self._all_workers.append(worker)

    def get_context(self, startup=None):
        r"""Create and return a new Context in which tasks can be run.
           @param startup If not None, a callable (Python function or class
                          that implements the \_\_call\_\_ method) that sets up
                          the worker to run tasks. This method is only called
                          once per worker. The return values from this method
                          are passed to the task object when it runs on
                          the worker.
           @return A new Context object.
        """
        return Context(self, startup)

    def _get_results_unordered(self, context):
        """Run all of a context's tasks, and yield results"""
        self._send_tasks_to_workers(context)
        try:
            while True:
                for task in self._get_finished_tasks(context):
                    tasks_queued = len(context._tasks)
                    yield task._results
                    # If the user added more tasks while processing these
                    # results, make sure they get sent off to the workers
                    if len(context._tasks) > tasks_queued:
                        self._send_tasks_to_workers(context)
        except _NoMoreTasksError:
            return

    def _start_all_workers(self):
        """Start every worker that has not been started yet."""
        for array in self._worker_arrays:
            self._all_workers.extend(array._get_workers())

        command = \
            ("%s -c \"import IMP.parallel.worker_handler as s; s.main()\""
             " %s %d") % (self._python, self._host, self._listen_sock.port)

        for (num, worker) in enumerate(self._all_workers):
            if worker._ready_to_start():
                unique_id = self._get_unique_id(num)
                self._starting_workers[unique_id] = worker
                worker._start(command, unique_id, self._output % num)

        for array in self._worker_arrays:
            array._start(command)
        self._worker_arrays = []

    def _get_unique_id(self, num):
        """Return an identifier for worker number `num`: "<num>:" followed
           by 8 random uppercase letters."""
        letters = "".join(chr(self._id_random.randint(0, 25) + ord('A'))
                          for _ in range(8))
        return "%d:%s" % (num, letters)

    def _send_tasks_to_workers(self, context):
        """Start workers if needed and hand queued tasks to idle workers."""
        self._start_all_workers()
        # Prefer workers that already have the task context
        available_workers = \
            [a for a in self._all_workers if a._ready_for_task(context)] + \
            [a for a in self._all_workers if a._ready_for_task(None)]
        for worker in available_workers:
            if len(context._tasks) == 0:
                break
            self._send_task_to_worker(worker, context)

    def _send_task_to_worker(self, worker, context):
        """Send the first queued task of `context` to `worker`.

           The task is only removed from the queue once it was sent; on a
           socket error the worker is killed and the task stays queued.
        """
        if len(context._tasks) == 0:
            return
        t = context._tasks[0]
        try:
            worker._start_task(t, context)
            context._tasks.pop(0)
        except socket.error:
            worker._kill()

    def _get_finished_tasks(self, context):
        """Yield tasks of `context` as they finish on workers."""
        while True:
            events = self._get_network_events(context)
            if len(events) == 0:
                # Nothing heard within heartbeat_timeout; this raises
                self._kill_all_running_workers(context)
            for event in events:
                task = self._process_event(event, context)
                if task:
                    yield task

    def _process_event(self, event, context):
        """Handle one polled event: a new connection or worker activity.

           @return the finished task, if this event completed one.
        """
        if event == self._listen_sock:
            # New worker just connected
            (conn, addr) = self._listen_sock.accept()
            self._accept_worker(conn, context)
        elif event._running_task(context):
            try:
                task = event._get_finished_task()
                if task:
                    self._send_task_to_worker(event, context)
                    return task
                else:  # the worker sent back a heartbeat
                    self._kill_timed_out_workers(context)
            except NetworkError as detail:
                task = event._kill()
                print("Worker %s failed (%s): rescheduling task %s"
                      % (str(event), str(detail), str(task)))
                context._tasks.append(task)
                self._send_tasks_to_workers(context)
        else:
            pass  # Worker not running a task

    def _kill_timed_out_workers(self, context):
        """Kill running workers not heard from within heartbeat_timeout
           and requeue their tasks."""
        timed_out = [a for a in self._all_workers if a._running_task(context)
                     and a.get_contact_timed_out(self.heartbeat_timeout)]
        for worker in timed_out:
            task = worker._kill()
            print("Did not hear from worker %s in %d seconds; rescheduling "
                  "task %s" % (str(worker), self.heartbeat_timeout, str(task)))
            context._tasks.append(task)
        if len(timed_out) > 0:
            self._send_tasks_to_workers(context)

    def _kill_all_running_workers(self, context):
        """Kill all running workers, requeue their tasks, and raise.

           @exception NetworkError always.
        """
        running = [a for a in self._all_workers if a._running_task(context)]
        for worker in running:
            task = worker._kill()
            context._tasks.append(task)
        raise NetworkError("Did not hear from any running worker in "
                           "%d seconds" % self.heartbeat_timeout)

    def _accept_worker(self, sock, context):
        """Identify a newly-connected socket and give the worker a task.

           The first message on the connection must be the unique ID the
           worker was started with. Connections that send an unknown or
           undecodable ID, or fail while sending it, are closed and ignored
           rather than crashing the manager.
           @return the identified Worker, or None.
        """
        sock.setblocking(True)
        try:
            identifier = sock.recv(1024)
        except socket.error:
            identifier = b''
        # A stray client may send arbitrary bytes; non-ASCII input decodes
        # to replacement characters, which can never match a worker ID.
        identifier = identifier.decode('ascii', 'replace')
        worker = self._starting_workers.pop(identifier, None)
        if worker is None:
            print("Ignoring request from unknown worker")
            sock.close()  # don't leak the rejected connection
            return None
        worker._accept_connection(sock)
        print("Identified worker %s " % str(worker))
        self._init_worker(worker)
        self._send_task_to_worker(worker, context)
        return worker

    def _init_worker(self, worker):
        """Send the manager's Python search path entries to a new worker."""
        if _import_time_path[0] != '':
            worker._set_python_search_path(_import_time_path[0])
        if sys.path[0] != '' and sys.path[0] != _import_time_path[0]:
            worker._set_python_search_path(sys.path[0])

    def _get_network_events(self, context):
        """Wait for activity on the listening socket or running workers.

           @exception _NoMoreTasksError nothing is running or queued.
           @exception NoMoreWorkersError tasks remain but no worker is
                      running or starting.
        """
        running = [a for a in self._all_workers if a._running_task(context)]
        if len(running) == 0:
            if len(context._tasks) == 0:
                raise _NoMoreTasksError()
            elif len(self._starting_workers) == 0:
                raise NoMoreWorkersError("Ran out of workers to run tasks")
            # Otherwise, wait for starting workers to connect back and
            # get tasks

        return IMP.parallel.util._poll_events(
            self._listen_sock, running, self.heartbeat_timeout)
578 
579 
# Version string of this build of the IMP.parallel module.
__version__ = "20250121.develop.330bebda01"
581 
583  '''Return the version of this module, as a string'''
584  return "20250121.develop.330bebda01"
585 
def get_module_name():
    '''Return the fully-qualified name of this module'''
    qualified_name = "IMP::parallel"
    return qualified_name
589 
def get_data_path(fname):
    '''Return the full path to one of this module's data files'''
    import IMP
    # Resolution is delegated to the IMP kernel, keyed by module name
    module = "parallel"
    return IMP._get_module_data_path(module, fname)
594 
def get_example_path(fname):
    '''Return the full path to one of this module's example files'''
    import IMP
    # Resolution is delegated to the IMP kernel, keyed by module name
    module = "parallel"
    return IMP._get_module_example_path(module, fname)
Representation of a single worker.
Error raised if a worker has an unhandled exception.
An array of workers in a Sun Grid Engine system parallel environment.
def get_module_name
Return the fully-qualified name of this module.
Representation of an array of workers.
Subprocess handling.
Definition: subproc.py:1
def __init__
Constructor.
A collection of tasks that run in the same environment.
A worker running on the same machine as the manager.
Error raised if all workers failed, so tasks cannot be run.
def add_task
Add a task to this context.
Utilities for the IMP.parallel module.
Definition: parallel/util.py:1
def add_worker
Add a Worker object.
An array of workers on a Sun Grid Engine system, started with 'qsub'.
Base class for all errors specific to the parallel module.
The general base class for IMP exceptions.
Definition: exception.h:48
def get_module_version
Return the version of this module, as a string.
def get_example_path
Return the full path to one of this module's example files.
def get_context
Create and return a new Context in which tasks can be run.
def get_data_path
Return the full path to one of this module's data files.
Error raised if a problem occurs with the network.
def __init__
Constructor.
Distribute IMP tasks to multiple processors or machines.
def get_results_unordered
Run all of the tasks on available workers, and return results.
Manages workers and contexts.