IMP  2.0.1
The Integrative Modeling Platform
FIFOBasedGrid.py
1 # minor modification of ISD's FileBasedGrid to make it communicate through a
2 # FIFO (named piped). Unix only.
3 
4 import os
5 import time
6 import random
7 import cPickle
8 import shutil
9 import socket
10 import stat
11 import select
12 import fcntl
13 
14 from AbstractGrid import AbstractGrid, Server, Result
15 from ro import *
16 from OrderedDict import OrderedDict
17 from utils import touch, my_glob, WatchDog, Pipe
18 
19 ## TODO: move auxiliary functions to a separate module
20 
21 def file_exists(name, timeout=None):
22 
23  name = os.path.expanduser(name)
24 
25  try:
26  f = open(name)
27  f.close()
28  return True
29 
30  except IOError:
31  return False
32 
33 def valid_timestamp(name, max_delay):
34 
35  try:
36  creation_time = os.stat(name).st_ctime
37  except:
38  return True
39 
40  sys_time = time.time()
41 
42  if creation_time-sys_time > 1.:
43  msg = 'Inconsistent clocks: filename=%s, timestamp=%.2f, system time=%.2f'
44  raise StandardError, msg % (name, creation_time, sys_time)
45 
46  return abs(creation_time-sys_time) < max_delay
47 
48 class FIFOBasedCommunicator:
49 
50  remote_shell = 'ssh'
51  polling_interval = 0.02
52  lock_max_delay = 10.
53 
54  def __init__(self, temp_path, tid=None, debug=False, nfs_care=False):
55 
56 
57  self.comm_path = os.path.join(temp_path, 'comm')
58  self.temp_path = temp_path
59 
60  if not os.path.exists(self.comm_path):
61  os.system('mkdir ' + self.comm_path)
62 
63  if tid is None:
64 
65  self.tid_counter = random.randint(0, 12345678)
66 
67  tid = self.tid_counter
68 
69  self.create_slot(tid)
70 
71  self.tid = tid
72 
73  self.message_id = 0
74 
75  self.data = OrderedDict()
76 
77  self.debug = debug
78 
79  self.nfs_care = nfs_care
80 
81  self.__stop = False
82 
83  #if not hasattr(os,'mkfifo'):
84  if not (hasattr(fcntl, 'LOCK_EX') and hasattr(os,'mkfifo')):
85  raise RuntimeError, "Exclusive locks / FIFOs not supported "\
86  "on this system, aborting..."
87 
88  def halt(self):
89  self.__stop = True
90  #needed to unlock forked process
91  self.send(self.tid, MSG_TERMINATE, None)
92 
93  def create_slot(self, tid):
94 
95  slot = '%s/%d' % (self.comm_path, tid)
96 
97  if not os.path.exists(slot):
98  os.mkfifo(slot)
99  open(slot+'.lockr','w').close()
100  open(slot+'.lockw','w').close()
101  elif not stat.S_ISFIFO(os.stat(slot)[stat.ST_MODE]):
102  raise "comm file exists but is not a FIFO!"
103 
104  def spawn(self, host_name, command, args, init_data, pipe = '',
105  X_forward = False):
106 
107  self.tid_counter += 1
108 
109  self.create_slot(self.tid_counter)
110 
111  ssh_cmd = self.remote_shell
112 
113  if X_forward:
114  ssh_cmd += ' -X'
115 
116  init_data['temp_path'] = self.temp_path
117  init_data['debug'] = self.debug
118  init_data['parent_tid'] = self.tid
119  init_data['tid'] = self.tid_counter
120  init_data['nfs_care'] = self.nfs_care
121 
122  args = ' '.join([str(x) for x in args])
123 
124  filename = os.path.join(self.temp_path, 'init_data.%d' % init_data['tid'])
125 
126  f = open(filename, 'w')
127  cPickle.dump(init_data, f)
128  f.close()
129 
130  sh_cmd = "%s %s '%s %s %s %s' &" % (ssh_cmd, host_name, command, args,
131  filename, pipe)
132 
133  os.system(sh_cmd)
134 
135  return self.tid_counter
136 
137  def dump(self, op, info, fifo):
138  #fl = os.open(fifo+'.lockw', os.O_RDONLY)
139  #fcntl.lockf(fl, fcntl.LOCK_EX)
140  time.sleep(self.polling_interval)
141  f = open(fifo, 'w')
142  while True:
143  r,w,e = select.select([],[f],[])
144  if w:
145  break
146  cPickle.dump((info,op), f)
147  f.flush()
148  f.close()
149  #os.close(fl)
150 
151  def register(self, _id, tid, msg, data):
152 
153  key = (tid, msg)
154 
155  if key in self.data:
156  self.data[key].append(data)
157  else:
158  self.data[key] = [data]
159 
160  def pop_message(self, tid, msg):
161 
162  key = (tid, msg)
163  data = self.data
164 
165  if key in data and data[key]:
166 
167  value = data[key].pop(0)
168 
169  if not data[key]:
170  del data[key]
171 
172  if self.debug:
173  print 'pop_message', tid, msg
174 
175  return tid, msg, value
176 
177  else:
178  return None
179 
180  def poll(self):
181  """block until message arrives,
182  extract data and register to self.data construct,
183  """
184 
185  fifo = self.comm_path + '/%d' % self.tid
186  #fl = os.open(fifo+'.lockr', os.O_RDONLY)
187  #blocking call
188  #fcntl.lockf(fl, fcntl.LOCK_EX)
189  #blocking call
190  f = open(fifo)
191  time.sleep(self.polling_interval)
192  while True:
193  r,w,e = select.select([f],[],[])
194  if r:
195  break
196  while True:
197  try:
198  ((sender,msg,_id),_data) = cPickle.load(f)
199  self.register(int(_id), int(sender), int(msg), _data)
200  if self.debug:
201  print 'poll: sender=%s, msg=%s, id=%s, type=%s, time=%f'%\
202  (sender, msg, _id,_data.__class__.__name__, time.time())
203  except EOFError:
204  break
205  f.close()
206  #os.close(fl)
207 
208  def find_message(self, sender, msg):
209  """get message msg from sender sender, -1 stands for any.
210  returns (tid, msg, data) or None
211  """
212 
213  data = self.data
214  if sender == -1:
215  if msg == -1:
216  if data:
217  (tid, _msg), _data = data.get_first_item()
218  return self.pop_message(tid, _msg)
219  else:
220  for (tid, _msg), _data in data.items():
221  if _msg == msg:
222  return self.pop_message(tid, _msg)
223  else:
224  if msg == -1:
225  for (tid, _msg), _data in data.items():
226  if tid == sender:
227  return self.pop_message(tid, _msg)
228  else:
229  return self.pop_message(sender, msg)
230  return None
231 
232  def recv(self, sender, msg):
233  """get new message from FIFO, block if necessary
234  returns (tid, msg, data)
235  """
236 
237  if self.debug:
238  print 'recv (tid=%d): waiting for sender=%d, msg=%d, time=%f' % \
239  (self.tid, sender, msg, time.time())
240  result = None
241 
242  while True:
243  if self.__stop:
244  return result
245  #register incoming data
246  if len(self.data) == 0:
247  self.poll()
248  result = self.find_message(sender, msg)
249  if result is not None:
250  break
251 
252  if self.debug:
253  print 'received.'
254 
255  return result
256 
257  def send(self, receiver, msg, value):
258 
259  recv_path = self.comm_path + '/%d' % receiver
260  while not os.path.exists(recv_path):
261  print 'send: path %s does not exist. waiting ...' % recv_path
262  time.sleep(self.polling_interval)
263  info = (self.tid, msg, self.message_id)
264  self.message_id += 1
265  self.dump(value, info, recv_path)
266  if self.debug:
267  print 'sent: receiver=%d, msg=%d, msg_id=%d, time=%f' % \
268  (receiver, msg, self.message_id - 1, time.time())
269 
270 class FIFOBasedServer(Server):
271 
272  def terminate(self):
273 
274  if self.debug:
275  self.dp( 'FIFOBasedServer.terminate: terminating proxy %s' % self.proxy)
276 
277  self.grid.send(self.url, MSG_TERMINATE, None)
278 
279 class FIFOBasedRemoteObjectHandler(RemoteObjectHandler):
280 
281  def __init__(self, kill_on_error=0, signal_file='', temp_path='',
282  parent_tid=None, tid=None, debug=False, nfs_care=False):
283 
284 
285  RemoteObjectHandler.__init__(self, kill_on_error, signal_file, debug)
286 
287  ## create watchdog to check every 60 mins whether child processes
288  ## are still alive.
289 
290  self.watchdog = WatchDog(60, debug=debug)
291  self.watchdog.start()
292 
293  self.create_communicator(temp_path, tid, debug, nfs_care)
294 
295  self.parent_tid = parent_tid
296 
297  self.message_id = 0
298 
299  self.initialize()
300 
301  def create_communicator(self, temp_path, tid, debug, nfs_care):
302 
303  self.communicator = FIFOBasedCommunicator(temp_path, tid, debug,
304  nfs_care)
305 
306  def send(self, msg, value = None):
307  self.communicator.send(self.parent_tid, msg, value)
308 
309  def recv(self, msg):
310  return self.communicator.recv(self.parent_tid, msg)
311 
312  def initialize(self):
313  """wait for initialization request and initialize accordingly"""
314 
315  print 'Initializing...'
316 
317  tid, msg, vals = self.recv(MSG_INIT)
318 
319  if 'object' in vals:
320  self.set_object(vals['object'])
321 
322  if 'daemon' in vals:
323  self.daemon = vals['daemon']
324 
325  if 'expiration_time' in vals:
326  self.t_expire = vals['expiration_time']
327 
328  self.send(MSG_INIT_DONE)
329 
330  print 'Done.'
331 
332  def start(self):
333  """main request management loop. Head node sends commands which this
334  thread will execute"""
335 
336  print 'Ready.'
337 
338  self._terminate = False
339 
340  self.watchdog.set(time.time())
341 
342  while 1 and not self._terminate:
343 
344  tid, msg, data = self.recv(-1)
345 
346  if msg == MSG_STOP:
347  return
348 
349  if type(data) is not tuple:
350  data = (data,)
351 
352  method = self.bindings[msg]
353 
354  if self.kill_on_error:
355 
356  try:
357  method(*data)
358  self.watchdog.set(time.time())
359 
360  except:
361  self.send(MSG_CLIENT_CRASHED)
362  sys.exit(0)
363 
364  else:
365 
366  method(*data)
367  self.watchdog.set(time.time())
368 
369  if not self.debug:
370  print 'Grid has been halted.'
371  sys.exit(0)
372 
373  else:
374  print 'Debugging mode, keeping Python interpreter alive.'
375 
376  def terminate(self, x=None):
377 
378  self._terminate = True
379 
380 class FIFOBasedRemoteObject(RemoteObject):
381  """the filebased proxy instance"""
382 
383  def _call_method(self, name, *args, **kw):
384 
385  #result = self.__manager.create_result_object(self.__handler_tid)
386 
387  result = self.__manager.create_result_object(self)
388 
389  result.info = {'name': name, 'args': args, 'kw': kw}
390 
391  value = (result.key, name, args, kw)
392 
393  self.__manager.send(self.__handler_tid, MSG_CALL_METHOD, value)
394 
395  return result
396 
397 class FIFOBasedGrid(AbstractGrid):
398 
399  def __init__(self, hosts, src_path, display, X11_delay, debug, verbose, nfs_care=False):
400 
401  AbstractGrid.__init__(self, hosts, src_path, display, X11_delay, debug, verbose, shared_temp_path=True)
402 
403  #copy all files in current dir to remote host.
404  if self.debug: print "initialising"
405  self.initialise(src_path, ['fifobased_loader','ro',
406  'FIFOBasedGrid','OrderedDict'])
407 
408 
409  if self.debug: print "setting filebased_loader"
410  self.set_loader(src_path, 'fifobased_loader')
411 
412  if self.debug: print "creating communicator"
413  self.create_communicator(nfs_care)
414 
415  self.results = {}
416  self.key = 0
417 
418  self.__stop = False
419  self.__stopped = False
420 
421  if self.debug:
422  print 'FIFOBasedGrid created: tid = ', self.communicator.tid
423 
424  def set_debug(self, debug):
425 
426  AbstractGrid.set_debug(self, debug)
427  self.communicator.debug = debug
428 
429  def create_communicator(self, nfs_care):
430 
431  self.communicator = FIFOBasedCommunicator(self.hosts[0].temp_path,
432  debug = self.debug, nfs_care = nfs_care)
433 
434  def publish(self, instance):
435  """
436  for each host of self.hosts
437  -launch a FIFOBasedRemoteObjectHandler through the filebased loader.
438  - pickle the instance object and send it to the handler.
439  setting an attribute of the proxy results in a message being sent to
440  the concerned host.
441  - create a FIFOBasedServed for the proxy
442  - add it to the queue self.queues[sid]
443  and to the list self.servers[sid]
444  and set server.grid and server.proxy
445  returns the service id associated to this instance.
446 
447  note from the authors
448  In FIFOBasedGrid there is one to one correspondence between
449  a Server and a Handler...
450 
451  """
452 
453  if self.debug:
454  try:
455  print "publishing instance %s" % \
456  instance.__class__.__name__
457  except:
458  print "publishing instance"
459 
460  if self.debug: print " creating sevice id"
461  service_id = self.create_service_id(instance)
462 
463  for host in self.hosts:
464 
465  if self.debug: print " host ",host.name
466 
467  if self.debug: print " creating proxy"
468  proxy = self.create_proxy(instance, host, self.display, daemon = 1)
469 
470  if self.debug: print " creating FIFOBasedServer"
471  server = FIFOBasedServer(proxy, service_id, host, self.debug)
472 
473  if self.debug: print " proxy._get_url()"
474  server.url = proxy._get_url()
475 
476  if self.debug: print " adding server"
477  self.add_server(server)
478 
479  if self.display and self.X11_delay is not None:
480  time.sleep(self.X11_delay)
481 
482  return service_id
483 
484  def create_proxy(self, instance, host, display = 0, daemon = 0):
485  """
486  (copied from Grid, called from AbstractISDGrid.create_server)
487 
488  """
489 
490  if self.debug: print " creating handler"
491  handler_tid = self.create_handler(instance, host, display, daemon)
492 
493  if self.debug: print " creating proxy"
494  proxy = FIFOBasedRemoteObject(instance, handler_tid, manager = self)
495 
496  if self.debug:
497  print 'Connected: tid=%d' % handler_tid
498 
499  return proxy
500 
501  def create_handler(self, instance, host, display, daemon):
502 
503  d = {'object': instance,
504  'daemon': daemon}
505 
506  handler_tid = self.spawn_handler(host, display)
507 
508  if self.debug:
509  print 'Initialising service on host "%s"' % host.name
510 
511  self.send(handler_tid, MSG_INIT, d)
512 
513  if self.debug:
514  print 'MSG_INIT sent.'
515 
516  return handler_tid
517 
518  def spawn_handler(self, host, display):
519 
520  handler_script = os.path.join(self.hosts[0].temp_path,
521  self.loader_filename)
522 
523  init_data = {'niceness': host.niceness,
524  'display': display}
525 
526  argv = [handler_script]
527 
528  #add required init commands prior to launching anything else on the target host.
529  if host.init_cmd != '':
530  if host.init_cmd.rstrip().endswith(';'):
531  command = host.init_cmd
532  elif host.init_cmd.rstrip().endswith('!'):
533  command = host.init_cmd.rstrip()[:-1]
534  else:
535  command = host.init_cmd + ';'
536  else:
537  command = ''
538 
539  if display:
540 
541  if type(display) is type(0):
542 
543  master_name = socket.gethostname()
544 
545  if host.name == master_name:
546  display = ':0.0'
547  else:
548  display = master_name + ':0.0'
549 
550  command += 'xterm'
551 
552  argv = ['-title', host.name,
553  '-geometry', self.window_size,
554  '-hold',
555  '-e',
556  host.python, '-i'] + argv
557 
558  pipe = ''
559 
560  else:
561  command += host.python
562  pipe = '> /dev/null'
563 
564  if self.debug:
565  print 'Spawning service on host "%s"' % host.name
566 
567  X_forward = display
568 
569  tid = self.communicator.spawn(host.name, command, argv,
570  init_data, pipe, X_forward)
571 
572  if self.debug:
573  print 'Service spawned: tid = ', tid
574 
575  return tid
576 
577  def recv(self, tid, msg):
578  return self.communicator.recv(tid, msg)
579 
580  def send(self, tid, msg, value = None):
581  self.communicator.send(tid, msg, value)
582 
583  def create_result_object(self, proxy):
584  """
585  Results has to be temporarily stored in the Grid
586  until their values are calculated
587 
588  """
589 
590  key = self.key
591  self.key += 1
592 
593  if key in self.results:
594  print 'Result object for key %d already exists.' % key
595 
596  #result = Result(tid, key, self)
597 
598  result = Result(proxy)
599 
600  result.key = key
601 
602  self.results[key] = result
603 
604  return result
605 
606  def run(self):
607 
608  self.removed = Pipe(100000)
609 
610  while not self.__stop:
611 
612  if os.path.exists(os.path.join(self.hosts[0].temp_path, 'quit')):
613 
614  [self.send(server.url, MSG_TERMINATE) for \
615  server in self.servers[service_id]]
616 
617  #self.halt()
618  self.terminate()
619 
620  break
621 
622  recv = self.recv(-1, -1)
623 
624  if recv is None:
625  continue
626 
627  tid, msg, data = recv
628 
629  if self.debug:
630  template = 'received: tid=%d, msg=%d, type(data)=%s'
631 
632  if msg == MSG_INIT_DONE:
633 
634  for service_id, server_list in self.servers.items():
635 
636  server = [s for s in server_list if s.url == tid]
637 
638  if server:
639 
640  if len(server) > 1:
641  raise 'Inconsistency'
642 
643  if self.debug:
644  print server[0].host.name, 'ready.'
645 
646  continue
647 
648  elif msg == MSG_CALL_METHOD_RESULT:
649 
650  try:
651  key, value, elapsed = data
652  except:
653  print data, len(data)
654 
655  if key in self.results:
656 
657  result = self.results[key]
658 
659  result.value = value
660  result.event.set()
661 
662  self.add_time(result.proxy, elapsed)
663 
664  if result.proxy._selfrelease:
665 
666  if self.debug:
667  self.dp( 'FIFOBasedGrid.run: releasing server %s for the grid' \
668  % result.proxy._server )
669 
670  result.proxy._selfrelease = False
671  self._release_service(result.proxy)
672 
673  del self.results[key]
674  self.removed.put(key)
675 
676  else:
677 
678  print 'Result object, key %d, not known.' % key
679 
680  if key in self.removed.pipe:
681  print 'Key has already been deleted.'
682 
683  elif msg == MSG_CLIENT_CRASHED:
684 
685  ## find service to which the crashed client belongs
686 
687  for service_id, server_list in self.servers.items():
688 
689  crashed = [server for server in server_list \
690  if server.url == tid]
691 
692  if crashed:
693  break
694 
695  else:
696  raise 'Inconsistency: TID %d not known.' % tid
697 
698  msg = 'Client on host "%s" inaccessible. Attempting shutdown...'
699  print msg % crashed[0].host.name
700 
701  self.terminate(service_id)
702 
703  elif msg == MSG_TERMINATE:
704  self.__stop = True
705 
706  else:
707  print 'Unknown message', tid, msg
708 
709  self.__stopped = self.__stop
710 
711  def ishalted(self):
712  return self.__stopped
713 
714  def terminate(self, service_id = None):
715 
716  AbstractGrid.terminate(self, service_id)
717 
718  self.communicator.halt()
719  self.__stop = True
720 
721  if self.debug: ## please keep debug statement!
722  print 'FIFOBasedGrid: terminated.'
723 
724  def __del__(self):
725  self.terminate()
726 
727  #### YS: a few additions
728 
729  def broadcast(self, sfo_id, funcname, *args, **kw):
730  results = []
731  for server in self.servers[sfo_id]:
732  func=getattr(server.proxy, funcname)
733  results.append(func(*args, **kw))
734  return results
735 
736  def scatter(self, sfo_id, funcname, arglist, kwlist=None):
737  results = []
738  if kwlist is None:
739  kwlist=[{} for i in xrange(len(arglist))]
740  if not hasattr(arglist[0],'__iter__'):
741  arglist = [[i] for i in arglist]
742  for server,args,kw in zip(self.servers[sfo_id],arglist,kwlist):
743  func=getattr(server.proxy, funcname)
744  results.append(func(*args, **kw))
745  return results
746 
747  def gather(self, results):
748  retval=[]
749  for server in results:
750  retval.append(server.get())
751  return retval