26 from AbstractGrid
import AbstractGrid, Server, Result
28 from OrderedDict
import OrderedDict
29 from utils
import touch, my_glob, WatchDog, Pipe
# file_exists(name, timeout=None) -- only its signature and first statement are
# visible in this extract: a leading "~" in `name` is expanded via os.path.expanduser.
# Callers in this file use the result as a boolean existence test (see load()).
# NOTE(review): the rest of the body, including how `timeout` is honoured, is not
# visible here -- confirm against the complete source.
33 def file_exists(name, timeout=None):
35 name = os.path.expanduser(name)
# Return True if the ctime of file `name` lies within `max_delay` seconds of the
# current system time (absolute difference).
# Raises StandardError when the file's ctime is more than one second ahead of
# time.time(): a creation time in the future indicates that the clock of the machine
# that created the file and the local clock disagree.
# NOTE(review): original lines 46-47, 49-51, 53 and 57 are missing from this extract.
45 def valid_timestamp(name, max_delay):
48 creation_time = os.stat(name).st_ctime
52 sys_time = time.time()
# A future timestamp (beyond a 1 s tolerance) is treated as a clock mismatch rather
# than being reported as a valid, fresh timestamp.
54 if creation_time-sys_time > 1.:
55 msg =
'Inconsistent clocks: filename=%s, timestamp=%.2f, system time=%.2f'
# NOTE(review): Python 2-only raise syntax; StandardError does not exist in Python 3.
56 raise StandardError, msg % (name, creation_time, sys_time)
58 return abs(creation_time-sys_time) < max_delay
# Message passing over a shared directory.
# Each participant owns a slot directory <temp_path>/comm/<tid>/. A message is a
# pickled file named msg.<sender_tid>.<msg>.<message_id>.msg written into the
# receiver's slot (see send() and poll()).
# NOTE(review): this is a partial extract -- many original lines of this class are
# missing, so the comments below only describe what is visible.
60 class FileBasedCommunicator:
# Seconds to sleep between checks while waiting for files/directories.
63 polling_interval = 0.1
# temp_path: directory shared by all participants.
# tid: this communicator's id. When it is not given, a value taken from tid_counter
#      appears to be used (the guarding branch is not visible here).
# nfs_care: enables the stale-lock check in load() and is passed as the `touch` flag
#           to my_glob() in poll().
66 def __init__(self, temp_path, tid=None, debug=False, nfs_care=False):
69 self.comm_path = os.path.join(temp_path,
'comm')
70 self.temp_path = temp_path
# NOTE(review): `os.system('mkdir ' + path)` breaks on paths containing spaces or shell
# metacharacters and ignores failures; os.makedirs would be safer.
72 if not os.path.exists(self.comm_path):
73 os.system(
'mkdir ' + self.comm_path)
# Random starting point for the tids handed out by spawn(). This presumably reduces
# collisions between communicators sharing the same comm directory.
# NOTE(review): random ranges can still overlap -- confirm this is acceptable.
77 self.tid_counter = random.randint(0, 12345678)
79 tid = self.tid_counter
# Received payloads: one list per key. register() appends to it and pop_message()
# consumes it from the front.
87 self.data = OrderedDict()
91 self.nfs_care = nfs_care
# Create the slot directory <comm_path>/<tid> that receives messages addressed to `tid`.
98 def create_slot(self, tid):
100 slot =
'%s/%d' % (self.comm_path, tid)
102 if not os.path.exists(slot):
103 os.system(
'mkdir ' + slot)
# Start `command args` on host_name and return the new tid. Steps:
#   1. allocate the next tid and create its slot;
#   2. pickle init_data to <temp_path>/init_data.<tid>;
#   3. build a background remote-shell command line (self.remote_shell).
# NOTE(review): the lines that execute sh_cmd and close `f` are not visible in this
# extract -- confirm the pickle file is closed before the child reads it.
105 def spawn(self, host_name, command, args, init_data, pipe = '',
108 self.tid_counter += 1
110 self.create_slot(self.tid_counter)
112 ssh_cmd = self.remote_shell
# These keys match the keyword arguments of FileBasedRemoteObjectHandler.__init__
# (temp_path, debug, parent_tid, tid, nfs_care). Presumably the loader passes them
# through -- confirm.
117 init_data[
'temp_path'] = self.temp_path
118 init_data[
'debug'] = self.debug
119 init_data[
'parent_tid'] = self.tid
120 init_data[
'tid'] = self.tid_counter
121 init_data[
'nfs_care'] = self.nfs_care
123 args =
' '.join([str(x)
for x
in args])
125 filename = os.path.join(self.temp_path,
'init_data.%d' % init_data[
'tid'])
127 f = open(filename,
'w')
128 cPickle.dump(init_data, f)
# NOTE(review): command and args are interpolated into a shell string without quoting;
# values containing quotes or spaces will break the remote command.
131 sh_cmd =
"%s %s '%s %s %s %s' &" % (ssh_cmd, host_name, command, args,
136 return self.tid_counter
# Return the unpickled contents of the message file `filename`.
# While `filename + '.lock'` exists (a writer is still producing the message, see
# dump()), sleep polling_interval and check again.
# An IOError while reading is printed and the read is attempted again.
# NOTE(review): cPickle.load on files from a shared directory runs whatever a writer
# with access to that directory put there; only use it between trusted hosts.
# NOTE(review): lock_max_delay, debug_shown and the open() of `f` come from lines not
# visible in this extract.
138 def load(self, filename):
139 "load contents of message file filename by unpickling it"
143 while file_exists(filename +
'.lock'):
# Stale-lock check (nfs_care only). If the message file already exists but its lock
# file's ctime is not within lock_max_delay seconds of now (see valid_timestamp), a
# warning is printed. What happens after the warning is not visible here.
145 if self.nfs_care
and self.lock_max_delay
is not None:
147 if file_exists(filename)
and not \
148 valid_timestamp(filename +
'.lock', self.lock_max_delay):
150 sys_time = time.time()
153 file_time = os.stat(filename +
'.lock').st_ctime
158 msg =
'cPickle.load: lock file %s.lock has old timestamp: %d, system time is %d.'
159 print msg % (filename, file_time, sys_time)
# In debug mode, announce the wait (debug_shown presumably limits this to once).
163 if self.debug
and not debug_shown:
164 print 'Waiting for message to be unlocked ...'
167 time.sleep(self.polling_interval)
172 _data = cPickle.load(f)
175 except IOError, err_msg:
179 print 'Error accessing %s (%s). Error message was "%s". Trying again...' % \
180 (filename, time.ctime(), err_msg)
# Second attempt after a failed read.
187 _data = cPickle.load(f)
# Write `op` (pickled with protocol 1) to `filename + '.lock'`. Then publish it under
# `filename` by hard-linking and removing the '.lock' name. poll() only globs '*.msg'
# and load() waits while the '.lock' file exists, so receivers presumably never read a
# half-written message.
# An IOError when opening the lock file is printed and the open is retried.
# NOTE(review): closing/flushing of `f` happens in lines not visible here; presumably it
# is done before os.link so the published name refers to a complete pickle.
197 def dump(self, op, filename):
202 f = open(filename +
'.lock',
'w')
207 except IOError, err_msg:
211 print 'Error accessing %s (%s). Error message was "%s". Trying again...' % \
212 (filename, time.ctime(), err_msg)
218 f = open(filename +
'.lock',
'w')
219 cPickle.dump(op, f, 1)
# Make the finished file visible under its final name, then drop the lock name.
227 os.link(
'%s.lock' % filename,
'%s' % filename)
228 os.unlink(
'%s.lock' % filename)
# Older variant of dump(). It copies the lock file to `filename` instead of
# hard-linking it, reads the copy back with `cat`, then removes the lock file. The
# read-back is presumably meant to force NFS to refresh the file -- TODO confirm.
# NOTE(review): send() in this class calls dump(), not dump_old(). The unquoted
# filename in the os.system call has the same shell-quoting issue as mkdir above.
230 def dump_old(self, op, filename):
234 f = open(filename +
'.lock',
'w')
239 except IOError, err_msg:
242 print 'Error accessing %s (%s). Error message was "%s". Trying again...' % \
243 (filename, time.ctime(), err_msg)
249 f = open(filename +
'.lock',
'w')
250 cPickle.dump(op, f, 1)
258 shutil.copyfile(
'%s.lock' % filename,
'%s' % filename)
259 os.system(
'cat %s > /dev/null' % filename)
260 os.unlink(
'%s.lock' % filename)
# Queue `data` received from sender `tid` with message type `msg`. poll() calls this
# with (id, sender, msg, payload). The payload is appended to the existing list for its
# key, or a new one-element list is created.
# NOTE(review): how `key` is derived is not visible; find_message() unpacks keys of the
# message store as (tid, msg) pairs.
262 def register(self, _id, tid, msg, data):
267 self.data[key].append(data)
269 self.data[key] = [data]
# Remove and return the oldest payload queued under (tid, msg), as (tid, msg, value).
# NOTE(review): the handling when nothing is queued for the key is not visible here.
271 def pop_message(self, tid, msg):
276 if key
in data
and data[key]:
278 value = data[key].pop(0)
284 print 'pop_message', tid, msg
286 return tid, msg, value
# poll() -- its `def` line is not visible in this extract. It globs this
# communicator's own slot for msg.<sender>.<msg>.<id>.msg files, loads each one (the
# loading lines are not visible), and registers the payloads in ascending message-id
# order.
292 """check if new message arrived,
293 extract data and register to self.data construct,
297 filenames = self.comm_path +
'/%d/msg.*.msg' % self.tid
299 touch = self.nfs_care
301 files = my_glob(filenames, touch)
303 if len(files)
and self.debug:
304 print 'poll: new message arrived:'
# send() names files msg.<sender>.<msg>.<id>.msg, so the three fields before the
# trailing 'msg' are sender, msg and id. Slicing from the end keeps this correct even
# when the directory path itself contains dots.
310 sender, msg, _id = f.split(
'.')[-4:-1]
314 data.append((int(_id), int(sender), int(msg), _data))
317 print 'poll: sender=%s, msg=%s, id=%s, type=%s'%\
318 (sender, msg, _id,_data.__class__.__name__)
# Python 2 cmp-style sort by message id; same order as data.sort(key=lambda d: d[0]).
322 data.sort(
lambda a, b: cmp(a[0], b[0]))
# List comprehension used purely for its side effect: register each message.
324 [self.register(*d)
for d
in data]
# Return a queued message as (tid, msg, data) via pop_message(), where -1 for `sender`
# or `msg` matches anything.
# NOTE(review): the conditions guarding each branch are not visible. The shape suggests:
#   - the first stored item when both are wildcards;
#   - a scan of data.items() for each single-wildcard case;
#   - a direct (sender, msg) lookup otherwise.
# Confirm against the complete source.
326 def find_message(self, sender, msg):
327 """get message msg from sender sender, -1 stands for any.
328 returns (tid, msg, data) or None
339 (tid, _msg), _data = data.get_first_item()
341 return self.pop_message(tid, _msg)
345 for (tid, _msg), _data
in data.items():
348 return self.pop_message(tid, _msg)
354 for (tid, _msg), _data
in data.items():
357 return self.pop_message(tid, _msg)
360 return self.pop_message(sender, msg)
# Wait until a message matching (sender, msg) is available (-1 = any) and return it.
# find_message() is retried every polling_interval seconds until self.__stop is set.
# NOTE(review): the call that refreshes the store and the value returned when the loop
# ends because of __stop are not visible here.
364 def recv(self, sender, msg):
365 """get new message from sender, wait if necessary
366 returns (tid, msg, data)
370 print 'recv (tid=%d): waiting for sender=%d, msg=%d' % \
371 (self.tid, sender, msg)
375 while not self.__stop:
379 result = self.find_message(sender, msg)
381 if result
is not None:
384 time.sleep(self.polling_interval)
# Deliver `value` to `receiver` as message type `msg`.
# First wait until the receiver's slot directory exists, then dump the pickled value to
# <slot>/msg.<self.tid>.<msg>.<message_id>.msg.
# The extra sleep after the slot appears presumably gives a freshly spawned receiver
# (or NFS) time to settle -- TODO confirm.
# NOTE(review): message_id is incremented in lines not visible here (the debug output
# reports message_id - 1).
391 def send(self, receiver, msg, value):
393 recv_path = self.comm_path +
'/%d' % receiver
395 while not os.path.exists(recv_path):
397 print 'send: path %s does not exist. waiting ...' % recv_path
399 time.sleep(self.polling_interval)
401 time.sleep(self.polling_interval)
403 filename = recv_path +
'/msg.%d.%d.%d.msg' % (self.tid, msg, self.message_id)
407 self.dump(value, filename)
410 print 'sent: receiver=%d, msg=%d, msg_id=%d' % (receiver, msg, self.message_id - 1)
# Server record used by FileBasedGrid. self.url holds the handler's tid: publish()
# assigns proxy._get_url() to it, and the grid's main loop compares url with the tid of
# incoming messages.
# The visible method sends MSG_TERMINATE to that handler through self.grid. Its `def`
# line is missing from this extract; its debug message names it terminate.
412 class FileBasedServer(Server):
417 self.dp(
'FileBasedServer.terminate: terminating proxy %s' % self.proxy)
419 self.grid.send(self.url, MSG_TERMINATE,
None)
# Handler side of FileBasedGrid. It exchanges messages only with its parent
# (parent_tid) through its own FileBasedCommunicator and executes the requests it
# receives.
# The keyword arguments of __init__ match the init_data keys written by
# FileBasedCommunicator.spawn(); presumably the loader passes them through -- confirm.
# NOTE(review): partial extract -- the `def` lines of recv() and of the main loop are
# missing.
421 class FileBasedRemoteObjectHandler(RemoteObjectHandler):
423 def __init__(self, kill_on_error=0, signal_file='', temp_path='',
424 parent_tid=
None, tid=
None, debug=
False, nfs_care=
False):
427 RemoteObjectHandler.__init__(self, kill_on_error, signal_file, debug)
# A WatchDog(60) is started and later reset with the current time in the main loop.
# Presumably it acts if not reset within 60 s (WatchDog is defined in utils -- confirm).
432 self.watchdog = WatchDog(60, debug=debug)
433 self.watchdog.start()
435 self.create_communicator(temp_path, tid, debug, nfs_care)
437 self.parent_tid = parent_tid
# Build this handler's communicator using the tid assigned by the parent.
443 def create_communicator(self, temp_path, tid, debug, nfs_care):
445 self.communicator = FileBasedCommunicator(temp_path, tid, debug,
# Send message type `msg` (with an optional payload) to the parent.
448 def send(self, msg, value = None):
449 self.communicator.send(self.parent_tid, msg, value)
# recv(msg) -- `def` line not visible: wait for message type `msg` (-1 = any) from the
# parent and return (tid, msg, data).
452 return self.communicator.recv(self.parent_tid, msg)
# Wait for the parent's MSG_INIT and install vals['object']. Copy 'daemon' and the
# optional 'expiration_time', then acknowledge with MSG_INIT_DONE.
454 def initialize(self):
455 """wait for initialization request and initialize accordingly"""
457 print 'Initializing...'
459 tid, msg, vals = self.recv(MSG_INIT)
462 self.set_object(vals[
'object'])
465 self.daemon = vals[
'daemon']
467 if 'expiration_time' in vals:
468 self.t_expire = vals[
'expiration_time']
470 self.send(MSG_INIT_DONE)
# Main request loop (its `def` line is not visible).
# Until terminate() sets _terminate:
#   - reset the watchdog;
#   - receive any message type from the parent;
#   - dispatch it through self.bindings[msg].
475 """main request management loop. Head node sends commands which this
476 thread will execute"""
480 self._terminate =
False
482 self.watchdog.set(time.time())
# NOTE(review): `1 and` is redundant; the condition is equivalent to `not self._terminate`.
484 while 1
and not self._terminate:
486 tid, msg, data = self.recv(-1)
# Non-tuple payloads are presumably normalised to an argument tuple here; the branch
# body is not visible.
491 if type(data)
is not tuple:
494 method = self.bindings[msg]
# Error path, mostly not visible. Presumably, with kill_on_error, the parent is told
# MSG_CLIENT_CRASHED -- confirm.
496 if self.kill_on_error:
500 self.watchdog.set(time.time())
503 self.send(MSG_CLIENT_CRASHED)
509 self.watchdog.set(time.time())
512 print 'Grid has been halted.'
516 print 'Debugging mode, keeping Python interpreter alive.'
# Ask the main loop to exit before its next iteration. `x` is unused in the visible lines.
519 def terminate(self, x=None):
521 self._terminate =
True
# Fragment of the proxy class used with FileBasedGrid; its `class` line is not visible.
# Judging by the calls below, self.__manager is the grid and self.__handler_tid is the
# tid of the handler that owns the remote object.
524 """the filebased proxy instance"""
# Start a call of method `name` on the remote object. It does not wait for the reply in
# the visible lines. Steps:
#   1. register a Result placeholder with the manager;
#   2. record the call in result.info;
#   3. send (result.key, name, args, kw) to the handler as MSG_CALL_METHOD.
# The grid's main loop later matches the MSG_CALL_METHOD_RESULT reply to result.key.
# The return statement is not visible (presumably the Result is returned).
526 def _call_method(self, name, *args, **kw):
530 result = self.__manager.create_result_object(self)
532 result.info = {
'name': name,
'args': args,
'kw': kw}
534 value = (result.key, name, args, kw)
536 self.__manager.send(self.__handler_tid, MSG_CALL_METHOD, value)
# Grid implementation that talks to its remote handlers only through a
# FileBasedCommunicator rooted at hosts[0].temp_path. That temp path is shared by all
# hosts (shared_temp_path=True is passed to AbstractGrid).
# publish() starts one FileBasedRemoteObjectHandler per host, and the main loop
# dispatches their replies.
# NOTE(review): partial extract -- several method headers and many body lines are
# missing, and gather() may continue past the end of this extract.
540 class FileBasedGrid(AbstractGrid):
# Set up the grid in three steps:
#   1. initialise() presumably stages the listed modules for the hosts;
#   2. set_loader() sets the filebased_loader script (both are AbstractGrid methods,
#      not visible here);
#   3. create the communicator.
542 def __init__(self, hosts, src_path, display, X11_delay, debug, verbose, nfs_care=False):
544 AbstractGrid.__init__(self, hosts, src_path, display, X11_delay, debug, verbose, shared_temp_path=
True)
547 if self.debug:
print "initialising"
548 self.initialise(src_path, [
'filebased_loader',
'ro',
549 'FileBasedGrid',
'OrderedDict'])
552 if self.debug:
print "setting filebased_loader"
553 self.set_loader(src_path,
'filebased_loader')
555 if self.debug:
print "creating communicator"
556 self.create_communicator(nfs_care)
562 self.__stopped =
False
565 print 'FileBasedGrid created: tid = ', self.communicator.tid
# Propagate the debug flag to the communicator as well.
567 def set_debug(self, debug):
569 AbstractGrid.set_debug(self, debug)
570 self.communicator.debug = debug
# Root the communicator at the first host's temp_path. No tid is passed, so
# FileBasedCommunicator picks the grid's own tid.
572 def create_communicator(self, nfs_care):
574 self.communicator = FileBasedCommunicator(self.hosts[0].temp_path,
575 debug = self.debug, nfs_care = nfs_care)
# For every host, launch a handler for `instance` and register a FileBasedServer for it
# under one new service id (see the docstring; the return of the id is not visible).
# NOTE(review): the docstring says "FileBasedServed"; the class created below is
# FileBasedServer.
577 def publish(self, instance):
579 for each host of self.hosts
580 -launch a FileBasedRemoteObjectHandler through the filebased loader.
581 - pickle the instance object and send it to the handler.
582 setting an attribute of the proxy results in a message being sent to
584 - create a FileBasedServed for the proxy
585 - add it to the queue self.queues[sid]
586 and to the list self.servers[sid]
587 and set server.grid and server.proxy
588 returns the service id associated to this instance.
590 note from the authors
591 In FileBasedGrid there is one to one correspondence between
592 a Server and a Handler...
598 print "publishing instance %s" % \
599 instance.__class__.__name__
601 print "publishing instance"
603 if self.debug:
print " creating sevice id"
604 service_id = self.create_service_id(instance)
606 for host
in self.hosts:
608 if self.debug:
print " host ",host.name
610 if self.debug:
print " creating proxy"
611 proxy = self.create_proxy(instance, host, self.display, daemon = 1)
613 if self.debug:
print " creating FileBasedServer"
614 server = FileBasedServer(proxy, service_id, host, self.debug)
616 if self.debug:
print " proxy._get_url()"
617 server.url = proxy._get_url()
619 if self.debug:
print " adding server"
620 self.add_server(server)
# When a display is used, pause X11_delay seconds between hosts (presumably so each
# X11 window can come up).
622 if self.display
and self.X11_delay
is not None:
623 time.sleep(self.X11_delay)
# Spawn and initialise a handler for `instance` on `host` and report its tid. The proxy
# construction and return are in lines not visible here.
627 def create_proxy(self, instance, host, display = 0, daemon = 0):
629 (copied from Grid, called from AbstractISDGrid.create_server)
633 if self.debug:
print " creating handler"
634 handler_tid = self.create_handler(instance, host, display, daemon)
636 if self.debug:
print " creating proxy"
640 print 'Connected: tid=%d' % handler_tid
# Spawn a handler on `host`, then send it MSG_INIT with dict `d`. `d` holds the
# instance under 'object'; its other keys are not visible.
# FileBasedRemoteObjectHandler.initialize() reads 'object', 'daemon' and optionally
# 'expiration_time'.
644 def create_handler(self, instance, host, display, daemon):
646 d = {
'object': instance,
649 handler_tid = self.spawn_handler(host, display)
652 print 'Initialising service on host "%s"' % host.name
654 self.send(handler_tid, MSG_INIT, d)
657 print 'MSG_INIT sent.'
# Build the command that runs the loader script (in hosts[0].temp_path) on `host`,
# start it with communicator.spawn(), and report the resulting tid.
# NOTE(review): pipe and X_forward are set in lines not visible here.
661 def spawn_handler(self, host, display):
663 handler_script = os.path.join(self.hosts[0].temp_path,
664 self.loader_filename)
666 init_data = {
'niceness': host.niceness,
669 argv = [handler_script]
# A non-empty host.init_cmd is placed in front of the interpreter command:
#   - ending in ';' : used verbatim;
#   - ending in '!' : the '!' is stripped and no separator is added, so host.python is
#                     appended directly (presumably for wrapper commands -- confirm);
#   - otherwise     : a ';' separator is appended.
672 if host.init_cmd !=
'':
673 if host.init_cmd.rstrip().endswith(
';'):
674 command = host.init_cmd
675 elif host.init_cmd.rstrip().endswith(
'!'):
676 command = host.init_cmd.rstrip()[:-1]
678 command = host.init_cmd +
';'
# With an integer `display`, an X display name is derived: the master host uses
# master_name + ':0.0'.
# Under conditions partly not visible, argv is then wrapped with -title/-geometry
# options and `host.python -i`. The terminal program itself comes from missing lines.
684 if type(display)
is type(0):
686 master_name = socket.gethostname()
688 if host.name == master_name:
691 display = master_name +
':0.0'
695 argv = [
'-title', host.name,
696 '-geometry', self.window_size,
699 host.python,
'-i'] + argv
704 command += host.python
708 print 'Spawning service on host "%s"' % host.name
712 tid = self.communicator.spawn(host.name, command, argv,
713 init_data, pipe, X_forward)
716 print 'Service spawned: tid = ', tid
# Thin wrapper around the communicator. tid/msg of -1 act as wildcards.
720 def recv(self, tid, msg):
721 return self.communicator.recv(tid, msg)
# Thin wrapper around the communicator: send `msg` with optional payload to `tid`.
723 def send(self, tid, msg, value = None):
724 self.communicator.send(tid, msg, value)
# Create a Result for `proxy` and store it in self.results under its key. A
# pre-existing entry for the key is reported; the handling after the report is not
# visible. Key derivation and the return are also in lines not visible here.
726 def create_result_object(self, proxy):
728 Results has to be temporarily stored in the Grid
729 until their values are calculated
736 if key
in self.results:
737 print 'Result object for key %d already exists.' % key
741 result = Result(proxy)
745 self.results[key] = result
# Grid main loop (its `def` line is not visible). Until self.__stop is set it:
#   - sends MSG_TERMINATE to every server of a service when a file named 'quit'
#     appears in hosts[0].temp_path;
#   - receives any message from any handler (recv(-1, -1)) and dispatches on its type:
#       MSG_INIT_DONE          -> report the matching server as ready;
#       MSG_CALL_METHOD_RESULT -> find the pending Result by key, record the elapsed
#                                 time, release the service if requested, then drop
#                                 the key from self.results;
#       MSG_CLIENT_CRASHED     -> report the crashed host and terminate its service.
# self.removed remembers keys of Results already handled, so a late duplicate can be
# told apart from an unknown key. Pipe(100000) presumably bounds it to the most recent
# 100000 keys (Pipe comes from utils -- confirm).
751 self.removed = Pipe(100000)
753 while not self.__stop:
755 if os.path.exists(os.path.join(self.hosts[0].temp_path,
'quit')):
757 [self.send(server.url, MSG_TERMINATE)
for \
758 server
in self.servers[service_id]]
765 recv = self.recv(-1, -1)
770 tid, msg, data = recv
773 template =
'received: tid=%d, msg=%d, type(data)=%s'
775 if msg == MSG_INIT_DONE:
777 for service_id, server_list
in self.servers.items():
779 server = [s
for s
in server_list
if s.url == tid]
# NOTE(review): raising a plain string is not allowed since Python 2.6 and produces a
# TypeError instead; raise an exception instance such as RuntimeError('Inconsistency').
784 raise 'Inconsistency'
787 print server[0].host.name,
'ready.'
791 elif msg == MSG_CALL_METHOD_RESULT:
794 key, value, elapsed = data
796 print data, len(data)
798 if key
in self.results:
800 result = self.results[key]
805 self.add_time(result.proxy, elapsed)
807 if result.proxy._selfrelease:
810 self.dp(
'FileBasedGrid.run: releasing server %s for the grid' \
811 % result.proxy._server )
813 result.proxy._selfrelease =
False
814 self._release_service(result.proxy)
816 del self.results[key]
817 self.removed.put(key)
821 print 'Result object, key %d, not known.' % key
823 if key
in self.removed.pipe:
824 print 'Key has already been deleted.'
826 elif msg == MSG_CLIENT_CRASHED:
830 for service_id, server_list
in self.servers.items():
832 crashed = [server
for server
in server_list \
833 if server.url == tid]
# NOTE(review): string exception as above -- this raises TypeError instead of the
# intended message.
839 raise 'Inconsistency: TID %d not known.' % tid
841 msg =
'Client on host "%s" inaccessible. Attempting shutdown...'
842 print msg % crashed[0].host.name
844 self.terminate(service_id)
847 print 'Unknown message', tid, msg
# Record that the loop has ended because __stop was set.
849 self.__stopped = self.__stop
# Accessor (its `def` line is not visible) reporting whether the main loop has stopped.
852 return self.__stopped
# Terminate the given service via AbstractGrid (the meaning of None is defined there),
# then halt the communicator.
854 def terminate(self, service_id = None):
856 AbstractGrid.terminate(self, service_id)
858 self.communicator.halt()
862 print 'FileBasedGrid: terminated.'
# Call method `funcname` with the same arguments on the proxy of every server of service
# `sfo_id`, collecting the return values. Creation and return of `results` are not
# visible.
869 def broadcast(self, sfo_id, funcname, *args, **kw):
871 for server
in self.servers[sfo_id]:
872 func=getattr(server.proxy, funcname)
873 results.append(func(*args, **kw))
# Like broadcast(), but server i is called with arglist[i] and kwlist[i].
# kwlist presumably defaults to empty dicts when None (the guard is not visible).
# Non-iterable arguments are wrapped into one-element argument lists.
876 def scatter(self, sfo_id, funcname, arglist, kwlist=None):
879 kwlist=[{}
for i
in xrange(len(arglist))]
# NOTE(review): arglist[0] raises IndexError for an empty arglist. Under Python 2, str
# has no __iter__, so string arguments are wrapped too; under Python 3 they would not be.
880 if not hasattr(arglist[0],
'__iter__'):
881 arglist = [[i]
for i
in arglist]
# NOTE(review): zip() stops at the shortest of servers/arglist/kwlist, so surplus
# arguments or servers are silently ignored.
882 for server,args,kw
in zip(self.servers[sfo_id],arglist,kwlist):
883 func=getattr(server.proxy, funcname)
884 results.append(func(*args, **kw))
# Call .get() on each object in `results` (presumably the values returned by
# broadcast()/scatter()) and collect the values.
# NOTE(review): retval's creation and the return are not visible; the body may continue
# past this extract.
887 def gather(self, results):
889 for server
in results:
890 retval.append(server.get())