IMP  2.0.1
The Integrative Modeling Platform
AbstractGrid.py
1 ##
2 ## The Inferential Structure Determination (ISD) software library
3 ##
4 ## Authors: Darima Lamazhapova and Wolfgang Rieping
5 ##
6 ## Copyright (C) Michael Habeck and Wolfgang Rieping
7 ##
8 ## All rights reserved.
9 ##
10 ## NO WARRANTY. This library is provided 'as is' without warranty of any
11 ## kind, expressed or implied, including, but not limited to the implied
12 ## warranties of merchantability and fitness for a particular purpose or
13 ## a warranty of non-infringement.
14 ##
15 ## Distribution of substantively modified versions of this module is
16 ## prohibited without the explicit permission of the copyright holders.
17 ##
18 
19 import os, sys, string
20 from threading import Thread, Event, Lock
21 from Queue import Queue
22 from fnmatch import fnmatch
23 import py_compile
24 
25 
26 
27 
28 from hosts import abspath, remote_mkdirs
29 from utils import atexit_register, atexit_unregister,Pipe,average
30 
31 
32 
class AbstractGrid(Thread):
    """
    Grid main functionality: manage services.

    The grid keeps, per service_id, the list of servers that provide the
    service (self.servers) and a queue of the servers that are currently
    free (self.queues).

    How servers are created is specific to the communication method,
    so publish() and ishalted() have to be defined in derived classes.

    USAGE:

        g = DerivedGrid(..)
        g.copy_files(isd_src_path, ['*.pyc', '*.so'])

        some_obj_instance = some_obj_class(..)

        service_id = g.publish(some_obj_instance)

        -> a) assigns some_service_id to this instance
           b) creates servers that manage the execution of this instance
              in remote hosts: servers[some_service_id] = [server1, server2, ...]

        proxy = g.acquire_service(service_id)  ## returns proxy object

        proxy.set_parameters(p)  # these functions will be performed sequentially
        proxy.f()                # and service won't be freed
        proxy.g()                # unless .release_service() is called

        g.release_service(proxy)

    """

    def __init__(self, hosts, src_path, display, X11_delay, debug, verbose, shared_temp_path):
        """
        Store the configuration, create the temporary folders and copy
        the compiled core modules into them (see initialise()).

        hosts:            list of Host objects (each needs .name and .temp_path)
        src_path:         folder holding the python sources to be compiled/copied
        shared_temp_path: if true, hosts[0].temp_path is used as a single
                          folder created locally; otherwise every host's
                          own temp_path is created remotely.

        self.terminate is registered through atexit_register().
        """

        Thread.__init__(self)

        self.verbose = verbose
        self.debug = debug
        self.debug_out = sys.stderr

        self.display = display
        self.X11_delay = X11_delay
        self.window_size = '30x10'

        ## list of Host objects

        self.hosts = hosts

        self.shared_temp_path = shared_temp_path

        self.src_files = {}

        # copy general files needed for all grids
        self.initialise(src_path, ['AbstractGrid', 'utils', 'hosts', 'logfile'])

        ## key = service_id
        ## self.servers[key] = list of servers providing this service
        ## self.queues[key]  = a queue with available servers for the service

        self.servers = {}
        self.queues = {}

        atexit_register(self.terminate)

    def dp(self, str):
        "Writes str followed by a newline to self.debug_out and flushes it."

        self.debug_out.write('%s\n' % (str,))
        self.debug_out.flush()

    def set_loader(self, src_path, loader_filename):
        "sets the name of the compiled python file of the loader"

        # prefer the optimised byte code if it exists in src_path
        if os.path.exists(os.path.join(src_path, '%s.pyo' % loader_filename)):
            self.loader_filename = '%s.pyo' % loader_filename
        else:
            self.loader_filename = '%s.pyc' % loader_filename

    def set_debug(self, debug, verbose = None):
        "Sets the debug flag; verbose is only changed if it is not None."

        self.debug = debug
        if verbose is not None:
            self.verbose = verbose

    def publish(self, instance):
        """
        Publish instance on all the hosts of the grid
        (specific for the communication method)

        Returns: service_id

        Internally:

        It creates list of servers performing the job
        specified by this service_id

        Must be implemented in derived classes.
        """
        raise NotImplementedError

    def create_service_id(self, instance):
        "Returns '<class name>.<abs(id(instance))>' as service id."

        return '.'.join((instance.__class__.__name__, str(abs(id(instance)))))

    def add_server(self, server):  # from AbstractISDGrid
        """
        (for internal use)

        Registers the server within a grid

        1) register itself (grid) in the server
        2) proxy must have _selfrelease attribute to be able
           to release service (i.e. put the connected to it server
           in the .queues[service_id]), once the last function
           call before invoking .release_service() has finished
           calculations

        self.queues:  a dict[DerivedServer.service_id]=Queue(-1) containing the DerivedServers
        self.servers: dict[DerivedServer.service_id]=[list of DerivedServer instances]
        """

        server.grid = self
        # written through __dict__ so that a customised attribute access
        # of the proxy is bypassed
        server.proxy.__dict__['_selfrelease'] = False

        service_id = server.service_id

        if service_id not in self.queues:
            self.queues[service_id] = Queue(-1)
            self.servers[service_id] = []

        self.queues[service_id].put(server)
        self.servers[service_id].append(server)

        if self.debug:
            self.dp('AbstractGrid.add_server: %s' % server)

    def find_server(self, service_id):
        """
        (for internal use)

        Selects a server that is capable of executing
        the requested service. If they are all busy,
        waits until there will be one available.

        Raises StandardError if the service_id is unknown.
        """

        if service_id not in self.queues:
            raise StandardError('Service type "%s" not known.' % service_id)

        ## if there is no server in the queue, blocks until one is put back
        return self.queues[service_id].get()

    def acquire_service(self, service_id):
        """
        For a given service, it finds available server and
        returns corresponding proxy worker.

        Blocks (see find_server) until a server is free.
        """

        server = self.find_server(service_id)
        server.jobs += 1
        server.acquired = True

        if self.debug and self.verbose:
            msg = 'Grid.acquire_service: service "%s" on host "%s" acquired [%s]'
            self.dp(msg % (service_id, server.host, server.proxy))

        return server.proxy

    def release_service(self, proxy):
        """
        Used to inform server/proxy that we are done using the service,
        and it should be released, when the last function call of the
        proxy is finished.

        Here only the proxy._selfrelease flag is raised; someone
        (the proxy or the grid) will have to call _release_service()
        itself and also restore ._selfrelease to the previous state (False)
        """

        proxy._selfrelease = True

    def _release_service(self, proxy):
        "Marks the proxy's server as free and puts it back in its queue."

        ## we put server back in the queue only if it is still
        ## in the list of grid servers (it may already be deleted from
        ## self.servers, if it is planned to be terminated/deleted)

        server = proxy._server
        server.acquired = False

        if server in self.servers[server.service_id]:
            self.queues[server.service_id].put(server)

        if self.debug and self.verbose:
            msg = 'Grid.release_service: service "%s" on host "%s" released [%s]'
            self.dp(msg % (server.service_id, server.host, proxy))

    def add_time(self, proxy, time):
        "Stores a timing in the .time container of the proxy's server."

        proxy._server.time.put(time)

    def copy_files(self, rootdir, filelist, hosts = None,
                   exclude = ['cns/','.svn/']):
        """
        (for internal use)

        Copies source files to a common folder so that
        they were accessible to the grid on remote hosts

        g = DerivedGrid(...)

        src_path = os.path.join(os.environ['ISD_ROOT'],'/src/py')

        from compileall import compile_dir
        compile_dir(src_path, maxlevels=0, force=True, quiet=True)

        g.copy_files(src_path, ['*.pyc','*.so'])

        The method will replicate the directory structure of
        items in the filelist, e.g. will create data folder
        in the destination for

        g.copy_files(src_path, ['data/*.txt'])

        also, it will copy all subfolders matching the required name, i.e. it
        will copy all files called foo.txt from all existing subfolders it can
        find

        g.copy_files(src_path, ['Isd/foo.txt'])
        will copy files like 'Isd/foo.txt', 'Isd/a/foo.txt', 'Isd/a/b/c/foo.txt' etc.

        the exclude keyword excludes paths containing the given strings.

        hosts may be None (all hosts of the grid), a single Host or a list
        of hosts; it is only used when the temp path is not shared.

        The mapping {subfolder: [copied files]} is stored in
        self.copied_files. The working directory is changed to rootdir
        while collecting and copying, and restored afterwards.
        """

        # local import: keeps the module's import block untouched
        import subprocess

        if hosts is None:
            hosts = self.hosts
        elif type(hosts).__name__ == 'Host':
            hosts = [hosts]

        if self.debug:

            print('AbstractGrid.copy_files: filelist = %s' % (filelist,))

            if self.shared_temp_path:
                print('AbstractGrid.copy_files: to %s' % self.hosts[0].temp_path)
            else:
                for host in hosts:
                    print('AbstractGrid.copy_files: to %s on %s' % (host.temp_path, host.name))

        subroots = {}

        # create necessary folders and store files to be copied in a
        # dictionary: subroots['path/to'] = [list, of, files]
        cwd = os.getcwd()
        os.chdir(rootdir)
        try:

            for f in filelist:

                folder, pattern = os.path.split(f)
                if folder == '':
                    folder = './'

                for root, _dirnames, filenames in os.walk(folder):

                    for filename in filenames:

                        if not fnmatch(filename, pattern):
                            continue

                        fullfile = os.path.join(rootdir, root, filename)

                        # skip paths that contain one of the exclude strings
                        if [ex for ex in exclude if ex in fullfile]:
                            continue

                        if root not in subroots:
                            self.create_subroot(root)
                            subroots[root] = []

                        subroots[root].append(fullfile)

            devnull = open(os.devnull, 'w')
            try:
                for subroot, subfilelist in subroots.items():

                    # only used in the debug message below
                    _from = ' '.join(subfilelist)

                    # The commands are given as argument lists (no shell),
                    # so paths containing blanks or shell metacharacters
                    # are passed on unchanged.

                    if self.shared_temp_path:

                        _to = os.path.join(self.hosts[0].temp_path, subroot)
                        subprocess.call(['cp'] + subfilelist + [_to])

                    else:

                        for host in hosts:

                            _to = os.path.join(host.temp_path, subroot)

                            subprocess.call(['scp', '-r'] + subfilelist
                                            + ['%s:%s' % (host.name, _to)],
                                            stdout = devnull)

                            if self.debug:
                                print('Host %s: done. (%s)' % (host.name, _from))
            finally:
                devnull.close()

            self.copied_files = subroots

        finally:
            os.chdir(cwd)

    def create_subroot(self, subroot):
        """
        Creates the folder subroot below the temp path: locally with
        os.makedirs for a shared temp path, otherwise on every host of
        the grid with remote_mkdirs (folders already listed in
        self.temp_paths are not created again).
        """

        if self.shared_temp_path:

            tempdir = os.path.join(self.hosts[0].temp_path, subroot)

            if not os.path.exists(tempdir):
                os.makedirs(tempdir)

        else:

            # NOTE: always loops over all hosts of the grid, also when
            # copy_files() was called for a subset of hosts
            for host in self.hosts:

                known = self.temp_paths[host.name]
                tempdir = abspath(os.path.join(host.temp_path, subroot))

                if tempdir not in known:

                    remote_mkdirs(host, tempdir, False)

                    # remember the new folder and its parents, walking
                    # upwards until a folder that is already known
                    while tempdir not in known:

                        known.append(tempdir)
                        tempdir = os.path.split(tempdir)[0]

    def initialise(self, src_path, src_files):
        """
        Initialise the grid:
        Create temp paths if needed, either shared or remotely on each host
        Copy source files to this folder

        src_files: module names (without extension) found in src_path;
                   each one is byte-compiled before copying.
        """

        ## create temporary directories if needed

        if self.shared_temp_path:
            if not os.path.exists(self.hosts[0].temp_path):
                os.makedirs(self.hosts[0].temp_path)

        else:
            if not hasattr(self, 'temp_paths'):
                self.temp_paths = {}

            for host in self.hosts:
                if host.name not in self.temp_paths:

                    remote_mkdirs(host, host.temp_path, self.debug)
                    self.temp_paths[host.name] = [host.temp_path]  ## saves folders created remotely

        ## copy source files

        compiled_files = []

        for f in src_files:
            py_compile.compile(os.path.join(src_path, '%s.py' % f))

            # prefer the optimised byte code if it exists
            if os.path.exists(os.path.join(src_path, '%s.pyo' % f)):
                compiled_files.append('%s.pyo' % f)
            else:
                compiled_files.append('%s.pyc' % f)

        self.copy_files(src_path, compiled_files)

    def ishalted(self):
        "Tells whether the grid is already halted; defined in derived classes."
        raise NotImplementedError

    def terminate(self, service_id = None):
        """
        Terminates the servers of the given service, or of all services
        if service_id is None. Does nothing if ishalted() is true.
        """

        ## if the grid is already terminated (e.g. by hands)
        ## then function is not called on the exit

        if self.ishalted():
            return

        atexit_unregister(self.terminate)

        if service_id is None:
            service_ids = list(self.servers.keys())
        else:
            service_ids = [service_id]

        for service_id in service_ids:

            servers = self.servers[service_id]

            while len(servers) > 0:

                if self.debug:
                    print('AbstractGrid.terminate: terminating %s' % servers[-1])

                ## 1. prevents server from being acquired
                server = servers.pop()

                ## 2. terminates server (and corresponding proxy) if needed
                server.terminate()

        if self.debug:  ## PLEASE: keep debug statement. PLEASE!
            print('AbstractGrid: terminated.')
435 
class Server:
    """
    Contains all the information that is required
    to use a remote object (alone and within a grid)

    Also it does all the job to launch the instance on remote side
    and wrap it within a Grid specific Proxy object

    1) self.service_id
    2) self.proxy

    3) self.host

    """

    def __init__(self, proxy, service_id, host, debug):
        """
        Stores proxy, service_id and host, registers this server in the
        proxy (as proxy._server) and resets the load balancing counters.
        """

        self.debug = debug
        self.debug_out = sys.stdout

        self.proxy = proxy
        self.service_id = service_id
        self.host = host

        ## register itself (server) in the proxy;
        ## written through __dict__ so that a customised attribute
        ## access of the proxy is bypassed

        self.proxy.__dict__['_server'] = self

        ## variables for load balancing

        self.time = Pipe(50)
        self.time.put(0.)
        self.jobs = 0          ## (log) number of jobs done by the Server
        self.acquired = False  ## (log)

    def dp(self, str):
        "Writes str followed by a newline to self.debug_out and flushes it."

        self.debug_out.write('%s\n' % (str,))
        self.debug_out.flush()

    def terminate(self):
        """
        It terminates self.proxy if needed

        Must be implemented in derived classes.
        """
        raise NotImplementedError

    def __str__(self):
        "One-line summary: state, average time, job count, host and proxy."

        # the closing parenthesis was missing in the format string
        s = '%s(busy=%s, <time>=%.2f, jobs=%d, host="%s", proxy="%s")' \
            % (self.__class__.__name__, self.acquired, average(self.time),
               self.jobs, self.host.name, self.proxy)

        return s

    __repr__ = __str__
492 
class TimeoutError(Exception):
    """Raised by Result.get() if the result is not set within the timeout."""
495 
class Result(object):
    """
    Placeholder for the outcome of a remote function call; it is
    returned by a Proxy object when a remote function is invoked.

    The Result does not fill itself: whoever obtains the outcome
    (for example the Proxy object) has to store it in .value and
    then set .event.

    USAGE:

        result = Result(proxy)

        ## assigning result value

        result.value = result of the function call
        result.event.set()

        ## collecting the result

        result.get(timeout = 60)

    """

    def __init__(self, proxy):
        "Creates the (unset) event and remembers the producing proxy."

        ## used in FBGrid, in PyroGrid for debugging only
        self.proxy = proxy

        self.event = Event()

    def get(self, timeout = None):
        """
        Waits for .event (at most timeout seconds, forever if timeout
        is None) and returns .value.

        Raises TimeoutError if a timeout was given and the event is
        still not set after waiting.
        """

        self.event.wait(timeout)

        if timeout is not None and not self.event.isSet():
            raise TimeoutError

        return self.value

    def __str__(self):
        "Shows proxy and value, each only if the attribute exists."

        fields = []

        for label, template in (('proxy', 'proxy="%s"'), ('value', 'value=%s')):
            if hasattr(self, label):
                fields.append(template % getattr(self, label))

        return '%s(%s)' % (self.__class__.__name__, ','.join(fields))

    def __del__(self):
        pass

    __repr__ = __str__
555 
class AbstractService(object):
    """
    Wrapper around Grid services, facilitating non parameter-specific
    usage of the remote objects.

    AbstractService resides on the local side and is parameter-specific.
    It provides the interface for remote services using grid.

    self._set_methods - are used to set up parameters specific to the
                        local service to the remote objects
                        (dict: parameter name -> name of remote setter)

    self._get_methods - are used to get updated parameters from remote side
                        (dict: parameter name -> name of remote getter)

    self._parameters  - parameters that have to be set remotely
                        1. kept on local side
                        2. sets to the remote object before each call

    Every attribute that is not found locally is looked up on a proxy
    acquired from the grid (see __getattr__).

    Example:

        heatbath = GridService(grid, service_id)
        heatbath.parameters = {'T': 30, etc}
        heatbath.set_methods = {'T': 'set_temperature' }
        heatbath.get_methods = {'T': 'get_temperature' }
        heatbath.generate_sample(x,y,z)

    """

    def __init__(self, grid, service_id, debug = False,
                 return_remote_attributes = False):
        """
        grid:       grid providing the service; service_id must be a key
                    of grid.queues, otherwise StandardError is raised
        return_remote_attributes:
                    if true, non-callable remote attributes are returned
                    by attribute access, otherwise such an access fails
        """

        object.__init__(self)

        if service_id not in grid.queues:
            raise StandardError('Service type "%s" not known in the grid' % service_id)

        self.service_id = service_id
        self.debug = debug

        self._set_methods = None
        self._get_methods = None
        self._parameters = {}

        self.__proxy_lock = Lock()  ## protects PyroProxy from overwriting

        self.__parameters_last = {}  ## results with parameters from previous call
        self.__parameters_lock = Lock()

        self.__return_remote_attributes = return_remote_attributes

        ## has to be assigned last: __getattr__ forwards to the remote
        ## side as soon as 'grid' is present in the instance dictionary
        self.grid = grid

    def get_parameter(self, attr_name):
        """
        Makes sure that the parameters requested (locally)
        were updated before being returned

        Example: (in derived class)

        >> def get_temperature(self):
        >>     return self.get_parameter('T')

        """
        if self._get_methods is not None:
            self.__get_parameters()

        return self._parameters[attr_name]

    def __get_parameters(self):
        """
        Copies the parameters fetched after the last remote call
        (self.__parameters_last) into self._parameters.

        Blocks while the parameters lock is held, i.e. while a call
        started by __call_method has not yet delivered its parameters.
        """

        self.__parameters_lock.acquire()

        # The lock has to be given back here: __call_method acquires it
        # again right after this method and Lock is not reentrant.
        try:
            for attr_name in self.__parameters_last:
                self._parameters[attr_name] = self.__parameters_last[attr_name].get()

            self.__parameters_last = {}

        finally:
            self.__parameters_lock.release()

    def __proxy_acquire(self):
        "Takes the proxy lock and acquires a proxy from the grid."

        if self.debug:
            print('AbstractService.__proxy_acquire(): %s' % (self.service_id))

        self.__proxy_lock.acquire()

        self.proxy = self.grid.acquire_service(self.service_id)

    def __proxy_release(self):
        "Asks the grid for a deferred release and frees the proxy lock."

        if self.debug:
            # the '%s' was missing, so formatting failed with debug on
            print('AbstractService.__proxy_release(): %s' % (self.proxy,))

        self.grid.release_service(self.proxy)
        self.proxy = None

        self.__proxy_lock.release()

    def __proxy_release_now(self):
        "Gives the proxy back to the grid at once and frees the proxy lock."

        self.grid._release_service(self.proxy)
        self.proxy = None

        self.__proxy_lock.release()

    def __getattr__(self, name):
        """
        Forwards unknown attributes to a remote proxy.

        Callable attributes are returned as a wrapper around
        __call_method; the proxy stays acquired until that wrapper has
        been called. Other attributes are returned only if
        return_remote_attributes was set, otherwise AttributeError is
        raised. In both non-callable cases the proxy is released at once.
        """

        # use these mechanism only when __init__() is completed
        # all the new attribute calls and declarations will go through
        # to remote methods

        if 'grid' not in self.__dict__:
            return getattr(object, name)

        self.__proxy_acquire()

        if self.debug:
            print('AbstractService.__getattr__(%s): acquired %s [%s]' % (name, self.proxy, id(self)))

        try:
            attr = getattr(self.proxy, name)

        except:
            self.__proxy_release_now()
            raise

        if callable(attr):

            if self.debug:
                print('AbstractService.__getattr__(%s): is callable' % name)

            return lambda *args: self.__call_method(name, *args)

        if self.debug:
            print('AbstractService.__getattr__(%s): is not callable' % name)

        # nothing will be called on the proxy, so give it back in any case
        self.__proxy_release_now()

        if self.__return_remote_attributes:
            return attr

        # was a string exception, which is not a valid exception object
        raise AttributeError('AbstractService must not return remote attributes ' +
                             '(because remote workers are parameter not specific)')

    def __call_method(self, name, *args):
        """
        Performs the remote call name(*args) on the acquired proxy and
        returns what the proxy returns (its .proxy attribute is set to
        the proxy used).

        Before the call the local parameters are pushed through the
        set methods; if get methods are defined, the remote parameters
        are requested afterwards in a separate thread
        (__call_parameters), which also releases the proxy.
        """

        if self.debug:
            print('AbstractService.__call_method:  %s' % (self.proxy,))

        ## renew parameters from the results of the last call

        track_parameters = self._get_methods is not None

        if track_parameters:

            if self.debug:
                print('AbstractService.__call_method(%s): renew local parameters [%s]' % (name, id(self)))

            self.__get_parameters()

            self.__parameters_lock.acquire()  ## now parameters may change

        try:
            ## set up remote parameters

            if self._set_methods is not None:
                for attr_name, set_method in self._set_methods.items():
                    getattr(self.proxy, set_method)(self._parameters[attr_name])

            ## actual call (immediately returns a Result object)

            if self.debug:
                print('AbstractService.__call_method(%s): actual call %s [%s]' % (name, self.proxy, id(self)))

            result = getattr(self.proxy, name)(*args)
            result.proxy = self.proxy

        except:
            # nobody else would free the locks after a failed call
            if track_parameters:
                self.__parameters_lock.release()
            self.__proxy_release()
            raise

        ## requests for changed parameters and releases service
        ## to the grid (when remote job is finished)

        if track_parameters:

            if self.debug:
                print('AbstractService.__call_method(%s): parameter call %s [%s]' % (name, self.proxy, id(self)))

            t = Thread(target = self.__call_parameters, args = ())
            t.start()

        else:
            self.__proxy_release()

        if self.debug:
            print('AbstractService.__call_method(%s): returning result %s [%s]' % (name, self.proxy, id(self)))

        return result

    def __call_parameters(self):
        """
        Runs in its own thread: calls every get method on the proxy and
        stores what it returns in self.__parameters_last[attr_name]
        (read later with .get() in __get_parameters), then frees the
        parameters lock taken in __call_method and releases the proxy.
        """

        ## NOTE(review): presumably the proxy executes its calls in
        ## order, so that the get methods see the finished job - confirm

        try:
            for attr_name, get_method in self._get_methods.items():
                self.__parameters_last[attr_name] = getattr(self.proxy, get_method)()

        finally:
            # also after a failure, otherwise later calls would block
            self.__parameters_lock.release()

            if self.debug:
                print('AbstractService.__call_parameters: releasing %s [%s]' % (self.proxy, id(self)))

            ## releases service to the grid

            self.__proxy_release()