IMP  2.0.1
The Integrative Modeling Platform
FileBasedGrid.py
1 ##
2 ## The Inferential Structure Determination (ISD) software library
3 ##
4 ## Authors: Michael Habeck and Wolfgang Rieping
5 ##
6 ## Copyright (C) Michael Habeck and Wolfgang Rieping
7 ##
8 ## All rights reserved.
9 ##
10 ## NO WARRANTY. This library is provided 'as is' without warranty of any
11 ## kind, expressed or implied, including, but not limited to the implied
12 ## warranties of merchantability and fitness for a particular purpose or
13 ## a warranty of non-infringement.
14 ##
15 ## Distribution of substantively modified versions of this module is
16 ## prohibited without the explicit permission of the copyright holders.
17 ##
18 
19 import os
20 import time
21 import random
22 import cPickle
23 import shutil
24 import socket
25 
26 from AbstractGrid import AbstractGrid, Server, Result
27 from ro import *
28 from OrderedDict import OrderedDict
29 from utils import touch, my_glob, WatchDog, Pipe
30 
31 ## TODO: move auxiliary functions to a separate module
32 
33 def file_exists(name, timeout=None):
34 
35  name = os.path.expanduser(name)
36 
37  try:
38  f = open(name)
39  f.close()
40  return True
41 
42  except IOError:
43  return False
44 
45 def valid_timestamp(name, max_delay):
46 
47  try:
48  creation_time = os.stat(name).st_ctime
49  except:
50  return True
51 
52  sys_time = time.time()
53 
54  if creation_time-sys_time > 1.:
55  msg = 'Inconsistent clocks: filename=%s, timestamp=%.2f, system time=%.2f'
56  raise StandardError, msg % (name, creation_time, sys_time)
57 
58  return abs(creation_time-sys_time) < max_delay
59 
60 class FileBasedCommunicator:
61 
62  remote_shell = 'ssh'
63  polling_interval = 0.1
64  lock_max_delay = 10.
65 
66  def __init__(self, temp_path, tid=None, debug=False, nfs_care=False):
67 
68 
69  self.comm_path = os.path.join(temp_path, 'comm')
70  self.temp_path = temp_path
71 
72  if not os.path.exists(self.comm_path):
73  os.system('mkdir ' + self.comm_path)
74 
75  if tid is None:
76 
77  self.tid_counter = random.randint(0, 12345678)
78 
79  tid = self.tid_counter
80 
81  self.create_slot(tid)
82 
83  self.tid = tid
84 
85  self.message_id = 0
86 
87  self.data = OrderedDict()
88 
89  self.debug = debug
90 
91  self.nfs_care = nfs_care
92 
93  self.__stop = False
94 
95  def halt(self):
96  self.__stop = True
97 
98  def create_slot(self, tid):
99 
100  slot = '%s/%d' % (self.comm_path, tid)
101 
102  if not os.path.exists(slot):
103  os.system('mkdir ' + slot)
104 
105  def spawn(self, host_name, command, args, init_data, pipe = '',
106  X_forward = False):
107 
108  self.tid_counter += 1
109 
110  self.create_slot(self.tid_counter)
111 
112  ssh_cmd = self.remote_shell
113 
114  if X_forward:
115  ssh_cmd += ' -X'
116 
117  init_data['temp_path'] = self.temp_path
118  init_data['debug'] = self.debug
119  init_data['parent_tid'] = self.tid
120  init_data['tid'] = self.tid_counter
121  init_data['nfs_care'] = self.nfs_care
122 
123  args = ' '.join([str(x) for x in args])
124 
125  filename = os.path.join(self.temp_path, 'init_data.%d' % init_data['tid'])
126 
127  f = open(filename, 'w')
128  cPickle.dump(init_data, f)
129  f.close()
130 
131  sh_cmd = "%s %s '%s %s %s %s' &" % (ssh_cmd, host_name, command, args,
132  filename, pipe)
133 
134  os.system(sh_cmd)
135 
136  return self.tid_counter
137 
138  def load(self, filename):
139  "load contents of message file filename by unpickling it"
140 
141  debug_shown = False
142 
143  while file_exists(filename + '.lock'):
144 
145  if self.nfs_care and self.lock_max_delay is not None:
146 
147  if file_exists(filename) and not \
148  valid_timestamp(filename + '.lock', self.lock_max_delay):
149 
150  sys_time = time.time()
151 
152  try:
153  file_time = os.stat(filename + '.lock').st_ctime
154  except:
155  continue
156 
157  if self.debug:
158  msg = 'cPickle.load: lock file %s.lock has old timestamp: %d, system time is %d.'
159  print msg % (filename, file_time, sys_time)
160 
161  break
162 
163  if self.debug and not debug_shown:
164  print 'Waiting for message to be unlocked ...'
165  debug_shown = True
166 
167  time.sleep(self.polling_interval)
168 
169  try:
170 
171  f = open(filename)
172  _data = cPickle.load(f)
173  f.close()
174 
175  except IOError, err_msg:
176 
177  while 1:
178 
179  print 'Error accessing %s (%s). Error message was "%s". Trying again...' % \
180  (filename, time.ctime(), err_msg)
181 
182  time.sleep(60.)
183 
184  try:
185 
186  f = open(filename)
187  _data = cPickle.load(f)
188  f.close()
189 
190  break
191 
192  except IOError:
193  continue
194 
195  return _data
196 
197  def dump(self, op, filename):
198 
199 
200  try:
201 
202  f = open(filename + '.lock', 'w')
203  cPickle.dump(op, f)
204  f.flush()
205  f.close()
206 
207  except IOError, err_msg:
208 
209  while 1:
210 
211  print 'Error accessing %s (%s). Error message was "%s". Trying again...' % \
212  (filename, time.ctime(), err_msg)
213 
214  time.sleep(60.)
215 
216  try:
217 
218  f = open(filename + '.lock', 'w')
219  cPickle.dump(op, f, 1)
220  f.close()
221 
222  break
223 
224  except IOError:
225  continue
226 
227  os.link('%s.lock' % filename, '%s' % filename)
228  os.unlink('%s.lock' % filename)
229 
230  def dump_old(self, op, filename):
231 
232  try:
233 
234  f = open(filename + '.lock', 'w')
235  cPickle.dump(op, f)
236  f.flush()
237  f.close()
238 
239  except IOError, err_msg:
240 
241  while 1:
242  print 'Error accessing %s (%s). Error message was "%s". Trying again...' % \
243  (filename, time.ctime(), err_msg)
244 
245  time.sleep(60.)
246 
247  try:
248 
249  f = open(filename + '.lock', 'w')
250  cPickle.dump(op, f, 1)
251  f.close()
252 
253  break
254 
255  except IOError:
256  continue
257 
258  shutil.copyfile('%s.lock' % filename, '%s' % filename)
259  os.system('cat %s > /dev/null' % filename)
260  os.unlink('%s.lock' % filename)
261 
262  def register(self, _id, tid, msg, data):
263 
264  key = (tid, msg)
265 
266  if key in self.data:
267  self.data[key].append(data)
268  else:
269  self.data[key] = [data]
270 
271  def pop_message(self, tid, msg):
272 
273  key = (tid, msg)
274  data = self.data
275 
276  if key in data and data[key]:
277 
278  value = data[key].pop(0)
279 
280  if not data[key]:
281  del data[key]
282 
283  if self.debug:
284  print 'pop_message', tid, msg
285 
286  return tid, msg, value
287 
288  else:
289  return None
290 
291  def poll(self):
292  """check if new message arrived,
293  extract data and register to self.data construct,
294  remove message file
295  """
296 
297  filenames = self.comm_path + '/%d/msg.*.msg' % self.tid
298 
299  touch = self.nfs_care
300 
301  files = my_glob(filenames, touch)
302 
303  if len(files) and self.debug:
304  print 'poll: new message arrived:'
305 
306  data = []
307 
308  for f in files:
309 
310  sender, msg, _id = f.split('.')[-4:-1]
311 
312  _data = self.load(f)
313 
314  data.append((int(_id), int(sender), int(msg), _data))
315 
316  if self.debug:
317  print 'poll: sender=%s, msg=%s, id=%s, type=%s'%\
318  (sender, msg, _id,_data.__class__.__name__)
319 
320  os.unlink(f)
321 
322  data.sort(lambda a, b: cmp(a[0], b[0]))
323 
324  [self.register(*d) for d in data]
325 
326  def find_message(self, sender, msg):
327  """get message msg from sender sender, -1 stands for any.
328  returns (tid, msg, data) or None
329  """
330 
331  data = self.data
332 
333  if sender == -1:
334 
335  if msg == -1:
336 
337  if data:
338 
339  (tid, _msg), _data = data.get_first_item()
340 
341  return self.pop_message(tid, _msg)
342 
343  else:
344 
345  for (tid, _msg), _data in data.items():
346 
347  if _msg == msg:
348  return self.pop_message(tid, _msg)
349 
350  else:
351 
352  if msg == -1:
353 
354  for (tid, _msg), _data in data.items():
355 
356  if tid == sender:
357  return self.pop_message(tid, _msg)
358 
359  else:
360  return self.pop_message(sender, msg)
361 
362  return None
363 
364  def recv(self, sender, msg):
365  """get new message from sender, wait if necessary
366  returns (tid, msg, data)
367  """
368 
369  if self.debug:
370  print 'recv (tid=%d): waiting for sender=%d, msg=%d' % \
371  (self.tid, sender, msg)
372 
373  result = None
374 
375  while not self.__stop:
376 
377  self.poll()
378 
379  result = self.find_message(sender, msg)
380 
381  if result is not None:
382  break
383 
384  time.sleep(self.polling_interval)
385 
386  if self.debug:
387  print 'received.'
388 
389  return result
390 
391  def send(self, receiver, msg, value):
392 
393  recv_path = self.comm_path + '/%d' % receiver
394 
395  while not os.path.exists(recv_path):
396 
397  print 'send: path %s does not exist. waiting ...' % recv_path
398 
399  time.sleep(self.polling_interval)
400 
401  time.sleep(self.polling_interval)
402 
403  filename = recv_path + '/msg.%d.%d.%d.msg' % (self.tid, msg, self.message_id)
404 
405  self.message_id += 1
406 
407  self.dump(value, filename)
408 
409  if self.debug:
410  print 'sent: receiver=%d, msg=%d, msg_id=%d' % (receiver, msg, self.message_id - 1)
411 
412 class FileBasedServer(Server):
413 
414  def terminate(self):
415 
416  if self.debug:
417  self.dp( 'FileBasedServer.terminate: terminating proxy %s' % self.proxy)
418 
419  self.grid.send(self.url, MSG_TERMINATE, None)
420 
421 class FileBasedRemoteObjectHandler(RemoteObjectHandler):
422 
423  def __init__(self, kill_on_error=0, signal_file='', temp_path='',
424  parent_tid=None, tid=None, debug=False, nfs_care=False):
425 
426 
427  RemoteObjectHandler.__init__(self, kill_on_error, signal_file, debug)
428 
429  ## create watchdog to check every 60 mins whether child processes
430  ## are still alive.
431 
432  self.watchdog = WatchDog(60, debug=debug)
433  self.watchdog.start()
434 
435  self.create_communicator(temp_path, tid, debug, nfs_care)
436 
437  self.parent_tid = parent_tid
438 
439  self.message_id = 0
440 
441  self.initialize()
442 
443  def create_communicator(self, temp_path, tid, debug, nfs_care):
444 
445  self.communicator = FileBasedCommunicator(temp_path, tid, debug,
446  nfs_care)
447 
448  def send(self, msg, value = None):
449  self.communicator.send(self.parent_tid, msg, value)
450 
451  def recv(self, msg):
452  return self.communicator.recv(self.parent_tid, msg)
453 
454  def initialize(self):
455  """wait for initialization request and initialize accordingly"""
456 
457  print 'Initializing...'
458 
459  tid, msg, vals = self.recv(MSG_INIT)
460 
461  if 'object' in vals:
462  self.set_object(vals['object'])
463 
464  if 'daemon' in vals:
465  self.daemon = vals['daemon']
466 
467  if 'expiration_time' in vals:
468  self.t_expire = vals['expiration_time']
469 
470  self.send(MSG_INIT_DONE)
471 
472  print 'Done.'
473 
474  def start(self):
475  """main request management loop. Head node sends commands which this
476  thread will execute"""
477 
478  print 'Ready.'
479 
480  self._terminate = False
481 
482  self.watchdog.set(time.time())
483 
484  while 1 and not self._terminate:
485 
486  tid, msg, data = self.recv(-1)
487 
488  if msg == MSG_STOP:
489  return
490 
491  if type(data) is not tuple:
492  data = (data,)
493 
494  method = self.bindings[msg]
495 
496  if self.kill_on_error:
497 
498  try:
499  method(*data)
500  self.watchdog.set(time.time())
501 
502  except:
503  self.send(MSG_CLIENT_CRASHED)
504  sys.exit(0)
505 
506  else:
507 
508  method(*data)
509  self.watchdog.set(time.time())
510 
511  if not self.debug:
512  print 'Grid has been halted.'
513  sys.exit(0)
514 
515  else:
516  print 'Debugging mode, keeping Python interpreter alive.'
517 
518 
519  def terminate(self, x=None):
520 
521  self._terminate = True
522 
523 class FileBasedRemoteObject(RemoteObject):
524  """the filebased proxy instance"""
525 
526  def _call_method(self, name, *args, **kw):
527 
528  #result = self.__manager.create_result_object(self.__handler_tid)
529 
530  result = self.__manager.create_result_object(self)
531 
532  result.info = {'name': name, 'args': args, 'kw': kw}
533 
534  value = (result.key, name, args, kw)
535 
536  self.__manager.send(self.__handler_tid, MSG_CALL_METHOD, value)
537 
538  return result
539 
540 class FileBasedGrid(AbstractGrid):
541 
542  def __init__(self, hosts, src_path, display, X11_delay, debug, verbose, nfs_care=False):
543 
544  AbstractGrid.__init__(self, hosts, src_path, display, X11_delay, debug, verbose, shared_temp_path=True)
545 
546  #copy all files in current dir to remote host.
547  if self.debug: print "initialising"
548  self.initialise(src_path, ['filebased_loader','ro',
549  'FileBasedGrid','OrderedDict'])
550 
551 
552  if self.debug: print "setting filebased_loader"
553  self.set_loader(src_path, 'filebased_loader')
554 
555  if self.debug: print "creating communicator"
556  self.create_communicator(nfs_care)
557 
558  self.results = {}
559  self.key = 0
560 
561  self.__stop = False
562  self.__stopped = False
563 
564  if self.debug:
565  print 'FileBasedGrid created: tid = ', self.communicator.tid
566 
567  def set_debug(self, debug):
568 
569  AbstractGrid.set_debug(self, debug)
570  self.communicator.debug = debug
571 
572  def create_communicator(self, nfs_care):
573 
574  self.communicator = FileBasedCommunicator(self.hosts[0].temp_path,
575  debug = self.debug, nfs_care = nfs_care)
576 
577  def publish(self, instance):
578  """
579  for each host of self.hosts
580  -launch a FileBasedRemoteObjectHandler through the filebased loader.
581  - pickle the instance object and send it to the handler.
582  setting an attribute of the proxy results in a message being sent to
583  the concerned host.
584  - create a FileBasedServed for the proxy
585  - add it to the queue self.queues[sid]
586  and to the list self.servers[sid]
587  and set server.grid and server.proxy
588  returns the service id associated to this instance.
589 
590  note from the authors
591  In FileBasedGrid there is one to one correspondence between
592  a Server and a Handler...
593 
594  """
595 
596  if self.debug:
597  try:
598  print "publishing instance %s" % \
599  instance.__class__.__name__
600  except:
601  print "publishing instance"
602 
603  if self.debug: print " creating sevice id"
604  service_id = self.create_service_id(instance)
605 
606  for host in self.hosts:
607 
608  if self.debug: print " host ",host.name
609 
610  if self.debug: print " creating proxy"
611  proxy = self.create_proxy(instance, host, self.display, daemon = 1)
612 
613  if self.debug: print " creating FileBasedServer"
614  server = FileBasedServer(proxy, service_id, host, self.debug)
615 
616  if self.debug: print " proxy._get_url()"
617  server.url = proxy._get_url()
618 
619  if self.debug: print " adding server"
620  self.add_server(server)
621 
622  if self.display and self.X11_delay is not None:
623  time.sleep(self.X11_delay)
624 
625  return service_id
626 
627  def create_proxy(self, instance, host, display = 0, daemon = 0):
628  """
629  (copied from Grid, called from AbstractISDGrid.create_server)
630 
631  """
632 
633  if self.debug: print " creating handler"
634  handler_tid = self.create_handler(instance, host, display, daemon)
635 
636  if self.debug: print " creating proxy"
637  proxy = FileBasedRemoteObject(instance, handler_tid, manager = self)
638 
639  if self.debug:
640  print 'Connected: tid=%d' % handler_tid
641 
642  return proxy
643 
644  def create_handler(self, instance, host, display, daemon):
645 
646  d = {'object': instance,
647  'daemon': daemon}
648 
649  handler_tid = self.spawn_handler(host, display)
650 
651  if self.debug:
652  print 'Initialising service on host "%s"' % host.name
653 
654  self.send(handler_tid, MSG_INIT, d)
655 
656  if self.debug:
657  print 'MSG_INIT sent.'
658 
659  return handler_tid
660 
661  def spawn_handler(self, host, display):
662 
663  handler_script = os.path.join(self.hosts[0].temp_path,
664  self.loader_filename)
665 
666  init_data = {'niceness': host.niceness,
667  'display': display}
668 
669  argv = [handler_script]
670 
671  #add required init commands prior to launching anything else on the target host.
672  if host.init_cmd != '':
673  if host.init_cmd.rstrip().endswith(';'):
674  command = host.init_cmd
675  elif host.init_cmd.rstrip().endswith('!'):
676  command = host.init_cmd.rstrip()[:-1]
677  else:
678  command = host.init_cmd + ';'
679  else:
680  command = ''
681 
682  if display:
683 
684  if type(display) is type(0):
685 
686  master_name = socket.gethostname()
687 
688  if host.name == master_name:
689  display = ':0.0'
690  else:
691  display = master_name + ':0.0'
692 
693  command += 'xterm'
694 
695  argv = ['-title', host.name,
696  '-geometry', self.window_size,
697  '-hold',
698  '-e',
699  host.python, '-i'] + argv
700 
701  pipe = ''
702 
703  else:
704  command += host.python
705  pipe = '> /dev/null'
706 
707  if self.debug:
708  print 'Spawning service on host "%s"' % host.name
709 
710  X_forward = display
711 
712  tid = self.communicator.spawn(host.name, command, argv,
713  init_data, pipe, X_forward)
714 
715  if self.debug:
716  print 'Service spawned: tid = ', tid
717 
718  return tid
719 
720  def recv(self, tid, msg):
721  return self.communicator.recv(tid, msg)
722 
723  def send(self, tid, msg, value = None):
724  self.communicator.send(tid, msg, value)
725 
726  def create_result_object(self, proxy):
727  """
728  Results has to be temporarily stored in the Grid
729  until their values are calculated
730 
731  """
732 
733  key = self.key
734  self.key += 1
735 
736  if key in self.results:
737  print 'Result object for key %d already exists.' % key
738 
739  #result = Result(tid, key, self)
740 
741  result = Result(proxy)
742 
743  result.key = key
744 
745  self.results[key] = result
746 
747  return result
748 
749  def run(self):
750 
751  self.removed = Pipe(100000)
752 
753  while not self.__stop:
754 
755  if os.path.exists(os.path.join(self.hosts[0].temp_path, 'quit')):
756 
757  [self.send(server.url, MSG_TERMINATE) for \
758  server in self.servers[service_id]]
759 
760  #self.halt()
761  self.terminate()
762 
763  break
764 
765  recv = self.recv(-1, -1)
766 
767  if recv is None:
768  continue
769 
770  tid, msg, data = recv
771 
772  if self.debug:
773  template = 'received: tid=%d, msg=%d, type(data)=%s'
774 
775  if msg == MSG_INIT_DONE:
776 
777  for service_id, server_list in self.servers.items():
778 
779  server = [s for s in server_list if s.url == tid]
780 
781  if server:
782 
783  if len(server) > 1:
784  raise 'Inconsistency'
785 
786  if self.debug:
787  print server[0].host.name, 'ready.'
788 
789  continue
790 
791  elif msg == MSG_CALL_METHOD_RESULT:
792 
793  try:
794  key, value, elapsed = data
795  except:
796  print data, len(data)
797 
798  if key in self.results:
799 
800  result = self.results[key]
801 
802  result.value = value
803  result.event.set()
804 
805  self.add_time(result.proxy, elapsed)
806 
807  if result.proxy._selfrelease:
808 
809  if self.debug:
810  self.dp( 'FileBasedGrid.run: releasing server %s for the grid' \
811  % result.proxy._server )
812 
813  result.proxy._selfrelease = False
814  self._release_service(result.proxy)
815 
816  del self.results[key]
817  self.removed.put(key)
818 
819  else:
820 
821  print 'Result object, key %d, not known.' % key
822 
823  if key in self.removed.pipe:
824  print 'Key has already been deleted.'
825 
826  elif msg == MSG_CLIENT_CRASHED:
827 
828  ## find service to which the crashed client belongs
829 
830  for service_id, server_list in self.servers.items():
831 
832  crashed = [server for server in server_list \
833  if server.url == tid]
834 
835  if crashed:
836  break
837 
838  else:
839  raise 'Inconsistency: TID %d not known.' % tid
840 
841  msg = 'Client on host "%s" inaccessible. Attempting shutdown...'
842  print msg % crashed[0].host.name
843 
844  self.terminate(service_id)
845 
846  else:
847  print 'Unknown message', tid, msg
848 
849  self.__stopped = self.__stop
850 
851  def ishalted(self):
852  return self.__stopped
853 
854  def terminate(self, service_id = None):
855 
856  AbstractGrid.terminate(self, service_id)
857 
858  self.communicator.halt()
859  self.__stop = True
860 
861  if self.debug: ## please keep debug statement!
862  print 'FileBasedGrid: terminated.'
863 
864  def __del__(self):
865  self.terminate()
866 
867  #### YS: a few additions
868 
869  def broadcast(self, sfo_id, funcname, *args, **kw):
870  results = []
871  for server in self.servers[sfo_id]:
872  func=getattr(server.proxy, funcname)
873  results.append(func(*args, **kw))
874  return results
875 
876  def scatter(self, sfo_id, funcname, arglist, kwlist=None):
877  results = []
878  if kwlist is None:
879  kwlist=[{} for i in xrange(len(arglist))]
880  if not hasattr(arglist[0],'__iter__'):
881  arglist = [[i] for i in arglist]
882  for server,args,kw in zip(self.servers[sfo_id],arglist,kwlist):
883  func=getattr(server.proxy, funcname)
884  results.append(func(*args, **kw))
885  return results
886 
887  def gather(self, results):
888  retval=[]
889  for server in results:
890  retval.append(server.get())
891  return retval