""" open/dulcinea/lib/ui/debug.qpy """ import os, os.path, sys from datetime import timedelta, datetime from dulcinea.ui.table import Table from dulcinea.ui.user.util import ensure_admin_access from dulcinea.ui.util import item_list, dl, boxtitle from durus.persistent import PersistentObject from pprint import pformat from qp.fill.directory import Directory from qp.fill.form import Form from qp.fill.html import href, nl2br from qp.lib.util import get_memory_usage, get_output from qp.pub.common import get_publisher, get_request, get_session, get_response from qp.pub.common import redirect, get_connection, header, footer, get_fields from qpy import xml, xml_quote class DurusDirectory(Directory): def __init__(self, obj=None): self.obj = obj def get_exports(self): ensure_admin_access() yield ('', '_q_index', self.get_object().__class__.__name__, None) def get_object(self): if self.obj is None: return get_publisher().get_connection().get_root() else: return self.obj def _q_index:xml(self): self.display(self.get_object()) def _q_lookup(self, component): try: durus_id = int(component) except ValueError: return None object = get_publisher().get_connection().get(durus_id) if object is not None: return DurusDirectory(object) else: return None def display:xml(self, obj): def ref_repr(obj): return str(href(obj._p_format_durus_id() + '/', obj._repr())) str_obj = str(obj) def format(value): pformatted = pformat(value) if isinstance(value, datetime): return xml_quote(pformatted) elif " at " in pformatted: return xml('
%s
') % pformatted else: return nl2br(xml(pformatted)) PersistentObject._repr = PersistentObject.__repr__ PersistentObject.__repr__ = ref_repr try: title = "Object: %s" % obj._p_format_durus_id() header(title) '' footer(title) finally: PersistentObject.__repr__ = PersistentObject._repr del PersistentObject._repr class DebugDirectory(Directory): def get_exports(self): ensure_admin_access() yield ('', '_q_index', 'Tools', 'Debugging Tools') yield ('dumpenv', 'dumpenv', 'Environment', 'Dump request environment variables') yield ('exception', 'exception', 'Exception', 'Raise an exception') yield ('browse_sessions', 'browse_sessions', 'Sessions', 'Browse open sessions') yield ('browse', 'browse', 'Exports', 'Browse exported names') if os.name == 'posix': yield ('client', 'client', 'Client', 'Client process info') yield ('durusfile', 'durusfile', '.durus', 'Stat/Pack the data file.') yield ('view_publisher_config', 'view_publisher_configuration', 'Config', 'View publisher configuration') yield ('log', 'log', 'Log', 'The tail of the log.') yield ('durus', 'durus', 'Browse', 'Browse Durus Database') def _q_index:xml(self): title = "Debugging Tools" header(title) '' footer(title) def _q_lookup(self, component): session = get_publisher().get_sessions().get(component) if session is None: return None get_response().set_header('Content-type', 'text/plain') return "%s:\n%s\n\n" % (pformat(session), pformat(vars(session))) def client:xml(self): title = "Client Information" header(title) '
' 'pid = %s
' % os.getpid() real, virtual = get_memory_usage() 'real = %s
' % real 'virtual = %s
' % virtual 'Cache target size = %s
' % get_connection().get_cache_size() '
' by_class = {} for x in get_connection().get_cache(): name = x.__class__.__name__ if name not in by_class: by_class[name] = [0, 0, 0] by_class[name][x._p_status + 1] += 1 table = Table() table.column(class_name='Class') table.column(ghost='Ghost') table.column(saved='Saved') table.column(unsaved='Unsaved') table.column(total="Total") ghost_total = 0 saved_total = 0 unsaved_total = 0 total_total = 0 for name, items in sorted(by_class.items()): ghost_total += items[0] saved_total += items[1] unsaved_total += items[2] total_total += sum(items) table.row(class_name=name, ghost=items[0], saved=items[1], unsaved=items[2], total=sum(items)) table.set_title(ghost="Ghost
Σ=%s" % ghost_total) table.set_title(saved="Saved
Σ=%s" % saved_total) table.set_title(unsaved="Unsaved
Σ=%s" % unsaved_total) table.set_title(total="Total
Σ=%s" % total_total) def tfooter:xml(): '' '' 'Totals' '%s' % ghost_total '%s' % saved_total '%s' % unsaved_total '%s' % total_total '' '' table.set_footer(tfooter()) '
' '' footer(title) def durusfile:xml(self): title = 'Durus File' header(title) durus_file_name = get_publisher().get_site().get_durus_file() '' # popup footer(title) def dumpenv:xml(self): title = 'Enviroment Dump' header(title) lines = [] '' footer(title=title) def exception(self): assert 0, 'relax, just testing' def browse_sessions:xml(self): header('Open Sessions') table = Table() table.column(id='Id') table.column(user='User') table.column(last_user='Authenticated Owner') table.column(address='Address') table.column(authentication="Authentication") table.column(access="Access") table.column(action="Action") sessions = get_publisher().get_sessions() to_remove = [] form = Form(use_tokens=False) form.add_submit('delete_old', 'Delete all sessions not accessed in 24 hours.') form.render() '
' if form.get('delete_old'): cutoff = get_session().get_access_time() - timedelta(hours=24) to_remove = [key for key, s in sessions.items() if s.get_access_time() < cutoff] for key in to_remove: del sessions[key] redirect('') for id, session in sessions.iteritems(): access = session.get_access_time() form = Form(use_tokens=False, css_class="nopopup") form.add_submit(id, value="Delete") if form.get(id): to_remove.append(id) continue table.row( id=id, last_user=session.get_owner().get_id(), address=session.get_remote_address(), authentication=session.get_authentication_time(), access=session.get_access_time(), action=form.render(), user=session.get_effective_user().get_id()) for id in to_remove: if id in sessions: del sessions[id] '
' '' footer(title='Open Sessions') def view_publisher_configuration:xml(self): title = 'Publisher Configuration' header(title) '' footer(title) def log(self): filename = get_publisher().get_site().get_logfile() get_response().set_content_type('text/plain', 'utf-8') return get_output("tail -1000 %s" % filename) def browse:xml(self): title = 'Browse' header(title) '' # popup footer(title) durus = DurusDirectory()