19 import os, sys, string
20 from threading
import Thread, Event, Lock
21 from Queue
import Queue
22 from fnmatch
import fnmatch
28 from hosts
import abspath, remote_mkdirs
29 from utils
import atexit_register, atexit_unregister,Pipe,average
35 Grid main functionality: manage services
37 It contains a list of servers, which provide services.
38 When an instance is published in the grid, the corresponding server objects
39 are created on each host and saved as a list in a dictionary, keyed
40 by their service_id.
42 Servers have a part that is specific to the communication method
43 (at the creation stage), and thus have to be defined in derived classes.
48 g.copy_files(isd_src_path, ['*.pyc', '*.so'])
50 some_obj_instance = some_obj_class(..)
52 service_id = g.publish(some_obj_instance)
54 -> a) assigns some_service_id to this instance
55 b) creates servers that manage the execution of this instance
56 in remote hosts: servers[some_service_id] = [server1, server2, ...]
58 proxy = g.acquire_service(service_id) ## returns proxy object
60 proxy.set_parameters(p) # these functions will be performed sequentially
61 proxy.f() # and service won't be freed
62 proxy.g() # unless .release_service() is called
64 g.release_service(proxy)
68 def __init__(self, hosts, src_path, display, X11_delay, debug, verbose, shared_temp_path):
71 not shared_temp_path: temp_path == None
72 look for individual temp_paths in host list
78 self.verbose = verbose
80 self.debug_out = sys.stderr
82 self.display = display
83 self.X11_delay = X11_delay
84 self.window_size =
'30x10'
90 self.shared_temp_path = shared_temp_path
95 self.
initialise(src_path, [
'AbstractGrid',
'utils',
'hosts',
'logfile'])
104 atexit_register(self.terminate)
108 print >>self.debug_out, str
109 self.debug_out.flush()
def set_loader(self, src_path, loader_filename):
    """Set the name of the compiled Python file of the loader.

    If an optimised ``<loader_filename>.pyo`` exists in *src_path* it is
    preferred; otherwise ``<loader_filename>.pyc`` is used (its existence
    is not checked).

    Only the bare file name (not a full path) is stored, in
    ``self.loader_filename``.
    """
    optimised_name = '%s.pyo' % loader_filename
    if os.path.exists(os.path.join(src_path, optimised_name)):
        self.loader_filename = optimised_name
    else:
        # fall back to the regular byte-compiled file
        self.loader_filename = '%s.pyc' % loader_filename
119 def set_debug(self, debug, verbose = None):
122 if not verbose ==
None: self.verbose = verbose
126 Publish instance on all the hosts of the grid
127 (specific for the communication method)
133 It creates a list of servers performing the job
134 specified by this service_id
137 raise NotImplementedError
def create_service_id(self, instance):
    """Return a service id for *instance*.

    The id has the form ``'<ClassName>.<abs(id(instance))>'``.  Python's
    ``id()`` is only guaranteed unique among objects alive at the same
    time, so an id may be reused after an instance is garbage-collected.

    ``str.join`` is used instead of ``string.join`` (a Python 2 module
    function that no longer exists in Python 3); the result is identical.
    """
    return '.'.join((instance.__class__.__name__, str(abs(id(instance)))))
147 Registers the server within a grid
149 1) register itself (grid) in the server
150 2) the proxy must have a _selfrelease attribute to be able
151 to release the service (i.e. put the server connected to it back
152 into .queues[service_id]) once the last function
153 call before invoking .release_service() has finished
156 self.queues: a dict[DerivedServer.service_id]=Queue(-1) containing the DerivedServers
157 self.servers: dict[DerivedServer.service_id]=[list of DerivedServer instances]
161 server.proxy.__dict__[
'_selfrelease'] =
False
163 service_id = server.service_id
165 if not service_id
in self.queues:
166 self.queues[service_id] = Queue(-1)
167 self.servers[service_id] = []
169 self.queues[service_id].put(server)
170 self.servers[service_id].append(server)
173 self.dp(
'AbstractGrid.add_server: %s' % server)
175 def find_server(self, service_id):
179 Selects a server that is capable of executing
180 the requested service. If they are all busy,
181 waits until one becomes available.
185 if not service_id
in self.queues:
186 raise StandardError,
'Service type "%s" not known.' % service_id
188 queue = self.queues[service_id]
195 def acquire_service(self, service_id):
197 For a given service, it finds an available server and
198 returns the corresponding proxy worker.
202 server = self.find_server(service_id)
204 server.acquired =
True
206 if self.debug
and self.verbose:
207 msg =
'Grid.acquire_service: service "%s" on host "%s" acquired [%s]'
208 self.dp( msg % (service_id, server.host, server.proxy) )
212 def release_service(self, proxy):
214 Used to inform server/proxy that we are done using the service,
215 and that it should be released when the last function call of the
218 However, it means that someone (the proxy or the grid)
219 will have to call _release_service() itself and also
220 restore ._selfrelease to the previous state (False)
224 proxy._selfrelease =
True
226 def _release_service(self, proxy):
232 proxy._server.acquired =
False
234 if proxy._server
in self.servers[proxy._server.service_id]:
235 self.queues[proxy._server.service_id].put(proxy._server)
237 if self.debug
and self.verbose:
238 msg =
'Grid.release_service: service "%s" on host "%s" released [%s]'
239 self.dp( msg % (proxy._server.service_id, proxy._server.host, proxy) )
def add_time(self, proxy, time):
    """Record an execution *time* for the server behind *proxy*.

    The value is passed to ``proxy._server.time.put``.  The server's
    string representation averages ``self.time``, so this is presumably
    a container of recent timings (e.g. ``utils.Pipe``) -- confirm.
    """
    proxy._server.time.put(time)
245 def copy_files(self, rootdir, filelist, hosts = None,
246 exclude = [
'cns/',
'.svn/']):
250 Copies source files to a common folder so that
251 they are accessible to the grid on remote hosts
255 src_path = os.path.join(os.environ['ISD_ROOT'], 'src/py')
257 from compileall import compile_dir
258 compile_dir(src_path, maxlevels=0, force=True, quiet=True)
260 g.copy_files(src_path, ['*.pyc','*.so'])
262 The method will replicate the directory structure of
263 items in the filelist, e.g. will create data folder
264 in the destination for
266 g.copy_files(src_path, ['data/*.txt'])
268 also, it will copy all subfolders matching the required name, i.e. it
269 will copy all files called foo.txt from all existing subfolders it can
272 g.copy_files(src_path, ['Isd/foo.txt'])
273 will copy files like 'Isd/foo.txt', 'Isd/a/foo.txt', 'Isd/a/b/c/foo.txt' etc.
275 the exclude keyword excludes paths containing the given strings.
279 if hosts ==
None: hosts = self.hosts
280 elif type(hosts).__name__ ==
'Host': hosts = [hosts]
284 print 'AbstractGrid.copy_files: filelist = %s' % filelist
286 if self.shared_temp_path:
287 print 'AbstractGrid.copy_files: to %s' % self.hosts[0].temp_path
290 print 'AbstractGrid.copy_files: to %s on %s' % (host.temp_path, host.name)
302 folder, pattern = os.path.split(f)
303 if folder==
'': folder =
'./'
304 for root,useless,filematch
in os.walk(folder):
306 for file
in filematch:
307 fullfile = os.path.join(rootdir, root, file)
308 if fnmatch(file, pattern)
and (
309 not False in [fullfile.find(ex) < 0
for ex
in exclude] ):
310 if not root
in subroots:
311 self.create_subroot(root)
313 subroots[root].append(os.path.join(rootdir, root, file))
315 for subroot, subfilelist
in subroots.items():
317 _from = string.join(subfilelist)
319 if self.shared_temp_path:
321 _to = os.path.join(self.hosts[0].temp_path, subroot)
322 os.system(
'cp %s %s' % (_from, _to))
328 _to = os.path.join(host.temp_path, subroot)
330 os.system(
'scp -r %s %s:%s > /dev/null' % (_from, host.name, _to))
333 print 'Host %s: done. (%s)' % (host.name, _from)
334 self.copied_files = subroots
336 self.others = {
'rootdir':rootdir,
'root':root,
'files':files}
343 def create_subroot(self, subroot):
345 if self.shared_temp_path:
347 tempdir = os.path.join( self.hosts[0].temp_path, subroot)
349 if not os.path.exists(tempdir):
354 for host
in self.hosts:
356 tempdir = abspath(os.path.join(host.temp_path, subroot))
358 if not tempdir
in self.temp_paths[host.name]:
360 remote_mkdirs(host, tempdir,
False)
362 while not tempdir
in self.temp_paths[host.name]:
364 self.temp_paths[host.name].append(tempdir)
365 tempdir = os.path.split(tempdir)[0]
367 def initialise(self, src_path, src_files):
370 Create temp paths if needed, either shared or remotely on each host
371 Copy source files to this folder
376 if self.shared_temp_path:
377 if not os.path.exists(self.hosts[0].temp_path):
378 os.makedirs(self.hosts[0].temp_path)
381 if not hasattr(self,
'temp_paths'): self.temp_paths = {}
383 for host
in self.hosts:
384 if not host.name
in self.temp_paths:
386 remote_mkdirs(host, host.temp_path, self.debug)
387 self.temp_paths[host.name] = [host.temp_path,]
394 py_compile.compile(os.path.join(src_path,
'%s.py' % f))
396 if os.path.exists(os.path.join(src_path,
'%s.pyo' % f)):
397 compiled_files += [
'%s.pyo' % f]
399 compiled_files += [
'%s.pyc' % f]
401 self.copy_files(src_path, compiled_files)
404 raise NotImplementedError
406 def terminate(self, service_id = None):
411 if self.ishalted():
return
413 atexit_unregister( self.terminate )
415 if service_id ==
None: service_ids = self.servers.keys()
416 else: service_ids = [service_id]
418 for service_id
in service_ids:
420 servers = self.servers[service_id]
422 while len(servers) > 0:
425 print 'AbstractGrid.terminate: terminating %s' % servers[-1]
428 server = servers.pop()
434 print 'AbstractGrid: terminated.'
438 Contains all the information that is required
439 to use a remote object (alone and within a grid)
441 It also does all the work needed to launch the instance on the remote side
442 and to wrap it within a Grid-specific Proxy object
451 def __init__(self, proxy, service_id, host, debug):
454 self.debug_out = sys.stdout
457 self.service_id = service_id
462 self.proxy.__dict__[
'_server'] = self
469 self.acquired =
False
473 print >>self.debug_out, str
474 self.debug_out.flush()
478 It terminates self.proxy if needed
481 raise NotImplementedError
485 s =
'%s(busy=%s, <time>=%.2f, jobs=%d, host="%s", proxy="%s"' \
486 % (self.__class__.__name__, self.acquired, average(self.time), self.jobs, \
487 self.host.name, self.proxy)
class TimeoutError(Exception):
    """Module-specific timeout error (shadows the builtin of the same name).

    Presumably raised when waiting for a remote result does not complete
    within the requested timeout (see ``Result.get``) -- confirm.
    """
496 class Result(object):
498 A Result object is returned by a Proxy object, when some remote
501 When results of the remote function calculations are ready,
502 the .event is set and the result of calculations is stored
503 in the .value field. This has to be done externally,
504 for example in the Proxy object.
510 ## assigning result value
512 result.value = result of the function call
515 ## collecting the result
517 result.get(timeout = 60)
521 def __init__(self, proxy):
529 def get(self, timeout = None):
531 self.event.wait(timeout)
533 if timeout
is not None:
535 if not self.event.isSet():
544 if hasattr(self,
'proxy'): sx += [
'proxy="%s"' % self.proxy]
545 if hasattr(self,
'value'): sx += [
'value=%s' % self.value]
547 s =
'%s(%s)' % ( self.__class__.__name__, string.join(sx,
',') )
556 class AbstractService(object):
558 Wrapper around Grid services, facilitating non parameter-specific
559 usage of the remote objects.
561 AbstractService resides on the local side and is parameter-specific.
562 It provides the interface for remote services using the grid.
564 self._set_methods - are used to set up parameters specific to the
565 local service on the remote objects
567 self._get_methods - are used to get updated parameters from the remote side
569 self._parameters - parameters that have to be set remotely
570 1. kept on the local side
571 2. set on the remote object before each call
575 heatbath = GridService(grid, service_id)
576 heatbath.parameters = {'T': 30, etc}
577 heatbath.set_methods = {'T': 'set_temperature' }
578 heatbath.get_methods = {'T': 'get_temperature' }
579 heatbath.generate_sample(x,y,z)
583 def __init__(self, grid, service_id, debug = False,
584 return_remote_attributes =
False):
586 object.__init__(self)
588 if not service_id
in grid.queues:
589 raise StandardError,
'Service type "%s" not known in the grid' % service_id
591 self.service_id = service_id
594 self._set_methods =
None
595 self._get_methods =
None
596 self._parameters = {}
598 self.__proxy_lock = Lock()
600 self.__parameters_last = {}
601 self.__parameters_lock = Lock()
603 self.__return_remote_attributes = return_remote_attributes
609 Makes sure that the parameters requested (locally)
610 were updated before being returned
612 Example: (in derived class)
614 >> def get_temperature(self):
615 >> return self.get_parameter('T')
618 if self._get_methods
is not None:
619 self.__get_parameters()
621 return self._parameters[attr_name]
623 def __get_parameters(self):
625 self.__parameters_lock.acquire()
627 for attr_name
in self.__parameters_last:
628 self._parameters[attr_name] = self.__parameters_last[attr_name].get()
630 self.__parameters_last = {}
632 def __proxy_acquire(self):
635 print 'AbstractService.__proxy_acquire(): %s' % (self.service_id)
637 self.__proxy_lock.acquire()
639 self.proxy = self.grid.acquire_service(self.service_id)
641 def __proxy_release(self):
644 print 'AbstractService.__proxy_release(): ' % (self.proxy)
646 self.grid.release_service(self.proxy)
649 self.__proxy_lock.release()
651 def __proxy_release_now(self):
653 self.grid._release_service(self.proxy)
656 self.__proxy_lock.release()
658 def __getattr__(self, name):
664 if not 'grid' in self.__dict__:
665 return getattr(object, name)
667 self.__proxy_acquire()
670 print 'AbstractService.__getattr__(%s): acquired %s [%s]' % (name, self.proxy, id(self))
673 attr = getattr(self.proxy, name)
676 self.__proxy_release_now()
682 print 'AbstractService.__getattr__(%s): is callable' % name
684 return lambda *args: self.__call_method(name, *args)
688 print 'AbstractService.__getattr__(%s): is not callable' % name
690 if self.__return_remote_attributes:
692 attr = getattr(self.proxy, name)
694 self.__proxy_release_now()
699 raise 'AbstractService must not return remote attributes ' +\
700 '(because remote workers are parameter not specific)'
702 def __call_method(self, name, *args):
705 print 'AbstractService.__call_method: ', self.proxy
709 if self._get_methods
is not None:
712 print 'AbstractService.__call_method(%s): renew local parameters [%s]' % (name, id(self))
714 self.__get_parameters()
716 self.__parameters_lock.acquire()
720 for attr_name, set_method
in self._set_methods.items():
721 getattr(self.proxy, set_method)( self._parameters[attr_name] )
726 print 'AbstractService.__call_method(%s): actual call %s [%s]' % (name, self.proxy, id(self))
729 result = getattr(self.proxy, name)(*args)
730 result.proxy = self.proxy
733 self.__proxy_release()
739 if self._get_methods
is not None:
742 print 'AbstractService.__call_method(%s): parameter call %s [%s]' % (name, self.proxy, id(self))
744 t = Thread(target = self.__call_parameters, args = ())
748 self.__proxy_release()
751 print 'AbstractService.__call_method(%s): returning result %s [%s]' % (name, self.proxy, id(self))
755 def __call_parameters(self):
761 for attr_name, get_method
in self._get_methods.items():
762 self.__parameters_last[attr_name] = getattr(self.proxy, get_method)()
764 self.__parameters_lock.release()
767 print 'AbstractService.__call_parameters: releasing %s [%s]' % (self.proxy, id(self))
771 self.__proxy_release()