14 from AbstractGrid
import AbstractGrid, Server, Result
16 from OrderedDict
import OrderedDict
17 from utils
import touch, my_glob, WatchDog, Pipe
# file_exists: the only visible step expands a leading '~' in `name`
# with os.path.expanduser.
# NOTE(review): the rest of the body (how `timeout` is used and what is
# returned) is not visible in this listing -- confirm against the full source.
21 def file_exists(name, timeout=None):
23 name = os.path.expanduser(name)
# valid_timestamp: compare the st_ctime of file `name` with the local clock.
# Returns True when abs(st_ctime - time.time()) < max_delay (both in seconds).
# Raises StandardError when st_ctime lies more than 1 s ahead of time.time(),
# which the message reports as inconsistent clocks.
# NOTE(review): some original lines between the statements below are missing
# from this listing (e.g. any error handling around os.stat).
33 def valid_timestamp(name, max_delay):
# NOTE(review): on Unix st_ctime is the inode-change time, not a creation time.
36 creation_time = os.stat(name).st_ctime
40 sys_time = time.time()
# A timestamp in the future (beyond a 1 s tolerance) is treated as an error.
42 if creation_time-sys_time > 1.:
43 msg =
'Inconsistent clocks: filename=%s, timestamp=%.2f, system time=%.2f'
44 raise StandardError, msg % (name, creation_time, sys_time)
46 return abs(creation_time-sys_time) < max_delay
# Communicator that exchanges pickled messages through per-task FIFO paths
# located in the 'comm' sub-directory of a temporary path.
48 class FIFOBasedCommunicator:
# Interval passed to time.sleep() by the waiting code in this class.
51 polling_interval = 0.02
# Constructor. Visible steps: derive the comm directory, create it if it is
# missing, seed the task-id counter, create the message store and verify that
# the platform offers fcntl.LOCK_EX and os.mkfifo.
# NOTE(review): several original lines are missing from this listing (e.g.
# how `tid` and `debug` are stored on the instance) -- confirm in full source.
54 def __init__(self, temp_path, tid=None, debug=False, nfs_care=False):
57 self.comm_path = os.path.join(temp_path,
'comm')
58 self.temp_path = temp_path
60 if not os.path.exists(self.comm_path):
# NOTE(review): shells out to `mkdir` with an unquoted, concatenated path.
61 os.system(
'mkdir ' + self.comm_path)
# Random starting value for the task ids handed out by this communicator.
65 self.tid_counter = random.randint(0, 12345678)
# NOTE(review): presumably only executed when no tid was passed in; the
# guarding condition is not visible here.
67 tid = self.tid_counter
# Store for received payloads; find_message() iterates it as
# ((tid, msg), data) items.
75 self.data = OrderedDict()
79 self.nfs_care = nfs_care
# Refuse to run where exclusive locks or FIFOs are unavailable.
84 if not (hasattr(fcntl,
'LOCK_EX')
and hasattr(os,
'mkfifo')):
85 raise RuntimeError,
"Exclusive locks / FIFOs not supported "\
86 "on this system, aborting..."
# Sends MSG_TERMINATE with no payload to this communicator's own tid.
# NOTE(review): the enclosing `def` line is missing from this listing;
# FIFOBasedGrid.terminate calls communicator.halt(), so this is presumably
# the body of halt() -- confirm.
91 self.send(self.tid, MSG_TERMINATE,
None)
# create_slot: prepare the communication slot for task `tid`.
# The slot path is '<comm_path>/<tid>'. When it does not exist yet, two empty
# companion files '<slot>.lockr' and '<slot>.lockw' are created.
# Raises (a string exception) when the path exists but is not a FIFO.
# NOTE(review): the line that creates the FIFO itself is not visible here.
93 def create_slot(self, tid):
95 slot =
'%s/%d' % (self.comm_path, tid)
97 if not os.path.exists(slot):
# Create the companion files empty: opened for writing and closed at once.
99 open(slot+
'.lockr',
'w').close()
100 open(slot+
'.lockw',
'w').close()
# An already existing path must be a FIFO.
101 elif not stat.S_ISFIFO(os.stat(slot)[stat.ST_MODE]):
# NOTE(review): raising a plain string is a long-deprecated string exception.
102 raise "comm file exists but is not a FIFO!"
# spawn: prepare and launch a process on `host_name`; returns the tid that was
# allocated for it (the incremented self.tid_counter).
# Visible steps: allocate a tid and its slot, add communicator settings to
# `init_data`, pickle it to '<temp_path>/init_data.<tid>' and build a
# backgrounded remote-shell command line.
# NOTE(review): the remainder of the signature and the line that executes
# `sh_cmd` are missing from this listing.
104 def spawn(self, host_name, command, args, init_data, pipe = '',
107 self.tid_counter += 1
# Make sure the slot for the new task id exists.
109 self.create_slot(self.tid_counter)
111 ssh_cmd = self.remote_shell
# These keys mirror the parameters of FIFOBasedRemoteObjectHandler.__init__.
# Note that the caller's `init_data` dict is modified in place.
116 init_data[
'temp_path'] = self.temp_path
117 init_data[
'debug'] = self.debug
118 init_data[
'parent_tid'] = self.tid
119 init_data[
'tid'] = self.tid_counter
120 init_data[
'nfs_care'] = self.nfs_care
# Flatten the argument list into one space-separated string.
122 args =
' '.join([str(x)
for x
in args])
# The init data is written as a pickle file named after the new tid.
124 filename = os.path.join(self.temp_path,
'init_data.%d' % init_data[
'tid'])
# NOTE(review): no close() of `f` is visible in this listing.
126 f = open(filename,
'w')
127 cPickle.dump(init_data, f)
# The trailing '&' puts the remote-shell command into the background.
130 sh_cmd =
"%s %s '%s %s %s %s' &" % (ssh_cmd, host_name, command, args,
135 return self.tid_counter
# dump: write one message to the FIFO at path `fifo`.
# The record is pickled as the tuple (info, op); the reading side unpacks it
# as ((sender, msg, id), data).
# NOTE(review): opening/locking of `f` and any loop around the sleep are not
# visible in this listing.
137 def dump(self, op, info, fifo):
140 time.sleep(self.polling_interval)
# Wait until `f` is reported writable.
143 r,w,e = select.select([],[f],[])
146 cPickle.dump((info,op), f)
# register: store `data` in self.data, either appending to the list already
# held under `key` or starting a new one-element list.
# NOTE(review): the construction of `key` and the condition choosing between
# the two statements are not visible in this listing.
151 def register(self, _id, tid, msg, data):
156 self.data[key].append(data)
158 self.data[key] = [data]
# pop_message: take the first queued payload stored under `key` and return
# the tuple (tid, msg, value).
# NOTE(review): the definitions of `data` and `key` and the path taken when
# nothing is queued are missing from this listing.
160 def pop_message(self, tid, msg):
165 if key
in data
and data[key]:
# Oldest entry first: pop from the front of the list.
167 value = data[key].pop(0)
173 print 'pop_message', tid, msg
175 return tid, msg, value
# NOTE(review): the `def` line of this method is missing from the listing;
# the debug print below suggests it is poll().
# Visible steps: read one pickled record from this task's own FIFO
# ('<comm_path>/<tid>') and hand it to register().
181 """block until message arrives,
182 extract data and register to self.data construct,
185 fifo = self.comm_path +
'/%d' % self.tid
191 time.sleep(self.polling_interval)
# Wait until `f` is reported readable.
193 r,w,e = select.select([f],[],[])
# Record layout: ((sender, msg, id), payload).
# NOTE(review): cPickle.load trusts whatever was written to the FIFO.
198 ((sender,msg,_id),_data) = cPickle.load(f)
199 self.register(int(_id), int(sender), int(msg), _data)
201 print 'poll: sender=%s, msg=%s, id=%s, type=%s, time=%f'%\
202 (sender, msg, _id,_data.__class__.__name__, time.time())
# find_message: look up an already stored message for (sender, msg) and pop
# it via pop_message(). Per the docstring, -1 stands for "any" and the result
# is (tid, msg, data) or None.
# NOTE(review): the conditions selecting each of the four return paths below
# (and the definition of `data`) are not visible in this listing.
208 def find_message(self, sender, msg):
209 """get message msg from sender sender, -1 stands for any.
210 returns (tid, msg, data) or None
217 (tid, _msg), _data = data.get_first_item()
218 return self.pop_message(tid, _msg)
220 for (tid, _msg), _data
in data.items():
222 return self.pop_message(tid, _msg)
225 for (tid, _msg), _data
in data.items():
227 return self.pop_message(tid, _msg)
229 return self.pop_message(sender, msg)
# recv: obtain a message matching (sender, msg); per the docstring it blocks
# if necessary and returns (tid, msg, data).
# NOTE(review): the loop structure, the reading of new messages and the
# return statement are missing from this listing.
232 def recv(self, sender, msg):
233 """get new message from FIFO, block if necessary
234 returns (tid, msg, data)
238 print 'recv (tid=%d): waiting for sender=%d, msg=%d, time=%f' % \
239 (self.tid, sender, msg, time.time())
246 if len(self.data) == 0:
# Try the messages that are already stored.
248 result = self.find_message(sender, msg)
249 if result
is not None:
# send: deliver `value` with message code `msg` to task `receiver`.
# Polls until the receiver's path '<comm_path>/<receiver>' exists, then writes
# the payload together with the header (own tid, msg, message id) via dump().
# NOTE(review): the update of self.message_id is not visible; the debug print
# reports message_id - 1, which suggests an increment on a missing line.
257 def send(self, receiver, msg, value):
259 recv_path = self.comm_path +
'/%d' % receiver
# Wait for the receiver's slot to appear.
260 while not os.path.exists(recv_path):
261 print 'send: path %s does not exist. waiting ...' % recv_path
262 time.sleep(self.polling_interval)
263 info = (self.tid, msg, self.message_id)
# dump() takes the payload first and the header second.
265 self.dump(value, info, recv_path)
267 print 'sent: receiver=%d, msg=%d, msg_id=%d, time=%f' % \
268 (receiver, msg, self.message_id - 1, time.time())
# Server subclass whose visible code logs the termination of its proxy and
# sends MSG_TERMINATE (no payload) to self.url through the grid.
# NOTE(review): the `def` line of the method below is missing from this
# listing; the log text names it FIFOBasedServer.terminate.
270 class FIFOBasedServer(Server):
275 self.dp(
'FIFOBasedServer.terminate: terminating proxy %s' % self.proxy)
# self.url is used as the receiver tid (FIFOBasedGrid compares url with tids).
277 self.grid.send(self.url, MSG_TERMINATE,
None)
# Remote-object handler that talks to its parent task through a
# FIFOBasedCommunicator.
# __init__ (visible steps): initialise the RemoteObjectHandler base, start a
# WatchDog, create the communicator and remember the parent's tid.
# NOTE(review): some original lines of the constructor are missing here.
279 class FIFOBasedRemoteObjectHandler(RemoteObjectHandler):
281 def __init__(self, kill_on_error=0, signal_file='', temp_path='',
282 parent_tid=
None, tid=
None, debug=
False, nfs_care=
False):
285 RemoteObjectHandler.__init__(self, kill_on_error, signal_file, debug)
# NOTE(review): 60 is presumably a timeout in seconds -- confirm in
# utils.WatchDog.
290 self.watchdog = WatchDog(60, debug=debug)
291 self.watchdog.start()
293 self.create_communicator(temp_path, tid, debug, nfs_care)
# send()/recv() of this handler always address this tid.
295 self.parent_tid = parent_tid
# create_communicator: build the FIFOBasedCommunicator and store it as
# self.communicator.
# NOTE(review): the end of the constructor call is cut off in this listing.
301 def create_communicator(self, temp_path, tid, debug, nfs_care):
303 self.communicator = FIFOBasedCommunicator(temp_path, tid, debug,
# send: send message code `msg` with optional payload `value` to the parent
# task (self.parent_tid) through the communicator.
306 def send(self, msg, value = None):
307 self.communicator.send(self.parent_tid, msg, value)
# Returns the communicator's result for message `msg` from the parent task.
# NOTE(review): the `def` line is missing from this listing; other methods
# call self.recv(msg), so this is presumably recv(self, msg) -- confirm.
310 return self.communicator.recv(self.parent_tid, msg)
# initialize: wait for MSG_INIT from the parent, configure this handler from
# the received dictionary and acknowledge with MSG_INIT_DONE.
# Reads the keys 'object' and 'daemon' (required) and 'expiration_time'
# (optional).
312 def initialize(self):
313 """wait for initialization request and initialize accordingly"""
315 print 'Initializing...'
# The sender tid and message code are unpacked but not used in visible code.
317 tid, msg, vals = self.recv(MSG_INIT)
320 self.set_object(vals[
'object'])
323 self.daemon = vals[
'daemon']
325 if 'expiration_time' in vals:
326 self.t_expire = vals[
'expiration_time']
# Acknowledge the initialisation to the parent.
328 self.send(MSG_INIT_DONE)
# NOTE(review): the `def` line of this method is missing from the listing, so
# its name is not visible here. Many lines of the loop body (try/except,
# method invocation, branch structure) are missing as well.
# Visible steps: reset the terminate flag, then repeatedly receive a message
# from the parent, look up its handler in self.bindings and report the
# current time to the watchdog.
333 """main request management loop. Head node sends commands which this
334 thread will execute"""
# Loop flag; terminate() sets it to True.
338 self._terminate =
False
340 self.watchdog.set(time.time())
342 while 1
and not self._terminate:
# -1 is passed as the message code (documented as "any" in find_message).
344 tid, msg, data = self.recv(-1)
349 if type(data)
is not tuple:
# Message code -> handler lookup.
352 method = self.bindings[msg]
354 if self.kill_on_error:
358 self.watchdog.set(time.time())
# Notify the parent task with MSG_CLIENT_CRASHED.
361 self.send(MSG_CLIENT_CRASHED)
367 self.watchdog.set(time.time())
370 print 'Grid has been halted.'
374 print 'Debugging mode, keeping Python interpreter alive.'
# terminate: set the flag that makes the request loop (`while ... not
# self._terminate`) stop. The parameter `x` is not used in the visible code.
376 def terminate(self, x=None):
378 self._terminate =
True
381 """the filebased proxy instance"""
# _call_method: forward a call of method `name` with `args`/`kw` to the
# remote handler. A result object is obtained from the manager, annotated
# with the call details, and the request tuple (result.key, name, args, kw)
# is sent as MSG_CALL_METHOD to the handler's tid.
# NOTE(review): the return statement is not visible (presumably `result`);
# __manager and __handler_tid are name-mangled attributes of the enclosing
# class, whose header is also missing from this listing.
383 def _call_method(self, name, *args, **kw):
387 result = self.__manager.create_result_object(self)
389 result.info = {
'name': name,
'args': args,
'kw': kw}
391 value = (result.key, name, args, kw)
393 self.__manager.send(self.__handler_tid, MSG_CALL_METHOD, value)
# Grid implementation that communicates with its handlers through a
# FIFOBasedCommunicator.
# __init__ (visible steps): initialise the AbstractGrid base with
# shared_temp_path=True, call initialise() and set_loader() for the
# 'fifobased_loader', create the communicator and clear the stopped flag.
# NOTE(review): some original lines of the constructor are missing here.
397 class FIFOBasedGrid(AbstractGrid):
399 def __init__(self, hosts, src_path, display, X11_delay, debug, verbose, nfs_care=False):
401 AbstractGrid.__init__(self, hosts, src_path, display, X11_delay, debug, verbose, shared_temp_path=
True)
404 if self.debug:
print "initialising"
# NOTE(review): the list presumably names modules to make available to the
# handlers -- confirm in AbstractGrid.initialise.
405 self.initialise(src_path, [
'fifobased_loader',
'ro',
406 'FIFOBasedGrid',
'OrderedDict'])
# NOTE(review): the debug text says "filebased_loader" although the loader
# set below is 'fifobased_loader'.
409 if self.debug:
print "setting filebased_loader"
410 self.set_loader(src_path,
'fifobased_loader')
412 if self.debug:
print "creating communicator"
413 self.create_communicator(nfs_care)
# Name-mangled flag; the receive loop later assigns and returns it.
419 self.__stopped =
False
422 print 'FIFOBasedGrid created: tid = ', self.communicator.tid
# set_debug: set the debug flag on the grid (via AbstractGrid.set_debug) and
# propagate the same value to the communicator.
424 def set_debug(self, debug):
426 AbstractGrid.set_debug(self, debug)
427 self.communicator.debug = debug
# create_communicator: build the grid's FIFOBasedCommunicator, rooted at the
# temp_path of the first host, and store it as self.communicator.
# No tid is passed, so the communicator's default (tid=None) applies.
429 def create_communicator(self, nfs_care):
431 self.communicator = FIFOBasedCommunicator(self.hosts[0].temp_path,
432 debug = self.debug, nfs_care = nfs_care)
# publish: make `instance` available on the hosts in self.hosts.
# Visible steps: create a service id, then per host create a proxy
# (daemon = 1), wrap it in a FIFOBasedServer whose url comes from
# proxy._get_url(), and register the server with add_server().
# NOTE(review): the docstring delimiters, several lines and the return
# statement are missing from this listing; the docstring states that the
# service id is returned.
434 def publish(self, instance):
436 for each host of self.hosts
437 -launch a FIFOBasedRemoteObjectHandler through the filebased loader.
438 - pickle the instance object and send it to the handler.
439 setting an attribute of the proxy results in a message being sent to
441 - create a FIFOBasedServed for the proxy
442 - add it to the queue self.queues[sid]
443 and to the list self.servers[sid]
444 and set server.grid and server.proxy
445 returns the service id associated to this instance.
447 note from the authors
448 In FIFOBasedGrid there is one to one correspondence between
449 a Server and a Handler...
455 print "publishing instance %s" % \
456 instance.__class__.__name__
458 print "publishing instance"
460 if self.debug:
print " creating sevice id"
461 service_id = self.create_service_id(instance)
463 for host
in self.hosts:
465 if self.debug:
print " host ",host.name
467 if self.debug:
print " creating proxy"
468 proxy = self.create_proxy(instance, host, self.display, daemon = 1)
470 if self.debug:
print " creating FIFOBasedServer"
471 server = FIFOBasedServer(proxy, service_id, host, self.debug)
473 if self.debug:
print " proxy._get_url()"
# The url is later compared with sender tids in the receive loop.
474 server.url = proxy._get_url()
476 if self.debug:
print " adding server"
477 self.add_server(server)
# Optional pause when a display is used and X11_delay is set.
479 if self.display
and self.X11_delay
is not None:
480 time.sleep(self.X11_delay)
# create_proxy: create a handler for `instance` on `host` and (on lines not
# visible here) the proxy connected to it.
# NOTE(review): docstring delimiters, the proxy construction and the return
# statement are missing from this listing.
484 def create_proxy(self, instance, host, display = 0, daemon = 0):
486 (copied from Grid, called from AbstractISDGrid.create_server)
490 if self.debug:
print " creating handler"
# create_handler() returns the tid of the spawned handler.
491 handler_tid = self.create_handler(instance, host, display, daemon)
493 if self.debug:
print " creating proxy"
497 print 'Connected: tid=%d' % handler_tid
# create_handler: spawn a handler process on `host` and send it MSG_INIT with
# the initialisation dictionary `d` (which contains at least 'object').
# NOTE(review): the remaining entries of `d` and the return statement are
# missing from this listing; create_proxy uses the result as the handler tid.
501 def create_handler(self, instance, host, display, daemon):
503 d = {
'object': instance,
506 handler_tid = self.spawn_handler(host, display)
509 print 'Initialising service on host "%s"' % host.name
511 self.send(handler_tid, MSG_INIT, d)
514 print 'MSG_INIT sent.'
# spawn_handler: assemble the command line for the loader script and start it
# on `host` through communicator.spawn(), which yields the new tid.
# NOTE(review): many lines are missing from this listing, including several
# branch conditions, the assignments of `pipe` and `X_forward`, and the
# return statement.
518 def spawn_handler(self, host, display):
# The loader script is looked up in the temp_path of the first host.
520 handler_script = os.path.join(self.hosts[0].temp_path,
521 self.loader_filename)
523 init_data = {
'niceness': host.niceness,
526 argv = [handler_script]
# Shell prefix from host.init_cmd: kept as is when it already ends with ';',
# a trailing '!' is stripped, otherwise ';' is appended.
529 if host.init_cmd !=
'':
530 if host.init_cmd.rstrip().endswith(
';'):
531 command = host.init_cmd
532 elif host.init_cmd.rstrip().endswith(
'!'):
533 command = host.init_cmd.rstrip()[:-1]
535 command = host.init_cmd +
';'
# Only an integer `display` value is processed here.
541 if type(display)
is type(0):
543 master_name = socket.gethostname()
545 if host.name == master_name:
548 display = master_name +
':0.0'
# NOTE(review): -title/-geometry look like terminal-emulator options; the
# program they are passed to is not visible in this listing.
552 argv = [
'-title', host.name,
553 '-geometry', self.window_size,
556 host.python,
'-i'] + argv
561 command += host.python
565 print 'Spawning service on host "%s"' % host.name
569 tid = self.communicator.spawn(host.name, command, argv,
570 init_data, pipe, X_forward)
573 print 'Service spawned: tid = ', tid
# recv: delegate to the communicator; returns its result for a message with
# code `msg` from task `tid`.
577 def recv(self, tid, msg):
578 return self.communicator.recv(tid, msg)
# send: delegate to the communicator; sends message code `msg` with optional
# payload `value` to task `tid`.
580 def send(self, tid, msg, value = None):
581 self.communicator.send(tid, msg, value)
# create_result_object: create a Result for `proxy` and store it in
# self.results under `key`.
# NOTE(review): docstring delimiters, the computation of `key` and the return
# statement are missing from this listing; _call_method uses the returned
# object's `key` and `info` attributes.
583 def create_result_object(self, proxy):
585 Results has to be temporarily stored in the Grid
586 until their values are calculated
593 if key
in self.results:
594 print 'Result object for key %d already exists.' % key
598 result = Result(proxy)
602 self.results[key] = result
# NOTE(review): the `def` line of this method is missing from the listing;
# the log text below names it FIFOBasedGrid.run. Many lines of the body
# (conditions, else branches, debug guards) are missing as well.
# Receive loop of the grid: runs until self.__stop is set, dispatches on the
# message codes MSG_INIT_DONE, MSG_CALL_METHOD_RESULT, MSG_CLIENT_CRASHED and
# MSG_TERMINATE, and finally returns self.__stopped.
608 self.removed = Pipe(100000)
610 while not self.__stop:
# A file named 'quit' in the first host's temp_path makes the grid send
# MSG_TERMINATE to servers.
612 if os.path.exists(os.path.join(self.hosts[0].temp_path,
'quit')):
614 [self.send(server.url, MSG_TERMINATE)
for \
615 server
in self.servers[service_id]]
# -1/-1: any sender, any message code (see find_message's docstring).
622 recv = self.recv(-1, -1)
627 tid, msg, data = recv
630 template =
'received: tid=%d, msg=%d, type(data)=%s'
632 if msg == MSG_INIT_DONE:
# Find the server whose url equals the sender's tid.
634 for service_id, server_list
in self.servers.items():
636 server = [s
for s
in server_list
if s.url == tid]
# NOTE(review): string exception (deprecated form).
641 raise 'Inconsistency'
644 print server[0].host.name,
'ready.'
648 elif msg == MSG_CALL_METHOD_RESULT:
# Payload layout: (result key, value, elapsed time).
651 key, value, elapsed = data
653 print data, len(data)
655 if key
in self.results:
657 result = self.results[key]
662 self.add_time(result.proxy, elapsed)
664 if result.proxy._selfrelease:
667 self.dp(
'FIFOBasedGrid.run: releasing server %s for the grid' \
668 % result.proxy._server )
670 result.proxy._selfrelease =
False
671 self._release_service(result.proxy)
# Forget the finished result and remember its key in self.removed.
673 del self.results[key]
674 self.removed.put(key)
678 print 'Result object, key %d, not known.' % key
680 if key
in self.removed.pipe:
681 print 'Key has already been deleted.'
683 elif msg == MSG_CLIENT_CRASHED:
# Find the server whose url equals the crashed sender's tid.
687 for service_id, server_list
in self.servers.items():
689 crashed = [server
for server
in server_list \
690 if server.url == tid]
# NOTE(review): string exception (deprecated form).
696 raise 'Inconsistency: TID %d not known.' % tid
# NOTE(review): `msg` is rebound from the message code to a text template.
698 msg =
'Client on host "%s" inaccessible. Attempting shutdown...'
699 print msg % crashed[0].host.name
701 self.terminate(service_id)
703 elif msg == MSG_TERMINATE:
707 print 'Unknown message', tid, msg
709 self.__stopped = self.__stop
712 return self.__stopped
# terminate: run AbstractGrid.terminate for `service_id`, then call halt() on
# the communicator and report termination.
# NOTE(review): some original lines between these statements are missing.
714 def terminate(self, service_id = None):
716 AbstractGrid.terminate(self, service_id)
718 self.communicator.halt()
722 print 'FIFOBasedGrid: terminated.'
# broadcast: call method `funcname` with the same *args/**kw on the proxy of
# every server registered under `sfo_id`, collecting each call's return value
# in `results`.
# NOTE(review): the initialisation and return of `results` are not visible.
729 def broadcast(self, sfo_id, funcname, *args, **kw):
731 for server
in self.servers[sfo_id]:
732 func=getattr(server.proxy, funcname)
733 results.append(func(*args, **kw))
# scatter: call method `funcname` on the servers of `sfo_id`, giving the i-th
# server the i-th entry of `arglist` (as positional args) and of `kwlist`
# (as keyword args); each call's return value is appended to `results`.
# zip() stops at the shortest of servers/arglist/kwlist.
# NOTE(review): the condition guarding the kwlist default (presumably
# `kwlist is None`) and the initialisation/return of `results` are missing.
736 def scatter(self, sfo_id, funcname, arglist, kwlist=None):
# Default: one empty keyword dict per argument entry.
739 kwlist=[{}
for i
in xrange(len(arglist))]
# Entries without __iter__ are wrapped into one-element lists; only the first
# entry is inspected (an empty arglist would raise IndexError here).
740 if not hasattr(arglist[0],
'__iter__'):
741 arglist = [[i]
for i
in arglist]
742 for server,args,kw
in zip(self.servers[sfo_id],arglist,kwlist):
743 func=getattr(server.proxy, funcname)
744 results.append(func(*args, **kw))
# gather: call get() on every element of `results` and collect the values in
# `retval`.
# NOTE(review): the initialisation and return of `retval` are not visible in
# this listing.
747 def gather(self, results):
# The loop variable is named `server`, but it iterates over `results`.
749 for server
in results:
750 retval.append(server.get())