IMP  2.0.1
The Integrative Modeling Platform
PyroGrid.py
1 ##
2 ## The Inferential Structure Determination (ISD) software library
3 ##
4 ## Authors: Darima Lamazhapova and Wolfgang Rieping
5 ##
6 ## Copyright (C) Michael Habeck and Wolfgang Rieping
7 ##
8 ## All rights reserved.
9 ##
10 ## NO WARRANTY. This library is provided 'as is' without warranty of any
11 ## kind, expressed or implied, including, but not limited to the implied
12 ## warranties of merchantability and fitness for a particular purpose or
13 ## a warranty of non-infringement.
14 ##
15 ## Distribution of substantively modified versions of this module is
16 ## prohibited without the explicit permission of the copyright holders.
17 ##
18 
19 import os
20 import sys
21 import socket
22 import threading
23 import time
24 import Queue
25 import Pyro.core, Pyro.errors, Pyro.util
26 import atexit
27 import string
28 
29 from IMP.isd.utils import WatchDog
30 from IMP.isd.AbstractGrid import AbstractGrid, Server, Result
31 from IMP.isd import PyroUtils
32 
# Module-wide Pyro configuration, applied once when this module is imported.
# NOTE(review): the descriptions below follow the Pyro 3.x configuration
# documentation -- confirm against the Pyro version actually installed.
# Ask Pyro to print the remote traceback when a remote call raises.
33 Pyro.config.PYRO_PRINT_REMOTE_TRACEBACK = 1
# Ask Pyro for detailed tracebacks.
34 Pyro.config.PYRO_DETAILED_TRACEBACK = 1
# Enable Pyro's "mobile code" feature (presumably so that remote handlers can
# obtain the code of published instances -- TODO confirm).
35 Pyro.config.PYRO_MOBILE_CODE = 1
36 
def get_sge_host_list():
    """Parse the SGE host file named by the $PE_HOSTFILE environment variable.

    Every non-blank line is split on whitespace: the first token is used as
    the host name and the second token as an integer repetition count.

    Returns:
        A list of host names in file order, each host repeated as many
        times as its count says.

    Raises:
        KeyError: if PE_HOSTFILE is not set in the environment.
        IndexError / ValueError: if a non-blank line has no second column
            or that column is not an integer.
    """
    hostlist = []
    # 'with' closes the file even when a line fails to parse.
    with open(os.environ['PE_HOSTFILE']) as hostfile:
        for line in hostfile:
            tokens = line.split()
            if not tokens:
                # tolerate blank lines instead of failing on tokens[1]
                continue
            hostlist.extend([tokens[0]] * int(tokens[1]))
    return hostlist
46 
class PyroGrid(AbstractGrid):
    """Grid that starts one PyroHandler per host and publishes instances
    through them.

    Handlers are launched with ``ssh`` or ``qrsh`` (see ``method``) and
    looked up through the Pyro nameserver running on ``nshost``. Every
    published instance is wrapped in a PyroProxy / PyroServer pair which is
    registered with the grid through ``add_server``.
    """

    def __init__(self, hosts, src_path, display, X11_delay,
                 debug, verbose,
                 shared_temp_path, nshost, terminate_during_publish,
                 method='ssh'):
        """Set up the grid.

        hosts, src_path, display, X11_delay, debug, verbose and
        shared_temp_path are forwarded unchanged to AbstractGrid.__init__.

        nshost: name of the host running the Pyro nameserver.
        terminate_during_publish: if true, terminate() does not wait for a
            publish that is still in progress (see check_published).
        method: 'ssh' or 'qsub'; selects how handlers are launched.

        Raises ValueError for any other value of method.
        """
        AbstractGrid.__init__(self, hosts, src_path, display, X11_delay,
                              debug, verbose, shared_temp_path)

        # copy remaining files specific for the pyrogrid
        self.initialise(src_path, ['PyroGrid', 'PyroUtils',
                                   'PyroHandlerLoader'])

        # set the pyro loader as the loader
        self.set_loader(src_path, 'PyroHandlerLoader')

        # method is either ssh or qsub
        if method != 'ssh' and method != 'qsub':
            raise ValueError("unknown method: %s. Try ssh or qsub" % method)
        self.method = method

        # number of hosts
        self.n_hosts = len(hosts)

        # name of the host having the pyro-ns nameserver
        self.nshost = nshost

        self.timeout = 60.0  # (for WatchDog, in minutes)

        self.handlers = {}  # maps instance (URI) to a handler (proxy)

        self.__terminate_during_publish = terminate_during_publish
        self.__published = {}

        # proxies held for broadcast()/scatter(); initialised here so that
        # release_all() is safe even if nothing was ever acquired.
        self.comm_acquired = False
        self.comm_proxies = []

        self.__stopped = False

    def _launch_handler(self, handler_uri, host):
        """Start a handler with URI 'handler_uri' on 'host' in the background.

        The command line is assembled from attributes of 'host' (python,
        niceness, temp_path, init_cmd, name) and handed to os.system().
        """
        handler_script = os.path.join(host.temp_path, self.loader_filename)

        # arguments to be passed to the handler script launched on each client.
        argv = "%s %s %s %s %s %s %s %s" \
            % (handler_script, host.niceness, self.nshost, handler_uri,
               self.timeout, self.debug, self.display, host.temp_path)

        if self.display:
            # x11 forwarding is requested: run inside an xterm whose title
            # is the short host name.
            try:
                title = socket.getfqdn(host.name).split('.')[0]
            except Exception:
                title = host.name
            cmd = "xterm -T %s -geometry %s -hold -e \"%s -i %s\"" \
                % (title, self.window_size, host.python, argv)
        else:
            # no x11 forwarding
            cmd = "%s %s" % (host.python, argv)

        # if requested, add required init commands prior to
        # launching anything else on the target host.
        if host.init_cmd != '':
            stripped_init = host.init_cmd.rstrip()
            if stripped_init.endswith(';'):
                cmd = host.init_cmd + cmd
            elif stripped_init.endswith('!'):
                # a trailing '!' means: prepend without a ';' separator
                cmd = stripped_init[:-1] + cmd
            else:
                cmd = host.init_cmd + ';' + cmd

        # now wrap everything for ssh / qrsh
        if self.method == 'ssh':
            cmd = "ssh %s '%s' " % (host.name, cmd)
        else:
            cmd = "qrsh -inherit %s '%s' " % (host.name, cmd)

        # if debug then echo the command. In any case run it in background.
        if self.debug:
            cmd = cmd + " > /dev/null &"
            print('PYRO echo: %s' % cmd)
        else:
            cmd = cmd + "&"

        # NOTE(review): values are interpolated into a shell command without
        # quoting; this assumes the host configuration is trusted.
        os.system(cmd)

    def _connect_handlers(self, handler_uris, handlers, handlers_lock, hosts_map):
        """Connect to the handlers sequentially and queue their proxies.

        handlers: a FIFO queue that receives the PyroHandler proxies.
        handlers_lock: the Condition guarding that queue; it is notified
            after every put.
        hosts_map: maps a handler URI to the host it was launched on.
        """
        _ns = PyroUtils.get_nameserver(self.nshost)

        for i in range(self.n_hosts):
            uri = handler_uris[i]
            try:
                handler = PyroUtils.get_proxy(uri, ns=_ns)
            except Exception:
                print('PyroGrid._connect_handlers: failed to connect to "%s"' % uri)
                raise

            # tell the pyro proxy to close the socket for now.
            handler._release()
            # tell the handler on which host he's running.
            handler.host = hosts_map[uri]

            if self.debug:
                print('PyroGrid._connect_handlers: handler "%s" on "%s" is ready'
                      % (uri, handler._pyro_suppl['host']))

            # put the handler proxy into the queue and wake up the consumer;
            # 'with' releases the lock even if something fails in between.
            with handlers_lock:
                handlers.put(handler)

                if self.debug:
                    print('PyroGrid._connect_handlers: notifies "%s" is in the queue' % uri)

                handlers_lock.notify()

    def _launch_instances(self, instance, instance_uris,
                          handlers, handlers_lock,
                          launched_instances, launched_instances_lock,
                          self_handlers_lock):
        """Ask each handler, as it becomes available, to publish 'instance'.

        handlers: PyroHandler proxies in a Queue. It is different from
            self.handlers, which is a dict['URI'] = handler proxy.
        handlers_lock: the Condition associated with filling this queue.
        launched_instances / launched_instances_lock: queue and Condition
            that receive the URI of every instance whose publication was
            requested.
        self_handlers_lock: lock protecting self.handlers.
        """
        for i in range(self.n_hosts):

            # obtain the next queue element, or wait for it to be filled.
            # 'while' re-checks the predicate after every wake-up.
            with handlers_lock:
                while handlers.empty():
                    handlers_lock.wait()
                handler = handlers.get()

            instance_uri = instance_uris[i]

            if self.debug:
                print('PyroGrid._launch_instances: "%s" is being published remotely...' % instance_uri)

            # TODO: remove the release(), since it was done previously
            handler._release()
            handler.publish(instance, instance_uri, delegate=True)

            # The self.handlers dict is a shared resource, hence the lock.
            with self_handlers_lock:
                self.handlers[instance_uri] = handler

            with launched_instances_lock:
                launched_instances.put(instance_uri)

                if self.debug:
                    print('PyroGrid._launch_instances: notifies "%s" is being published' % instance_uri)

                launched_instances_lock.notify()

    def _connect_instances(self, service_id,
                           launched_instances, launched_instances_lock,
                           self_handlers_lock, servers_lock):
        """Connect to the launched instances sequentially and register them.

        Marks 'service_id' as published once all self.n_hosts instances
        have been passed to _append_instance.
        """
        _ns = PyroUtils.get_nameserver(self.nshost)

        for _ in range(self.n_hosts):

            with launched_instances_lock:
                while launched_instances.empty():
                    launched_instances_lock.wait()
                instance_uri = launched_instances.get()

            try:
                instance_proxy = PyroUtils.get_proxy(instance_uri, ns=_ns)
            except Exception:
                print('PyroGrid._connect_instances: failed to connect to "%s"' % instance_uri)
                raise

            if self.debug:
                print('PyroGrid._connect_instances: instance "%s" on "%s" is ready'
                      % (instance_uri, instance_proxy._pyro_suppl['host']))

            self._append_instance(service_id, instance_proxy,
                                  self_handlers_lock, servers_lock)

        self.__published[service_id] = True

    def _append_instance(self, service_id, instance_proxy,
                         self_handlers_lock, servers_lock):
        """Wrap a raw Pyro proxy and register the resulting server.

        The raw proxy is wrapped in a PyroProxy (smart remote object) and
        then in a PyroServer (grid interface); the server is passed to
        add_server() and servers_lock is notified.
        """
        # extract parameters needed for PyroProxy below
        instance_proxy._release()
        instance_uri = instance_proxy._pyro_suppl['uri']

        with self_handlers_lock:
            handler = self.handlers[instance_uri]
            handler._release()
            host = handler.host

        proxy = PyroProxy(instance_proxy, debug=self.debug, verbose=self.verbose)
        server = PyroServer(proxy, service_id, host, self.debug)

        # register the handler in the proxy (to monitor whether the grid is
        # alive); written straight into __dict__, bypassing
        # PyroProxy.__setattr__.
        with self_handlers_lock:
            proxy.__dict__['_handler'] = handler

        # register the server in the grid.
        # Per the original notes, add_server adds the grid to server.grid
        # and fills in
        #   self.queues:  dict[service_id] = Queue(-1) of servers
        #   self.servers: dict[service_id] = [list of PyroServer instances]
        with servers_lock:
            self.add_server(server)

            if self.debug:
                print('PyroGrid._append_instance: notifies %s is ready' % server)

            servers_lock.notify()

    def start_handlers(self, service_id):
        """Start a PyroHandler on each host (non-blocking).

        Returns a tuple (handlers, handlers_lock): a FIFO queue that is
        being filled with the handlers' Pyro proxies by a separate thread,
        and the Condition that guards it.
        """
        # create a list of unique URIs for each host
        handler_uris = PyroUtils.create_unique_uri(
            '.'.join(('PyroHandler', service_id)),
            n=self.n_hosts, ns=self.nshost)

        # FIFO queue of infinite capacity and its condition lock
        handlers = Queue.Queue(-1)
        handlers_lock = threading.Condition()

        hosts_map = {}

        for i in range(self.n_hosts):
            # launch a PyroHandler on each host.
            self._launch_handler(handler_uris[i], self.hosts[i])
            hosts_map[handler_uris[i]] = self.hosts[i]

            # sleep here is to avoid errors like
            # "Warning: No xauth data; using fake
            # authentication data for X11 forwarding."
            # TODO: speedup things by checking if it is necessary
            time.sleep(self.X11_delay)

        # fill the queue asynchronously with the proxies of the handlers
        # that were launched.
        connector = threading.Thread(
            target=self._connect_handlers,
            args=(handler_uris, handlers, handlers_lock, hosts_map))
        connector.start()

        return handlers, handlers_lock

    def start_instances(self, service_id, instance, handlers, handlers_lock):
        """Publish 'instance' on every host in two background threads.

        One thread calls PyroHandler.publish(instance) through each handler
        taken from 'handlers'; the other connects to the published
        instances and registers a PyroServer for each.

        Returns the Condition that is notified whenever a server has been
        registered.
        """
        # create n unique URIs for the instance, out of its service_id.
        instance_uris = PyroUtils.create_unique_uri(
            service_id, n=self.n_hosts, ns=self.nshost)

        # queue and lock for managing the launched instances of the
        # 'instance' object via the PyroHandler
        launched_instances = Queue.Queue(-1)
        launched_instances_lock = threading.Condition()

        # lock for the self.handlers dict
        self_handlers_lock = threading.Condition()

        servers_lock = threading.Condition()

        launcher = threading.Thread(
            target=self._launch_instances,
            args=(instance, instance_uris, handlers, handlers_lock,
                  launched_instances, launched_instances_lock,
                  self_handlers_lock))
        launcher.start()

        connector = threading.Thread(
            target=self._connect_instances,
            args=(service_id,
                  launched_instances, launched_instances_lock,
                  self_handlers_lock, servers_lock))
        connector.start()

        return servers_lock

    def publish(self, instance):
        """Publish instance on all the hosts of the grid
        (specific for the communication method).

        Blocks until 'service_id' shows up in self.servers, i.e. until at
        least one server has been registered for it.

        Returns: service_id
        """
        # get unique identifier for this instance
        service_id = self.create_service_id(instance)

        self.__published[service_id] = False

        # start PyroHandlers and give them an URI which contains service_id
        handlers, handlers_lock = self.start_handlers(service_id)

        # start the PyroServer instances which control the underlying
        # 'instance' objects published on each host by the PyroHandler
        servers_lock = self.start_instances(service_id, instance,
                                            handlers, handlers_lock)

        if self.debug:
            print('PyroGrid.publish: waiting for service %s to become available' % service_id)

        print('Distributing jobs on the grid...')

        with servers_lock:
            while service_id not in self.servers:
                servers_lock.wait()

        # now that there is at least one server that can accept a job,
        # launch the sims.
        print('Start calculations on the grid... ')

        return service_id

    def check_published(self):
        """Warn if a publish is still in progress and, unless
        terminate_during_publish was set, poll until it has completed."""

        def publishing():
            # still publishing if any service is flagged unpublished or has
            # fewer/more servers than there are hosts
            if False in self.__published.values():
                return True
            n_expected = len(self.hosts)
            return any(len(self.servers[service_id]) != n_expected
                       for service_id in self.__published)

        if not publishing():
            return

        print('WARNING: Attempting shutdown while publishing!')

        if not self.__terminate_during_publish:
            print(' Wait until publishing is complete...')
            while publishing():
                time.sleep(0.5)
        else:
            print(' Do not wait until publishing is complete...')
            print(' Please, stop remote processes manually!')

    def ishalted(self):
        """Return True once terminate() has run to completion."""
        return self.__stopped

    def terminate(self):
        """Shut the grid down.

        Per the original notes, AbstractGrid.terminate
          1. prevents servers from being acquired, and
          2. terminates each server and its proxy
             (AbstractGrid.terminate -> PyroServer.terminate ->
             PyroProxy.terminate);
        PyroGrid.terminate then
          3. terminates every handler (PyroHandler.terminate).
        """
        self.check_published()

        AbstractGrid.terminate(self)

        # iterate over a snapshot because entries are popped inside the loop
        for instance_uri, handler in list(self.handlers.items()):
            try:
                handler._release()
                handler.terminate()
                self.handlers.pop(instance_uri)
            except Pyro.errors.PyroError as e:
                print(''.join(Pyro.util.getPyroTraceback(e)))

        self.__stopped = True

        if self.debug:  # please do not remove.
            print('PyroGrid: terminated')

    def __acquire_comm_proxies(self, sfo_id):
        """Acquire one service proxy per host for broadcast()/scatter(),
        unless a set is already held."""
        if not self.comm_acquired:
            self.comm_proxies = [self.acquire_service(sfo_id)
                                 for _ in range(self.n_hosts)]
            self.comm_acquired = True

    def broadcast(self, sfo_id, funcname, *args, **kw):
        """Call method 'funcname' with the same arguments on every held
        proxy and return the list of values those calls return."""
        self.__acquire_comm_proxies(sfo_id)
        return [getattr(prox, funcname)(*args, **kw)
                for prox in self.comm_proxies]

    def scatter(self, sfo_id, funcname, arglist, kwlist=None):
        """Call method 'funcname' on the held proxies, one argument set each.

        arglist: one entry per call. If the first entry has no '__iter__'
            attribute, every entry is treated as a single positional
            argument. A dict entry is passed as one positional argument;
            any other entry is unpacked with '*'.
        kwlist: optional list of keyword dicts, parallel to arglist.

        Returns the list of values returned by the calls; an empty arglist
        yields an empty list.
        """
        self.__acquire_comm_proxies(sfo_id)
        if kwlist is None:
            kwlist = [{} for _ in arglist]
        if arglist and not hasattr(arglist[0], '__iter__'):
            arglist = [[arg] for arg in arglist]
        results = []
        for prox, args, kw in zip(self.comm_proxies, arglist, kwlist):
            func = getattr(prox, funcname)
            if isinstance(args, dict):
                results.append(func(args, **kw))
            else:
                results.append(func(*args, **kw))
        return results

    def gather(self, results):
        """Return [r.get() for each r in results], in order."""
        return [pending.get() for pending in results]

    def release_all(self):
        """Release every proxy held for broadcast()/scatter()."""
        for prox in self.comm_proxies:
            self.release_service(prox)
        self.comm_acquired = False
504 
class PyroHandler(Pyro.core.ObjBase):
    """
    Runs on the remote side; not specific to any published object.

    It is a tool to launch instances of any picklable object
    on the remote host (where it resides).

    Per the original notes: if PyroHandler is idle longer than its
    timeout, it kills itself on the remote side (together with all
    registered instances).
    """

    def __init__(self, timeout, nshost, debug):
        """Store the settings and start the WatchDog.

        The WatchDog is created with 'timeout' and 'debug' and is pinged
        through set().
        """
        Pyro.core.ObjBase.__init__(self)

        self.debug = debug
        self.nshost = nshost
        self.instance_uri = None
        self.host = None

        # Per the original notes: the WatchDog thread sleeps and quits if
        # it has not been pinged for 'timeout' minutes; a ping is
        # self.watchdog.set(t) with t a float (the current time).
        self.watchdog = WatchDog(timeout, debug, logfile='watchdog.debug')
        self.watchdog.start()

    def set(self, time):
        """Forward 'time' to the WatchDog."""
        self.watchdog.set(time)

    def publish(self, instance, instance_uri, delegate):
        """
        Publish a copy of the instance on the host where PyroHandler
        is running, by running PyroUtils.launch_instance in a new thread.

        delegate = False for descendants of Pyro.core.ObjBase
        """
        if self.debug:
            print('PyroHandler.publish: publishing instance %s with URI %s'
                  % (instance, instance_uri))

        launch_args = (instance, instance_uri, delegate,
                       self.nshost, self.debug, False)
        self.t = threading.Thread(target=PyroUtils.launch_instance,
                                  args=launch_args)
        self.t.start()

        self.instance_uri = instance_uri

        # to enable cleaning up upon remote shutdown
        atexit.register(self.terminate)

    def terminate(self):
        """Unregister the published instance and flag this handler to stop.

        Called from PyroProxy.terminate or on sys.exit (WatchDog call).
        """
        if self.debug:
            print('PyroHandler.terminate: %s terminating... ' % self)

        # Drop the first atexit entry that points at this method, so that a
        # handler terminated by hand is not terminated again at exit.
        exit_handlers = atexit._exithandlers
        for position in range(len(exit_handlers)):
            if exit_handlers[position][0] == self.terminate:
                exit_handlers.pop(position)
                break

        # Unregister from the nameserver. Per the original notes this waits
        # PyroUtils.default_timeout to let self-unregistration finish.
        if self.instance_uri is not None:
            PyroUtils.unregister(self.instance_uri, ns=self.nshost)

        # Shut down the remote process (per the original notes, the
        # interpreter is left alive if debug = True). PyroHandler itself is
        # an object managed by Pyro.core.Daemon, hence it is stopped by
        # setting _pyro_stop to True.
        self._pyro_stop = True

        # PyroUtils.is_stopped(self._pyro_suppl['uri'], ns = self.nshost)
        #
        # is not used here: per the original notes it may cause problems
        # when running without xterm (unconfirmed), because it shuts down
        # the python process (daemon.shutdown()) causing a connection lost
        # error.

    def __str__(self):
        """Return the class name, plus uri and host once Pyro has attached
        its '_pyro_suppl' data."""
        class_name = self.__class__.__name__
        if not hasattr(self, '_pyro_suppl'):
            return class_name
        return '%s(uri="%s", host="%s")' % (class_name,
                                            self._pyro_suppl['uri'],
                                            self._pyro_suppl['host'])
607 
class PyroServer(Server):
    """Server whose termination is delegated to its proxy."""

    def terminate(self):
        """Terminate the wrapped proxy, announcing it first in debug mode."""
        if self.debug:
            announcement = 'PyroServer.terminate: is terminating %s...' % self.proxy
            print(announcement)

        self.proxy.terminate()
616 
617 
class PyroProxy(threading.Thread):
    """
    High level wrapper of raw Pyro proxy objects. A call to a remote
    method returns immediately with an empty Result object, which is
    filled in by this object's own thread (see run()).

    Used to be able to be used alone (without a grid) since it could take
    care of clashing invocations.

    REMARK:

    Calls always have to be in the following order:

        proxy = g.acquire_service(service_id)  ## returns proxy object

        proxy.set_parameters(p)  # these functions will be performed
        proxy.f()                # sequentially and the service won't be
        proxy.g()                # freed unless .release_service() is called

        g.release_service(proxy)

    However, if the g.acquire_service call is followed immediately by
    g.release_service, without calling proxy.f() or other proxy functions,
    the service will never actually be freed, because the service is freed
    from inside PyroProxy in the run() cycle.
    """

    def __init__(self, pyro_proxy, debug=False, verbose=False):
        """
        pyro_proxy = raw Pyro proxy object; its '_pyro_suppl' dict supplies
        the 'uri', 'host', 'nshost' and 'callable' entries used here.

        With verbose set, debug output goes to ./DEBUG_<uri>.log instead of
        stdout. The worker thread is started before __init__ returns.
        """
        threading.Thread.__init__(self)
        Pyro.core.initClient()

        self.__verbose = verbose
        self.__debug = debug

        self.__called = threading.Event()
        self.__call_method_name = None
        self.__call_method_args = None
        self.__call_method_result = None

        pyro_proxy._release()
        self.__uri = pyro_proxy._pyro_suppl['uri']
        self.__host = pyro_proxy._pyro_suppl['host']
        self.__nshost = pyro_proxy._pyro_suppl['nshost']
        self.__callable = pyro_proxy._pyro_suppl['callable']

        self.__stop = False
        self.__stopped = False

        self.__debug_out = sys.stdout
        if self.__verbose:
            self.__debug_out = open('./DEBUG_%s.log' % self.__uri, 'w')

        # Original intent: every attribute assigned after self.__pyro_proxy
        # is assigned in the remote object (but see __setattr__).

        self.__pyro_lock = threading.Lock()
        self.__pyro_proxy = pyro_proxy

        if self.__debug and self.__verbose:
            self.__dp('%s is created' % self)

        self.start()

    def __dp(self, text):
        """Write 'text' to the debug stream, prefixed with the lock and
        call state of this proxy."""
        if self.__verbose:
            s0 = '%s ' % self.__uri
        else:
            s0 = ''

        if self.__pyro_lock.locked():
            s1 = ' locked'
        else:
            s1 = 'unlocked'

        if self.__called.isSet():
            s2 = 'called'
        else:
            s2 = ' idle'

        self.__debug_out.write('[%s %s, %s] %s\n' % (s0, s1, s2, text))

        if self.__verbose:
            self.__debug_out.flush()

    def __setattr__(self, name, val):
        """Set an attribute locally, or on the remote object for names that
        are not local once '__pyro_proxy' is present in __dict__.

        NOTE(review): the key tested below is the literal '__pyro_proxy',
        whereas 'self.__pyro_proxy = ...' inside this class is stored under
        the name-mangled key '_PyroProxy__pyro_proxy'. As written, the
        forwarding branch is therefore only taken if something inserts the
        unmangled key into __dict__ -- confirm before relying on either
        behavior.
        """
        if '__pyro_proxy' in self.__dict__ and name not in self.__dict__:
            # 'with' releases the lock even if the remote assignment fails
            with self.__pyro_lock:
                self.__pyro_proxy._release()
                setattr(self.__pyro_proxy, name, val)
        else:
            threading.Thread.__setattr__(self, name, val)

    def __getattr__(self, name):
        """Resolve attributes that are not found locally.

        Only called when normal lookup fails (the name is neither an
        instance attribute nor found in the class tree for self).

        Names flagged callable in the proxy's 'callable' table yield a
        function that queues the remote call and returns a Result; other
        known names are read from the remote object.

        Raises ValueError for names that are not in the table.
        """
        msg = 'PyroProxy.__getattr__(%s): ' % name

        if name not in self.__callable:
            raise ValueError('Proxy object does not have attribute "%s"' % name)

        if self.__callable[name]:
            # function '__call_method' will be invoked instead of function
            # 'name' and arguments of function 'name' will go into '*args'
            if self.__debug and self.__verbose:
                self.__dp(msg + 'is callable')

            return lambda *args: self.__call_method(name, *args)

        # plain remote attribute; 'with' releases the lock even if the
        # remote access raises
        with self.__pyro_lock:
            self.__pyro_proxy._release()
            return getattr(self.__pyro_proxy, name)

    def __call_method(self, name, *args):
        """Queue remote method 'name' with 'args' for the worker thread and
        return the (still empty) Result.

        Acquires the proxy lock; it is released by run() after the call has
        been processed.
        """
        msg = 'PyroProxy.__call_method(%s): ' % name

        if self.__debug and self.__verbose:
            self.__dp(msg + 'acquiring the lock "%s", args = <%s>'
                      % (name, args))

        self.__pyro_lock.acquire()

        # NOTE: we do not have to _release here because it is done in
        # remote_call (also it is faster if we do not reconnect when the
        # call is from within the same thread)
        #
        # self.__pyro_proxy._release()

        if self.__debug and self.__verbose:
            self.__dp(msg + 'acquired the lock "%s", args = <%s>' % (name, args))

        self.__call_method_name = name
        self.__call_method_args = args
        self.__call_method_result = Result(self)

        if self.__debug and self.__verbose:
            self.__dp(msg + '"%s", args = <%s>'
                      % (self.__call_method_name, self.__call_method_args))

        self.__called.set()  # "called"

        if self.__debug and self.__verbose:
            self.__dp(msg + 'now called (triggers START )')

        return self.__call_method_result

    def run(self):
        """Worker loop: wait for a queued call, execute it remotely, store
        the value in the pending Result, then release the proxy lock."""
        # defined up front so the final debug line works even if the loop
        # body never runs (stop requested before the first iteration)
        msg = 'PyroProxy.run(%s): ' % self.__call_method_name

        while not self.__stop:

            msg = 'PyroProxy.run(%s): ' % self.__call_method_name

            if self.__debug and self.__verbose:
                self.__dp(msg + 'waiting for calls...')

            self.__called.wait()  # wait until "called"
            if self.__stop:
                break

            msg = 'PyroProxy.run(%s): ' % self.__call_method_name

            if self.__debug and self.__verbose:
                self.__dp(msg + 'now called (START)')

            try:
                # ping the handler before the actual call
                if hasattr(self, '_handler'):
                    remote_call(self.__debug, self.__verbose, self._handler,
                                'set', time.time())

                self.__call_method_result.value = remote_call(
                    self.__debug, self.__verbose,
                    self.__pyro_proxy, self.__call_method_name,
                    *self.__call_method_args)

            except Exception as e:
                # NOTE(review): on this path the exception ends the thread
                # with the proxy lock still held and the Result event never
                # set -- confirm that callers cope with that.
                if not self.__stop:
                    self.__dp(msg + ''.join(Pyro.util.getPyroTraceback(e)))
                    print(msg + 'exception occurred remotely! [%s]' % self)
                    print(msg + ''.join(Pyro.util.getPyroTraceback(e)))
                    raise

            self.__call_method_result.event.set()

            # when the result is ready

            if self.__debug and self.__verbose:
                self.__dp(msg + 'becoming idle...')

            self.__called.clear()  # "idle"

            self.__call_method_name = None

            if self.__debug and self.__verbose:
                self.__dp(msg + 'now idle (FINISH)')

            # calculations are finished -> release the service in the grid

            if hasattr(self, '_server') and hasattr(self._server, 'grid') \
                    and self._selfrelease:

                if self.__debug and self.__verbose:
                    self.__dp(msg + 'releasing server %s for the grid' % self._server)

                self._selfrelease = False

                self._server.grid._release_service(self)

            # now, when everything is assigned, can be free

            self.__pyro_lock.release()

        if self.__debug_out is not sys.stdout:
            self.__debug_out.close()

        self.__stopped = True

        if self.__debug:
            print(msg + ' %s terminated' % self)

    def ishalted(self):
        """Return True once the worker loop in run() has finished."""
        return self.__stopped

    def terminate(self, terminate_proxy=0):
        """Stop the worker thread and block until it has finished.

        If this proxy is used within PyroGrid, PyroHandler.terminate will
        clean up by
          1. unregistering the Pyro URI from the nameserver, and
          2. closing the remote python process;
        therefore terminate_proxy should be 0 in that case.

        With terminate_proxy set, the remote object is flagged to stop and
        its URI is unregistered here, leaving the remote python process
        intact.
        """
        if self.__debug and self.__verbose:
            print('PyroProxy.terminate: %s terminating...' % self)

        self.__stop = True
        self.__called.set()

        if terminate_proxy:

            # NOTE(review): the lock is acquired and not released again;
            # kept as is -- confirm whether that is intentional.
            self.__pyro_lock.acquire()

            # 1. unregister Pyro URI from nameserver and handler
            # 2. leaves remote python process intact

            self.__pyro_proxy._release()
            self.__pyro_proxy._pyro_stop = True

            if not PyroUtils.is_stopped(self.__uri, ns=self.__nshost):
                PyroUtils.unregister(self.__uri, ns=self.__nshost)

            if hasattr(self, '_handler'):
                self._handler._release()
                self._handler.instance_uri = None

        # poll until run() reports completion
        while not self.__stopped:
            time.sleep(0.1)

    def __str__(self):
        """Return 'PyroProxy("<uri>")'."""
        return '%s("%s")' % (self.__class__.__name__, self.__uri)

    __repr__ = __str__
898 
def remote_call(debug, verbose, proxy, method, *args):
    """Call 'method' on 'proxy' with 'args', reconnecting if the connection
    is lost. Meant for long calls during which the connection can fail.

    The call is attempted up to five times. After every
    Pyro.errors.ConnectionClosedError the proxy is asked to rebind
    (proxy.adapter.rebindURI(tries = 50)) before the next attempt.

    debug: if true, report every lost connection on stdout.
    verbose: if true, that report contains the Pyro traceback instead of
        the bare exception.

    Returns whatever the remote method returns.

    Raises the last ConnectionClosedError if all attempts fail; any other
    exception propagates immediately.
    """
    max_attempts = 5
    last_error = None

    for _ in range(max_attempts):
        try:
            # drop the current connection before each attempt
            proxy._release()
            return getattr(proxy, method)(*args)

        except Pyro.errors.ConnectionClosedError as e:
            last_error = e

            if debug:
                if verbose:
                    detail = ''.join(Pyro.util.getPyroTraceback(e))
                else:
                    detail = e
                print('%s\nPyroUtils.remote_call(%s): Connection to %s is lost, trying reconnect...'
                      % (detail, method, proxy))

            proxy.adapter.rebindURI(tries=50)

    # every attempt lost the connection: report it instead of silently
    # handing None back to the caller
    raise last_error