25 import Pyro.core, Pyro.errors, Pyro.util
33 Pyro.config.PYRO_PRINT_REMOTE_TRACEBACK = 1
34 Pyro.config.PYRO_DETAILED_TRACEBACK = 1
35 Pyro.config.PYRO_MOBILE_CODE = 1
37 def get_sge_host_list():
38 """parse the sge variable $PE_HOSTFILE"""
# NOTE(review): this listing elides original lines 39, 41-43 and 45+, so the
# initialisation of `hostlist`, the way `tokens` is derived from `line`, and
# the return statement are not visible here.
# Iterate over the lines of the file whose path is stored in $PE_HOSTFILE.
40 for line
in open(os.environ[
'PE_HOSTFILE']):
# The first token of each line is collected into `hostlist`.
44 hostlist.append(tokens[0])
47 class PyroGrid(AbstractGrid):
49 def __init__(self, hosts, src_path, display, X11_delay,
51 shared_temp_path, nshost, terminate_during_publish,
# NOTE(review): original lines 50 and 52-53 are elided, so the rest of the
# parameter list (`debug`, `verbose` and `method` are used below) is not
# visible in this listing.
# Delegate the common grid set-up to the base class.
54 AbstractGrid.__init__(self, hosts, src_path, display, X11_delay,\
55 debug, verbose, shared_temp_path)
# Register the modules/loader for `src_path`; the remaining list entries
# are elided.
58 self.initialise(src_path, [
'PyroGrid',
'PyroUtils',
62 self.set_loader(src_path,
'PyroHandlerLoader')
# Only the 'ssh' and 'qsub' launch methods are accepted.
65 if method !=
'ssh' and method !=
'qsub':
66 raise ValueError,
"unknown method: %s. Try ssh or qsub" % method
# Number of hosts; the per-host loops in the other methods run this many times.
70 self.n_hosts = len(hosts)
79 self.__terminate_during_publish = terminate_during_publish
# No communication proxies have been acquired yet (see broadcast/scatter).
82 self.comm_acquired =
False
# The grid has not been terminated yet.
84 self.__stopped =
False
86 def _launch_handler(self, handler_uri, host):
# Builds the shell command that starts the handler script for `handler_uri`
# on `host`. NOTE(review): the statement that actually executes `cmd` is
# elided from this listing (lines after 130), as are several `else:` lines.
87 """start up handlers in parallel. Handlers have URI 'handler_uri' and
88 are launched on 'host'.
# Path of the handler script inside the host's temp directory (second
# argument of os.path.join is elided).
91 handler_script = os.path.join(host.temp_path,
# Command-line arguments passed to the handler script, space separated.
95 argv =
"%s %s %s %s %s %s %s %s" \
96 % (handler_script, host.niceness, self.nshost, handler_uri, \
97 self.timeout, self.debug, self.display, host.temp_path)
# Short host name (first label of the FQDN), used as the xterm title.
102 title = socket.getfqdn(host.name).split(
'.')[0]
# Variant 1: run the interpreter inside an xterm that stays open (-hold).
# The condition selecting between the two variants is elided.
105 cmd =
"xterm -T %s -geometry %s -hold -e \"%s -i %s\"" \
106 % (title, self.window_size, host.python, argv)
# Variant 2: run the interpreter directly.
109 cmd =
"%s %s" % (host.python, argv)
# Prepend the host's init command, if any:
#  - ends with ';'  -> prepend as is
#  - ends with '!'  -> drop the '!' and prepend without a separator
#  - otherwise (the `else:` line is elided) -> join with ';'
113 if host.init_cmd !=
'':
114 if host.init_cmd.rstrip().endswith(
';'):
115 cmd = host.init_cmd + cmd
116 elif host.init_cmd.rstrip().endswith(
'!'):
117 cmd = host.init_cmd.rstrip()[:-1] + cmd
119 cmd = host.init_cmd +
';' + cmd
# Wrap the command for remote execution: ssh, or (elided `else:`) qrsh.
122 if self.method ==
'ssh':
123 cmd =
"ssh %s '%s' " % (host.name, cmd)
125 cmd =
"qrsh -inherit %s '%s' " % (host.name, cmd)
# Discard output and run in the background; the guarding condition (if any)
# is elided.
129 cmd = cmd +
" > /dev/null &"
130 print 'PYRO echo: %s' % cmd
137 def _connect_handlers(self, handler_uris, handlers, handlers_lock, hosts_map):
# For every host index, obtains a proxy for the handler URI and puts it into
# the `handlers` queue, notifying waiters on `handlers_lock`.
# NOTE(review): the docstring and the try/except/debug-guard lines around
# the prints are elided from this listing.
145 _ns = PyroUtils.get_nameserver(self.nshost)
147 for i
in range(self.n_hosts):
149 handler = PyroUtils.get_proxy(handler_uris[i], ns = _ns)
# Failure message; the enclosing exception handler is not visible.
151 print 'PyroGrid._connect_handlers: failed to connect to "%s"' % handler_uris[i]
# Attach the host object that was recorded for this URI in start_handlers.
157 handler.host = hosts_map[handler_uris[i]]
160 print 'PyroGrid._connect_handlers: handler "%s" on "%s" is ready' \
161 % (handler_uris[i], handler._pyro_suppl[
'host'])
# Publish the connected handler to the consumer under the condition lock.
164 handlers_lock.acquire()
166 handlers.put(handler)
169 print 'PyroGrid._connect_handlers: notifies "%s" is in the queue' % handler_uris[i]
171 handlers_lock.notify()
172 handlers_lock.release()
174 def _launch_instances(self, instance, instance_uris, \
175 handlers, handlers_lock, \
176 launched_instances, launched_instances_lock, \
# NOTE(review): the end of the signature is elided; `self_handlers_lock`,
# used below, is presumably one of the missing parameters.
# For every host: takes a connected handler from `handlers`, asks it to
# publish `instance` under the next URI, records the handler, and announces
# the URI through `launched_instances`.
# Not used in the visible lines of this method.
185 ns_clear_interval = 5
187 for i
in range(self.n_hosts):
# Wait until a handler is available.
# NOTE(review): the emptiness check is a single `if`, not a loop, so a
# wake-up with a still-empty queue would fall through to a blocking get().
190 handlers_lock.acquire()
191 if handlers.empty(): handlers_lock.wait()
192 handler = handlers.get()
193 handlers_lock.release()
195 instance_uri = instance_uris[i]
198 print 'PyroGrid._launch_instances: "%s" is being published remotely...' % instance_uri
# Remote call on the handler proxy.
202 handler.publish(instance, instance_uri, delegate =
True)
# Remember which handler owns this instance URI.
205 self_handlers_lock.acquire()
206 self.handlers[instance_uri] = handler
207 self_handlers_lock.release()
# Hand the URI over to _connect_instances.
209 launched_instances_lock.acquire()
210 launched_instances.put(instance_uri)
213 print 'PyroGrid._launch_instances: notifies "%s" is being published' % instance_uri
215 launched_instances_lock.notify()
216 launched_instances_lock.release()
222 def _connect_instances(self, service_id, \
223 launched_instances, launched_instances_lock, \
224 self_handlers_lock, servers_lock):
# For every host: takes a launched instance URI from the queue, obtains a
# proxy for it and registers it via _append_instance; finally flags the
# service as published.
# NOTE(review): try/except and debug-guard lines around the prints are
# elided from this listing.
228 _ns = PyroUtils.get_nameserver(self.nshost)
230 for i
in range(self.n_hosts):
# Wait for the next URI announced by _launch_instances (single `if` check).
232 launched_instances_lock.acquire()
233 if launched_instances.empty(): launched_instances_lock.wait()
234 instance_uri = launched_instances.get()
235 launched_instances_lock.release()
238 instance_proxy = PyroUtils.get_proxy(instance_uri, ns = _ns)
# Failure message; the enclosing exception handler is not visible.
240 print 'PyroGrid._connect_instances: failed to connect to "%s"' % instance_uri
244 print 'PyroGrid._connect_instances: instance "%s" on "%s" is ready' \
245 % (instance_uri, instance_proxy._pyro_suppl[
'host'])
247 self._append_instance(service_id, instance_proxy, \
248 self_handlers_lock, servers_lock)
# Mark the service as published (it was set to False in publish()).
250 self.__published[service_id] =
True
252 def _append_instance(self, service_id, instance_proxy, \
253 self_handlers_lock, servers_lock):
# Wraps the raw instance proxy in a PyroProxy/PyroServer pair, links it to
# its handler and adds the server to the grid, notifying `servers_lock`.
257 instance_proxy._release()
258 instance_uri = instance_proxy._pyro_suppl[
'uri']
# Look up the handler recorded for this URI by _launch_instances.
260 self_handlers_lock.acquire()
261 handler = self.handlers[instance_uri]
264 self_handlers_lock.release()
# NOTE(review): `host`, passed to PyroServer below, is assigned on a line
# that is elided from this listing.
269 proxy =
PyroProxy(instance_proxy, debug = self.debug, verbose = self.verbose)
270 server = PyroServer(proxy, service_id, host, self.debug)
# Written straight into __dict__, so PyroProxy.__setattr__ is not invoked.
274 self_handlers_lock.acquire()
275 proxy.__dict__[
'_handler'] = handler
276 self_handlers_lock.release()
# Register the server and wake up whoever waits on servers_lock (publish()).
285 servers_lock.acquire()
286 self.add_server(server)
289 print 'PyroGrid._append_instance: notifies %s is ready' % server
291 servers_lock.notify()
292 servers_lock.release()
294 def start_handlers(self, service_id):
# NOTE(review): the docstring quote lines, the initialisation of
# `hosts_map` and the start of thread `t` are elided from this listing.
296 Starts PyroHandler on each host (non-blocking) and
297 returns a Condition lock object and
298 a FIFO queue that is being filled with their
299 Pyro proxies by a separate thread
# One unique URI per host, based on the name 'PyroHandler.<service_id>'.
303 handler_uris = PyroUtils.create_unique_uri(\
304 string.join((
'PyroHandler', service_id),
'.'),\
305 n = self.n_hosts, ns = self.nshost)
# Queue created with maxsize -1, plus the condition guarding it.
308 handlers = Queue.Queue(-1)
310 handlers_lock = threading.Condition()
# Launch one handler per host and remember which host each URI belongs to.
314 for i
in range(self.n_hosts):
316 self._launch_handler(handler_uris[i], self.hosts[i])
317 hosts_map[handler_uris[i]] = self.hosts[i]
# Pause for X11_delay seconds before connecting.
324 time.sleep(self.X11_delay)
# Connecting to the handlers is delegated to a separate thread.
327 t = threading.Thread(target = self._connect_handlers, \
328 args = (handler_uris, handlers, handlers_lock, hosts_map))
# Note the order: (queue, lock).
331 return handlers, handlers_lock
333 def start_instances(self, service_id, instance, handlers, handlers_lock):
# Creates the queues/locks shared by the publishing pipeline and sets up
# two threads: one running _launch_instances, one running
# _connect_instances.
# NOTE(review): the end of t0's argument tuple, the thread starts and the
# return statement are elided; publish() uses the return value as
# `servers_lock`.
# One unique URI per host, based on the service id.
336 instance_uris = PyroUtils.create_unique_uri(\
337 service_id, n = self.n_hosts, ns = self.nshost)
# Hand-over queue between _launch_instances and _connect_instances.
341 launched_instances = Queue.Queue(-1)
342 launched_instances_lock = threading.Condition()
# Not used in the visible lines of this method.
344 instances = Queue.Queue(-1)
# Guards access to self.handlers.
346 self_handlers_lock = threading.Condition()
# Notified by _append_instance when a server has been added.
348 servers_lock = threading.Condition()
351 t0 = threading.Thread(target = self._launch_instances, \
352 args = (instance, instance_uris, handlers, handlers_lock,\
353 launched_instances, launched_instances_lock, \
363 t1 = threading.Thread(target = self._connect_instances, \
364 args = (service_id, \
365 launched_instances, launched_instances_lock, \
366 self_handlers_lock, servers_lock))
371 def publish(self, instance):
# NOTE(review): the docstring quote lines, parts of the docstring and the
# lines after 408 (including any return statement) are elided.
373 Publish instance on all the hosts of the grid
374 (specific for the communication method)
380 It creates list of servers performing the job
381 specified by this service_id
386 service_id = self.create_service_id(instance)
# Flag the service as not yet published; _connect_instances sets it to True.
388 self.__published[service_id] =
False
# Start the handlers, then the instance pipeline on top of them.
391 handlers, handlers_lock = self.start_handlers(service_id)
395 servers_lock = self.start_instances(service_id, instance,
396 handlers, handlers_lock)
399 print 'PyroGrid.publish: waiting for service %s to become available' % service_id
401 print 'Distributing jobs on the grid...'
# Wait on servers_lock unless the service id is already in self.servers
# (single `if` check, not a loop).
403 servers_lock.acquire()
404 if not service_id
in self.servers: servers_lock.wait()
405 servers_lock.release()
408 print 'Start calculations on the grid... '
412 def check_published(self):
# NOTE(review): lines 413-415 (presumably the docstring) are elided.
# As written, the expression is True when publishing is NOT complete:
# either some service is still flagged False in self.__published, or some
# service has a different number of servers than there are hosts.
416 return False in self.__published.values()
or \
417 False in [ len(self.hosts) == len(self.servers[service_id])
418 for service_id
in self.__published.keys() ]
422 print 'WARNING: Attempting shutdown while publishing!'
424 if not self.__terminate_during_publish:
425 print ' Wait until publishing is complete...'
429 print ' Do not wait until publishing is complete...'
430 print ' Please, stop remote processes manually!'
433 return self.__stopped
449 self.check_published()
451 AbstractGrid.terminate(self)
453 for instance_uri, handler
in self.handlers.items():
458 self.handlers.pop(instance_uri)
460 except Pyro.errors.PyroError,e:
461 print ''.join(Pyro.util.getPyroTraceback(e))
463 self.__stopped =
True
466 print 'PyroGrid: terminated'
468 def broadcast(self, sfo_id, funcname, *args, **kw):
# Calls method `funcname` with the same arguments on every communication
# proxy and returns the list of call results.
# On first use, acquire one service proxy per host and cache them.
469 if self.comm_acquired ==
False:
470 self.comm_proxies = [ self.acquire_service(sfo_id)
for i
in \
471 xrange(self.n_hosts)]
472 self.comm_acquired =
True
# NOTE(review): the line naming the iterable of this comprehension
# (original line 474) is elided from this listing.
473 return [getattr(prox, funcname)(*args, **kw)
for prox
in \
476 def scatter(self, sfo_id, funcname, arglist, kwlist=None):
# Calls method `funcname` on the communication proxies, giving the i-th
# proxy the i-th entry of `arglist` (and of `kwlist`).
# NOTE(review): the initialisation of `results`, the `kwlist is None`
# check, an `else:` and the return statement are elided from this listing.
# On first use, acquire one service proxy per host and cache them.
477 if self.comm_acquired ==
False:
478 self.comm_proxies = [ self.acquire_service(sfo_id)
for i
in \
479 xrange(self.n_hosts)]
480 self.comm_acquired =
True
# Default: one empty keyword dict per argument entry.
483 kwlist=[{}
for i
in xrange(len(arglist))]
# If the first entry is not iterable, wrap every entry in a one-element
# list. arglist[0] is indexed directly, so an empty arglist raises
# IndexError here.
484 if not hasattr(arglist[0],
'__iter__'):
485 arglist = [[i]
for i
in arglist]
# zip() stops at the shortest of the three sequences.
486 for prox,args,kw
in zip(self.comm_proxies,arglist,kwlist):
487 func=getattr(prox, funcname)
# A dict entry is passed as one positional argument, anything else is
# unpacked.
488 if type(args)
is dict:
489 results.append(func(args,**kw))
491 results.append(func(*args, **kw))
494 def gather(self, results):
# Collects the value of every entry of `results` by calling its get().
# NOTE(review): the initialisation of `retval` and the return statement
# are elided from this listing.
496 for server
in results:
497 retval.append(server.get())
def release_all(self):
    """Release every communication proxy acquired by broadcast()/scatter().

    Safe to call when nothing has been acquired and safe to call twice:
    ``comm_proxies`` is only created by broadcast()/scatter(), so it is
    looked up defensively, and it is emptied afterwards so that stale
    proxies are never released a second time.
    """
    for prox in getattr(self, 'comm_proxies', ()):
        self.release_service(prox)
    self.comm_proxies = []
    # broadcast()/scatter() re-acquire proxies when this flag is False
    self.comm_acquired = False
507 Runs on remote side, non-specific object.
509 It is a tool to launch instances of any picklable object
510 on the remote host (where it resides)
512 If PyroHandler is idle longer than .timeout time, it kills itself
513 on the remote side (together with all registered instances)
515 Returns: raw Pyro proxy
519 def __init__(self, timeout, nshost, debug):
# NOTE(review): the lines storing `nshost`/`debug` on the instance are
# elided; publish() reads self.nshost and self.debug.
521 Pyro.core.ObjBase.__init__(self)
# No instance has been published through this handler yet.
525 self.instance_uri =
None
# Start a WatchDog with the given timeout; it logs to 'watchdog.debug'.
531 self.watchdog = WatchDog(timeout, debug, logfile=
'watchdog.debug')
532 self.watchdog.start()
535 self.watchdog.set(time)
537 def publish(self, instance, instance_uri, delegate):
# NOTE(review): the docstring quote lines, parts of the docstring and the
# start of thread self.t are elided from this listing.
539 delegate = False for descendants of Pyro.core.ObjBase
541 Publishes a copy of the instance on the host where PyroHandler
547 print 'PyroHandler.publish: publishing instance %s with URI %s' % (instance, instance_uri)
# The instance is launched by PyroUtils.launch_instance in its own thread.
549 self.t = threading.Thread(target = PyroUtils.launch_instance, \
550 args = (instance, instance_uri, delegate,
551 self.nshost, self.debug,
False))
# Remember the URI of the published instance.
554 self.instance_uri = instance_uri
558 atexit.register(self.terminate)
565 print 'PyroHandler.terminate: %s terminating... ' % self
571 topop = [atexit._exithandlers[i][0] == self.terminate
572 for i
in range(len(atexit._exithandlers))]
574 atexit._exithandlers.pop( topop.index(
True) )
580 if not self.instance_uri ==
None:
581 PyroUtils.unregister(self.instance_uri, ns = self.nshost)
590 self._pyro_stop =
True
600 if hasattr(self,
'_pyro_suppl'):
601 s =
'%s(uri="%s", host="%s")' % (self.__class__.__name__, \
602 self._pyro_suppl[
'uri'], self._pyro_suppl[
'host'])
604 s = self.__class__.__name__
608 class PyroServer(Server):
# NOTE(review): the method headers of this class (lines 609-612, 614) are
# elided; going by the message text, the two statements below belong to a
# terminate() method.
613 print 'PyroServer.terminate: is terminating %s...' % self.proxy
# Termination is delegated to the wrapped proxy.
615 self.proxy.terminate()
620 This is high level wrapper of Pyro proxy objects,
621 returns immediately with an empty Result object
623 Used to be able to be used alone (without a grid) since it could take care
624 of clashing invocations.
628 Calls always have to be made in the following order:
630 proxy = g.acquire_service(service_id) ## returns proxy object
632 proxy.set_parameters(p) # these functions will be performed sequentially
633 proxy.f() # and service won't be freed
634 proxy.g() # unless .release_service() is called
636 g.release_service(proxy)
638 However if g.acquire_service call is followed immediately by
639 g.release_service, without calling proxy.f() or other proxy functions,
640 service will never be actually freed. Because service is freed from
641 inside PyroProxy in the run() cycle.
645 def __init__(self, pyro_proxy, debug = False, verbose = False):
# NOTE(review): the docstring quote lines, the assignment of self.__debug,
# the initialisation of self.__stop and the condition choosing the debug
# output stream are elided from this listing.
647 pyro_proxy = raw Pyro proxy object
650 threading.Thread.__init__(self)
651 Pyro.core.initClient()
653 self.__verbose = verbose
# Event that is cleared in the run loop once a call has been processed.
656 self.__called = threading.Event()
# Description and result holder of the pending call (set in __call_method).
657 self.__call_method_name =
None
658 self.__call_method_args =
None
659 self.__call_method_result =
None
# Release the raw proxy, then cache the metadata stored in its
# `_pyro_suppl` dict.
661 pyro_proxy._release()
662 self.__uri = pyro_proxy._pyro_suppl[
'uri']
663 self.__host = pyro_proxy._pyro_suppl[
'host']
664 self.__nshost = pyro_proxy._pyro_suppl[
'nshost']
# Consulted per attribute name in __getattr__.
665 self.__callable = pyro_proxy._pyro_suppl[
'callable']
668 self.__stopped =
False
# Debug output goes either to stdout or to a per-URI log file.
670 self.__debug_out = sys.stdout
672 self.__debug_out = open(
'./DEBUG_%s.log' % self.__uri,
'w')
# Guards the raw proxy; acquired around every access to it.
677 self.__pyro_lock = threading.Lock()
678 self.__pyro_proxy = pyro_proxy
680 if self.__debug
and self.__verbose:
681 self.__dp(
'%s is created' % self)
687 if self.__verbose: s0 =
'%s ' % self.__uri
690 if self.__pyro_lock.locked(): s1 =
' locked'
691 else: s1 =
'unlocked'
693 if self.__called.isSet(): s2 =
'called'
696 print >>self.__debug_out,
'[%s %s, %s] %s' \
700 self.__debug_out.flush()
702 def __setattr__(self, name, val):
# Forwards the assignment to the raw Pyro proxy when this object does not
# have the attribute itself; otherwise (the `else:` line is elided) sets it
# locally through threading.Thread.__setattr__.
# NOTE(review): attributes written as self.__pyro_proxy inside a class are
# stored under a name-mangled key (_<ClassName>__pyro_proxy), while the
# string literal '__pyro_proxy' below is not mangled. This membership test
# therefore looks like it can never be true. Confirm before relying on
# the forwarding branch.
704 if '__pyro_proxy' in self.__dict__.keys()
and \
705 not name
in self.__dict__.keys():
# Access to the raw proxy is serialised by __pyro_lock.
707 self.__pyro_lock.acquire()
708 self.__pyro_proxy._release()
709 setattr(self.__pyro_proxy, name, val)
710 self.__pyro_lock.release()
714 threading.Thread.__setattr__(self, name, val)
716 def __getattr__(self, name):
# Resolves attributes listed in self.__callable against the remote object.
# NOTE(review): the `else:` lines and the return of `attr` are elided from
# this listing.
722 msg =
'PyroProxy.__getattr__(%s): ' % name
724 if name
in self.__callable:
# A true entry means the remote attribute is a method.
726 if self.__callable[name]:
731 if self.__debug
and self.__verbose:
732 self.__dp( msg +
'is callable')
# The wrapper accepts positional arguments only.
734 return lambda *args: self.__call_method(name, *args)
# Otherwise the value is read from the raw proxy under the lock.
737 self.__pyro_lock.acquire()
738 self.__pyro_proxy._release()
739 attr = getattr(self.__pyro_proxy, name)
740 self.__pyro_lock.release()
# Raised for a name that is not listed in self.__callable. Note that the
# type is ValueError, not AttributeError.
744 raise ValueError,
'Proxy object does not have attribute "%s"' % name
746 def __call_method(self, name, *args):
# Records a pending call (method name, arguments, fresh Result object) and
# returns the Result immediately.
# NOTE(review): the format arguments of the first debug message and the
# statement that signals the call to the run loop (around line 773) are
# elided from this listing.
748 msg =
'PyroProxy.__call_method(%s): ' % name
750 if self.__debug
and self.__verbose:
751 self.__dp( msg +
'acquiring the lock "%s", args = <%s>' \
# The lock is not released in the visible lines of this method.
754 self.__pyro_lock.acquire()
762 if self.__debug
and self.__verbose:
763 self.__dp( msg +
'acquired the lock "%s", args = <%s>' % (name, args) )
# Store the call description for later execution.
765 self.__call_method_name = name
766 self.__call_method_args = args
767 self.__call_method_result = Result(self)
769 if self.__debug
and self.__verbose:
770 self.__dp( msg +
'"%s", args = <%s>' \
771 % (self.__call_method_name, self.__call_method_args) )
775 if self.__debug
and self.__verbose:
776 self.__dp( msg +
'now called (triggers START )')
# The caller gets the Result object, not the value of the remote call.
778 return self.__call_method_result
782 while not self.__stop:
784 msg =
'PyroProxy.run(%s): ' % self.__call_method_name
786 if self.__debug
and self.__verbose:
787 self.__dp( msg +
'waiting for calls...' )
790 if self.__stop:
break
792 msg =
'PyroProxy.run(%s): ' % self.__call_method_name
794 if self.__debug
and self.__verbose:
795 self.__dp( msg +
'now called (START)' )
798 if hasattr(self,
'_handler'):
799 remote_call( self.__debug, self.__verbose, self._handler,
802 self.__call_method_result.value = remote_call( self.__debug, self.__verbose,\
803 self.__pyro_proxy, self.__call_method_name, *self.__call_method_args )
808 self.__dp( msg +
''.join(Pyro.util.getPyroTraceback(e)) )
809 print msg +
'exception occurred remotely! [%s]' % self
810 print msg +
''.join(Pyro.util.getPyroTraceback(e))
813 self.__call_method_result.event.set()
817 if self.__debug
and self.__verbose:
818 self.__dp( msg +
'becoming idle...' )
820 self.__called.clear()
822 self.__call_method_name =
None
824 if self.__debug
and self.__verbose:
825 self.__dp( msg +
'now idle (FINISH)' )
829 if hasattr(self,
'_server')
and hasattr(self._server,
'grid') \
830 and self._selfrelease:
832 if self.__debug
and self.__verbose:
833 self.__dp( msg +
'releasing server %s for the grid' % self._server )
835 self._selfrelease =
False
837 self._server.grid._release_service(self)
841 self.__pyro_lock.release()
843 if not self.__debug_out
is sys.stdout:
844 self.__debug_out.close()
846 self.__stopped =
True
849 print msg +
' %s terminated' % self
852 return self.__stopped
854 def terminate(self, terminate_proxy = 0):
# Flags the remote object to stop, unregisters its URI if needed and then
# loops until self.__stopped becomes true.
# NOTE(review): lines 858-871, 873-876 and the body of the final wait loop
# are elided; `terminate_proxy` is not used in the visible lines.
856 if self.__debug
and self.__verbose:
857 print 'PyroProxy.terminate: %s terminating...' % self
# Serialise with any other access to the raw proxy.
872 self.__pyro_lock.acquire()
877 self.__pyro_proxy._release()
# Set the stop flag on the remote object through the proxy.
878 self.__pyro_proxy._pyro_stop =
True
# Unregister the URI unless PyroUtils already reports it as stopped.
880 if not PyroUtils.is_stopped(self.__uri, ns = self.__nshost):
881 PyroUtils.unregister(self.__uri, ns = self.__nshost)
# Clear the instance URI on the handler, if one is attached.
883 if hasattr(self,
'_handler'):
884 self._handler._release()
885 self._handler.instance_uri =
None
# Loop until self.__stopped is set (loop body elided).
887 while not self.__stopped:
893 s =
'%s("%s")' % (self.__class__.__name__, self.__uri)
899 def remote_call(debug, verbose, proxy, method, *args):
# Calls `method` on `proxy` with `args`; a ConnectionClosedError triggers
# a reconnect attempt.
# NOTE(review): the docstring quote lines, the loop/try structure, the
# format arguments of the message and the return statement are elided.
901 for long calls when connection can fail
911 result = getattr(proxy, method)(*args)
914 except Pyro.errors.ConnectionClosedError,e:
# The exception object is replaced by its formatted Pyro traceback text.
917 e =
''.join(Pyro.util.getPyroTraceback(e))
920 print '%s\nPyroUtils.remote_call(%s): Connection to %s is lost, trying reconnect...' \
# Rebind the proxy's connection, with `tries` set to 50.
923 proxy.adapter.rebindURI(tries = 50)