"""
open/DurusWorks/qp/lib/site.py

Defines Site, which exposes the configuration of one installed QP site and
starts, stops, and inspects the site's Durus and web server processes.
Running this file as a script (see main()) runs one of those servers.
"""
from code import interact
from datetime import datetime
from durus.client_storage import ClientStorage
from durus.file_storage import FileStorage
from durus.connection import Connection
from durus.__main__ import stop_durus, configure_readline
from durus.storage_server import wait_for_server
from logging import StreamHandler, Formatter, getLogger
from os import listdir, fork, kill, getpid, unlink, system
from os import setuid, setgid, getuid, execve, setpgid
from os.path import join, isdir, exists, basename, expanduser
from qp.lib.util import import_object, trace
from qp.lib.profiler import Profile
from qp.pub.common import get_publisher
from time import sleep
import errno
import signal
import grp
import pwd
import qp.hub
import sys
import os


class Site(object):
    """
    A named site.  The name identifies a package below
    `sites_package_name`; that package's `slash` module provides the
    SitePublisher class whose `configuration` dict drives every getter here.
    """

    sites_package_name = 'qp.sites'

    def __new__(klass, name):
        """
        We don't recommend subclassing Site, but if you must, this
        method provides a way to force qp to use your subclass:
        set the QP_SITE_CLASS environment variable to the dotted name
        of the class, and it is instantiated in place of `klass`.
        """
        if os.environ.get('QP_SITE_CLASS'):
            klass = import_object(os.environ.get('QP_SITE_CLASS'))
        return object.__new__(klass)

    def __init__(self, name):
        """(name:str)
        Name is the name of the site, expected to be the name of the
        corresponding package in the sites module directory.
        """
        self.name = name

    def get_name(self):
        """() -> str
        The name given to the constructor.
        """
        return self.name

    @classmethod
    def get_sites(klass):
        """() -> {name:str : site:Site}
        Note that this is a method of the class.
        It returns an index of all of the installed sites.

        Every directory on the sites package's __path__ is scanned.
        Entries containing a '.', '__pycache__', non-directories, and names
        already found on an earlier path entry are skipped.  A site whose
        configuration cannot be imported is reported on stdout and omitted.
        """
        sites_package = import_object(klass.sites_package_name)
        sites = dict()
        for sites_directory in sites_package.__path__:
            for name in listdir(sites_directory):
                if '.' in name:
                    continue
                if name == '__pycache__':
                    continue
                if name in sites:
                    continue
                site_dir_name = join(sites_directory, name)
                if not isdir(site_dir_name):
                    continue
                try:
                    site = Site(name)
                    # Force the import now, so that broken sites are
                    # detected here rather than at first use.
                    site.get_configuration()
                    sites[name] = site
                except ImportError as e:
                    print("\nImport failed for site in %s\n %s\n" % (
                        site_dir_name, e))
        return sites

    def get_package_name(self):
        """() -> str
        The name of the package that defines this site.
        """
        return "%s.%s" % (self.sites_package_name, self.get_name())

    def get_package(self):
        """() -> module
        The package that defines this site.
        """
        return import_object(self.get_package_name())

    def get_package_directory(self):
        """() -> str
        The full path to the directory of this site package.
        """
        return self.get_package().__path__[0]

    def get_slash_module(self):
        """() -> module
        Return the module that defines the SitePublisher and (if required)
        the SiteDirectory class.
        """
        return import_object(self.get_package_name() + ".slash")

    def get_publisher_class(self):
        """() -> class
        Returns the publisher subclass defined for this site.
        """
        return self.get_slash_module().SitePublisher

    def get_root_directory_class(self):
        """() -> class | None
        Returns the root directory class defined for this site, or None
        if the slash module does not define SiteDirectory.
        """
        return getattr(self.get_slash_module(), 'SiteDirectory', None)

    def get_configuration(self):
        """() -> dict
        Returns the 'configuration' attribute of this site's publisher
        class.
        """
        return self.get_publisher_class().configuration

    def get(self, key, default=None):
        """(key:str, default=None)
        Gets the value of a site configuration variable.
        """
        return self.get_configuration().get(key, default)

    def site_path(self, *rest):
        """(*rest:str) -> str
        Builds a path to something in the site module directory, by
        joining the arguments.
        """
        return join(self.get_package_directory(), *rest)

    def get_var_directory(self):
        """() -> str
        Get the directory where durus and pid files usually reside.
        The directory is created (with a notice on stdout) if it does not
        exist yet.
        """
        name = self.get('var_directory') or self.site_path('var')
        if not os.path.exists(name):
            print("\n\nFor site %s var_directory does not exist." % self.name)
            print("Creating %s\n" % name)
            os.makedirs(name)
        return name

    def get_directory_for_temporary_files(self):
        """() -> str
        Get the directory to use for temporary files.  The default is
        the 'tmp' directory below the site package's 'var' directory.
        The directory is created (with a notice on stdout) if it does not
        exist yet.
        """
        name = (self.get('directory_for_temporary_files') or
                self.site_path('var', 'tmp'))
        if not os.path.exists(name):
            print("\n\nFor site %s directory_for_temporary_files "
                  "does not exist." % self.name)
            print("Creating %s\n" % name)
            os.makedirs(name)
        return name

    def get_static_directory(self):
        """() -> str
        Get the directory where static content is expected.
        """
        return self.get('static_directory') or self.site_path('static')

    def get_durus_logginglevel(self):
        """() -> int
        Lower levels log more.
        """
        return self.get('durus_logginglevel', 20)

    def get_logfile(self):
        """() -> str
        The name of the file where the site logs events.
        """
        return (self.get('logfile') or
                join(self.get_var_directory(), '%s.log' % self.get_name()))

    def rotate_log(self):
        """
        Rotates the log file if the configuration calls for it:
        when 'log_rotation_size' is configured and the log file is larger
        than that, the file is renamed with the current ISO timestamp
        appended.
        """
        log_rotation_size = self.get('log_rotation_size')
        if log_rotation_size is None:
            return
        logfile = self.get_logfile()
        if not exists(logfile):
            return
        if os.stat(logfile).st_size > log_rotation_size:
            new_name = "%s.%s" % (logfile, datetime.now().isoformat())
            os.rename(logfile, new_name)

    def get_durus_file(self):
        """() -> str
        The name of the file containing the Durus database.
        """
        return (self.get('durus_file') or
                join(self.get_var_directory(), '%s.durus' % self.get_name()))

    def get_gcbytes(self):
        """() -> int
        If the number is positive, it is the number of commits between
        automatic garbage collections.  The default value is 0,
        meaning there is no automatic garbage collection.

        NOTE(review): the name suggests a byte count rather than a commit
        count -- confirm against the Durus StorageServer documentation.
        """
        return self.get('gcbytes', 0)

    def get_banned(self):
        """() -> str
        The name of the file containing addresses for which access to
        the site should be denied.
        """
        return (self.get('banned') or
                join(self.get_var_directory(), '%s.banned' % self.get_name()))

    def get_dispatcher_command_prefix(self):
        """() -> str | None
        The uri prefix for dispatcher commands, if dispatcher commands are
        enabled.
        """
        return self.get('dispatcher_command_prefix')

    def get_durus_address(self):
        """() -> (host:str, port:int)|str|None
        The address at which the Durus StorageServer listens.
        A tuple is returned unchanged.  Any other true value is taken as
        a file name and joined to the var directory.
        """
        address = self.get('durus_address')
        if isinstance(address, tuple):
            return address
        if address:
            return join(self.get_var_directory(), address)
        return None

    def get_scgi_address(self):
        """() -> (host:str, port:int) | None
        The address at which the SCGI service listens.
        """
        return self.get('scgi_address')

    def get_http_address(self):
        """() -> (host:str, port:int) | None
        The address at which the HTTP service listens.
        """
        return self.get('http_address')

    def get_other_addresses(self):
        """() -> [(host:str, port:int)] | None
        The addresses at which other services listen.
        """
        return self.get('other_addresses')

    def get_as_https_address(self):
        """() -> (host:str, port:int) | None
        The address at which the HTTP service listens and treats all
        connections as if they have been forwarded through an SSL tunnel.
        The cgi environment for these connections has HTTP set to 'on',
        so the url scheme is treated as 'https'.

        NOTE(review): 'HTTP' above is presumably the 'HTTPS' cgi variable
        -- confirm against the publisher code.
        """
        return self.get('as_https_address')

    def get_https_address(self):
        """() -> (host:str, port:int) | None
        The address at which the SSL tunnel or proxy accepts HTTPS
        requests.  QP itself does not listen on this address.
        """
        return self.get('https_address')

    def get_web_addresses(self):
        """() -> [(host:str, port:int)*]
        Returns the addresses on which the QP web server listens:
        the configured scgi, http, and as_https addresses, in that order.
        """
        candidates = (self.get_scgi_address(),
                      self.get_http_address(),
                      self.get_as_https_address())
        return [address for address in candidates if address]

    def get_durus_pidfile(self):
        """() -> str
        The name of the file used to hold the process id of the running
        Durus StorageServer process.
        """
        return (self.get('durus_pidfile') or
                join(self.get_var_directory(),
                     '%s.durus.pid' % self.get_name()))

    def get_web_pidfile(self):
        """() -> str
        The name of the file used to hold the process id of the running
        QP web server process.
        """
        return (self.get('web_pidfile') or
                join(self.get_var_directory(),
                     '%s.web.pid' % self.get_name()))

    def is_live_pidfile(self, pidfile):
        """(pidfile:str) -> int | None
        Returns an int found in the named file, if there is one,
        and if there is a running process with that process id.
        Return None if no such process exists, or if the file is missing,
        unreadable, or does not hold a positive integer.
        """
        try:
            with open(pidfile) as pid_stream:
                pid = int(pid_stream.read().strip())
        except (ValueError, IOError):
            return None
        if pid <= 0:
            # kill() treats zero and negative ids as process groups (or
            # every process), so they can never name a single server.
            return None
        try:
            # Signal 0 performs the existence check without signalling.
            kill(pid, 0)
        except OSError as e:
            if e.errno == errno.EPERM:
                # The process exists, but belongs to another user.
                return pid
            return None
        return pid

    def is_durus_running(self):
        """() -> int | None
        Returns the pid of the Durus StorageServer, if it is running.
        """
        return self.is_live_pidfile(self.get_durus_pidfile())

    def is_web_running(self):
        """() -> int | None
        Returns the pid of the QP web server, if it is running.
        """
        return self.is_live_pidfile(self.get_web_pidfile())

    def get_max_children(self):
        """() -> int
        Returns the maximum number of child processes to be allowed
        for the QP web server.
        """
        return self.get('max_children', 5)

    def get_busy_limit(self):
        """() -> int
        Returns the maximum number of child processes allowed to be busy
        at one time working on requests from the same IP address.
        """
        return self.get('busy_limit', 1)

    def get_durus_cache_size(self):
        """() -> int
        The target number of loaded instances to keep in the Durus
        ClientStorage cache.
        """
        return self.get('durus_cache_size', 500000)

    def get_live_host_name(self):
        """() -> str | None
        The configured 'live_host' value.
        """
        return self.get('live_host')

    def get_staging_host_name(self):
        """() -> str | None
        The configured 'staging_host' value.
        """
        return self.get('staging_host')

    def get_create_publisher(self):
        """() -> callable
        Return a function that can be called in a child process to create
        and initialize a publisher instance.
        """
        def create_publisher(**kwargs):
            assert get_publisher() is None
            # Line-buffered, and deliberately left open: from here on the
            # process writes stdout and stderr to the site log.
            log = open(self.get_logfile(), 'a', 1)
            sys.stdout = log
            sys.stderr = log
            return self.get_publisher(**kwargs)
        return create_publisher

    def get_publisher(self, **kwargs):
        """(**kwargs) -> Publisher
        Return the Publisher instance for this site.  Make one if it does
        not already exist.  When a durus address is configured, the Durus
        server is started if necessary and the new publisher gets a
        client connection to it.  The keyword arguments are passed to the
        publisher class, and are ignored if a publisher already exists.
        """
        existing = get_publisher()
        if existing is not None:
            assert existing.get_site().get_name() == self.get_name()
            return get_publisher()
        publisher_class = self.get_publisher_class()
        durus_address = self.get_durus_address()
        if not durus_address:
            return publisher_class(site=self, **kwargs)
        self.start_durus()
        connection = Connection(
            ClientStorage(address=durus_address),
            cache_size=self.get_durus_cache_size())
        return publisher_class(connection=connection, site=self, **kwargs)

    def make_file_connection(self, durus_file=None, **args):
        """(durus_file:str=None, **args) -> Connection
        This is intended for use by scripts that update the durus file
        directly.  The extra keyword arguments are passed to FileStorage.
        A publisher for this site is created around the new connection.
        """
        assert get_publisher() is None
        if durus_file is None:
            durus_file = self.get_durus_file()
        storage = FileStorage(durus_file, **args)
        connection = Connection(storage)
        # Instantiated for its side effect only.  Presumably the publisher
        # registers itself so that get_publisher() finds it -- see
        # get_connection().
        self.get_publisher_class()(site=self, connection=connection)
        return connection

    def update_db_environment(self, namespace, durus_file=None,
                              check_version=True, **args):
        """(dict, durus_file:str=None, check_version:bool=True, **args)
        This is intended for use by scripts that update the durus file
        directly.  It inserts values for 'connection', 'root', 'commit',
        'trace', and 'pack' into the namespace.
        Unless check_version is false, it also asserts that
        root['version'] != namespace['__doc__'], and then sets
        root['version'] = namespace['__doc__'].
        This helps ensure that scripts using this pattern are not executed
        twice on the same database.
        """
        self.make_file_connection(durus_file=durus_file, **args)
        connection = self.get_connection()
        root = connection.get_root()
        namespace.update(
            connection=connection,
            root=root,
            commit=trace(connection.commit),
            trace=trace,
            pack=trace(connection.pack))
        if check_version:
            assert root.get('version') != namespace['__doc__'], \
                "already updated"
            # NOTE(review): the assignment is taken to be guarded by
            # check_version along with the assertion -- confirm.
            root['version'] = namespace['__doc__']

    def get_connection(self):
        """() -> Connection
        Return the Connection instance for this site.  Make one if it does
        not already exist.
        """
        return self.get_publisher().get_connection()

    def get_script_name(self):
        """() -> str
        Return value to use for CGI environment's SCRIPT_NAME variable.
        This does not normally end with a '/'.
        This is the part of the path that is not involved in the
        Publisher's traversal.
        """
        return self.get('script_name', '')

    def start_web(self):
        """
        Start the QP web server, unless it is already running.
        """
        if self.is_web_running():
            return
        if fork() == 0:
            # Child: replace this process with a fresh interpreter that
            # runs this file's main() as the web server.
            cmd = [sys.executable, __file__, self.get_name(), 'web_server']
            execve(cmd[0], cmd, os.environ)
        # Parent: presumably blocks until each address accepts
        # connections -- see durus.storage_server.wait_for_server.
        for address in self.get_web_addresses():
            wait_for_server(*address)

    def start_durus_logging(self):
        """
        Send stdout and the 'durus' logger to the site log file.
        The logger's existing handlers are discarded, and it no longer
        propagates to its ancestors.
        """
        logger = getLogger('durus')
        # Deliberately left open for the life of the process.
        log_file = open(self.get_logfile(), 'a+')
        sys.stdout = log_file
        handler = StreamHandler(log_file)
        handler.setFormatter(Formatter("durus[%(message)s]"))
        logger.handlers[:] = []
        logger.addHandler(handler)
        logger.setLevel(int(self.get_durus_logginglevel()))
        logger.propagate = False

    def start_durus(self):
        """
        Start the Durus StorageServer, unless it is already running or
        no durus address is configured.
        """
        if self.is_durus_running():
            return
        if not self.get_durus_address():
            return
        if fork() == 0:
            # Child: replace this process with a fresh interpreter that
            # runs this file's main() as the durus server.
            cmd = [sys.executable, __file__, self.get_name(), 'durus_server']
            execve(cmd[0], cmd, os.environ)
        wait_for_server(address=self.get_durus_address())

    def stop_durus(self):
        """
        Stop the Durus StorageServer, and remove its pid file.
        """
        if self.is_durus_running() and self.get_durus_address():
            # This is the module-level stop_durus imported from durus.
            stop_durus(address=self.get_durus_address())
        # NOTE(review): the pid file is removed whether or not a server
        # was found -- confirm this matches the intended nesting.
        try:
            unlink(self.get_durus_pidfile())
        except OSError:
            pass  # No pid file: nothing to clean up.

    def stop_web(self):
        """
        Stop the QP web server, and remove its pid file.
        SIGTERM is sent up to 10 times, one second apart.  A message is
        printed if the process is still running after that.
        """
        pid = self.is_web_running()
        if pid:
            for attempt in range(10):
                if not self.is_web_running():
                    break
                kill(pid, signal.SIGTERM)
                if self.is_web_running():
                    sleep(1)
            else:
                # The last signal may have worked, so check once more
                # before reporting a failure.
                if self.is_web_running():
                    print("\nfailed to kill web process %s" % pid)
        # NOTE(review): the pid file is removed whether or not a server
        # was found -- confirm this matches the intended nesting.
        try:
            unlink(self.get_web_pidfile())
        except OSError:
            pass  # No pid file: nothing to clean up.

    def status(self):
        """
        Show a summary of the current status of all servers, as a single
        line on stdout.
        """
        write = sys.stdout.write
        write("%-6s" % self.get_name())
        address = self.get_durus_address()
        if address:
            if isinstance(address, tuple):
                short_address = "%s:%s" % address
            else:
                short_address = basename(address)
            durus_pid = self.is_durus_running()
            if durus_pid:
                write(" durus[%s]:%s" % (durus_pid, short_address))
            else:
                write(" durus:down")
        # Looked up once, so the pid shown is the one that was tested.
        web_pid = self.is_web_running()
        if web_pid:
            write(" web[%s]" % web_pid)
            if self.get_scgi_address():
                write(" scgi:%s:%s" % self.get_scgi_address())
            if self.get_http_address():
                write(" http:%s:%s" % self.get_http_address())
            if self.get_as_https_address():
                write(" as_https:%s:%s" % self.get_as_https_address())
            if self.get_https_address():
                write(" https:%s:%s" % self.get_https_address()[:2])
        else:
            write(" web:down")
        write("\n")

    def log_tail(self):
        """
        Show the last 100 lines, and continue to show any new output in
        the logfile.  This doesn't stop until the user types Control-C
        or takes some other action to stop it.
        """
        # The log file name is interpolated into a shell command unquoted.
        system("tail -n 100 -f %s" % self.get_logfile())

    def show(self):
        """
        Display the configuration information for this site.
        """
        print('\nSITE: %s' % self.get_name())
        root_directory_class = self.get_root_directory_class()
        # The site may not define a SiteDirectory class at all.
        if root_directory_class is not None:
            doc = root_directory_class.__doc__
            if doc:
                print(doc)
        print('publisher %s' % self.get_publisher_class())
        print('root_directory %s' % root_directory_class)
        print('banned %s' % self.get_banned())
        print('logfile : %r' % self.get_logfile())
        print('durus_pidfile : %r' % self.get_durus_pidfile())
        print('web_pidfile : %r' % self.get_web_pidfile())
        print('durus_address : %r' % (self.get_durus_address(),))
        print('http_address : %r' % (self.get_http_address(),))
        print('as_https_address : %r' % (self.get_as_https_address(),))
        print('https_address : %r' % (self.get_https_address(),))
        print('scgi_address : %r' % (self.get_scgi_address(),))
        print('other_addresses : %r' % (self.get_other_addresses(),))

    def interaction(self):
        """
        Run an interactive interpreter whose namespace holds 'site',
        'Profile', and 'publisher'.  When a durus address is configured,
        it also holds 'connection', 'root', and every item of the root
        that has a str key.  The banner lists the available names.
        """
        env = dict(site=self, Profile=Profile)
        if self.get_durus_address():
            self.start_durus()
            connection = self.get_connection()
            root = connection.get_root()
            env.update(connection=connection, root=root)
            for key, value in root.items():
                if isinstance(key, str):
                    env[key] = value
        env['publisher'] = self.get_publisher()
        banner = ', '.join(sorted(env.keys()))
        configure_readline(env, expanduser("~/.qphistory"))
        interact(banner, None, env)

    def change_uid_gid(self, uid, gid=None):
        """(uid:str, gid:str=None)
        Try to change UID and GID to the provided values
        This will only work if this script is run by root.
        Try to convert uid and gid to integers, in case they're numeric.
        When no gid is given, the primary group of the user is used.
        """
        try:
            uid = int(uid)
            default_grp = pwd.getpwuid(uid)[3]
        except ValueError:
            # Not numeric, so it must be a user name.
            uid, default_grp = pwd.getpwnam(uid)[2:4]
        if gid is None:
            gid = default_grp
        else:
            try:
                gid = int(gid)
            except ValueError:
                # Not numeric, so it must be a group name.
                gid = grp.getgrnam(gid)[2]
        # The group must be changed first: once the uid is no longer
        # root, the process is not permitted to change its gid.
        setgid(gid)
        setuid(uid)

    def ensure_uid_gid_not_root(self):
        """
        Make sure that the current user/group is not root, changing
        if the current user *is* root, to the uid identified by the
        'daemon_uid' configuration value (default 'nobody') and the gid
        identified by 'daemon_gid'.
        """
        if getuid() == 0:
            uid = self.get('daemon_uid', 'nobody')
            gid = self.get('daemon_gid', None)
            self.change_uid_gid(uid, gid=gid)
        assert getuid() != 0


def main(site_name, server_type):
    """(site_name:str, server_type:str)
    Run the named site's server in this process.  The server_type must be
    'web_server' or 'durus_server', or else RuntimeError is raised.

    The process becomes the leader of its own process group, records its
    pid in the server's pid file, notes the start in the site log, and
    then runs the server.  The pid file is removed when the server
    returns or raises.
    """
    # Presumably imported by its package name so that the class used is
    # the same one the rest of qp sees, even when this file is __main__.
    from qp.lib.site import Site
    site = Site(site_name)
    # The server_type indicates a web or durus server.
    if server_type == 'web_server':
        server_pidfile = site.get_web_pidfile()
    elif server_type == 'durus_server':
        server_pidfile = site.get_durus_pidfile()
    else:
        raise RuntimeError(
            "Expected 'web_server' or 'durus_server': got %r" % server_type)
    with open(server_pidfile, 'w') as pidfile:
        pid = getpid()
        setpgid(pid, pid)
        pidfile.write(str(pid))
    with open(site.get_logfile(), 'a') as log:
        log.write("%s[%s] started at %s UTC\n" % (
            server_type, pid, datetime.utcnow()))
    try:
        if server_type == 'web_server':
            site.get_publisher_class().run_web(site)
        elif server_type == 'durus_server':
            site.start_durus_logging()
            site.get_publisher_class().run_durus(site)
    finally:
        try:
            unlink(server_pidfile)
        except OSError:
            pass  # Already removed, for example by stop_web/stop_durus.


if __name__ == '__main__':
    main(*sys.argv[-2:])