"""
open/dulcinea/lib/ui/debug.qpy
"""
import os, os.path, sys
from datetime import timedelta, datetime
from dulcinea.ui.table import Table
from dulcinea.ui.user.util import ensure_admin_access
from dulcinea.ui.util import item_list, dl, boxtitle
from durus.persistent import PersistentObject
from pprint import pformat
from qp.fill.directory import Directory
from qp.fill.form import Form
from qp.fill.html import href, nl2br
from qp.lib.util import get_memory_usage, get_output
from qp.pub.common import get_publisher, get_request, get_session, get_response
from qp.pub.common import redirect, get_connection, header, footer, get_fields
from qpy import xml, xml_quote
class DurusDirectory(Directory):
def __init__(self, obj=None):
self.obj = obj
def get_exports(self):
ensure_admin_access()
yield ('', '_q_index', self.get_object().__class__.__name__, None)
def get_object(self):
if self.obj is None:
return get_publisher().get_connection().get_root()
else:
return self.obj
def _q_index:xml(self):
self.display(self.get_object())
def _q_lookup(self, component):
try:
durus_id = int(component)
except ValueError:
return None
object = get_publisher().get_connection().get(durus_id)
if object is not None:
return DurusDirectory(object)
else:
return None
def display:xml(self, obj):
def ref_repr(obj):
return str(href(obj._p_format_durus_id() + '/', obj._repr()))
str_obj = str(obj)
def format(value):
pformatted = pformat(value)
if isinstance(value, datetime):
return xml_quote(pformatted)
elif " at " in pformatted:
return xml('
%s
') % pformatted
else:
return nl2br(xml(pformatted))
PersistentObject._repr = PersistentObject.__repr__
PersistentObject.__repr__ = ref_repr
try:
title = "Object: %s" % obj._p_format_durus_id()
header(title)
''
footer(title)
finally:
PersistentObject.__repr__ = PersistentObject._repr
del PersistentObject._repr
class DebugDirectory(Directory):
def get_exports(self):
ensure_admin_access()
yield ('', '_q_index', 'Tools', 'Debugging Tools')
yield ('dumpenv', 'dumpenv', 'Environment',
'Dump request environment variables')
yield ('exception', 'exception', 'Exception', 'Raise an exception')
yield ('browse_sessions', 'browse_sessions', 'Sessions',
'Browse open sessions')
yield ('browse', 'browse', 'Exports',
'Browse exported names')
if os.name == 'posix':
yield ('client', 'client', 'Client', 'Client process info')
yield ('durusfile', 'durusfile', '.durus',
'Stat/Pack the data file.')
yield ('view_publisher_config', 'view_publisher_configuration',
'Config', 'View publisher configuration')
yield ('log', 'log', 'Log', 'The tail of the log.')
yield ('durus', 'durus', 'Browse', 'Browse Durus Database')
def _q_index:xml(self):
title = "Debugging Tools"
header(title)
''
footer(title)
def _q_lookup(self, component):
session = get_publisher().get_sessions().get(component)
if session is None:
return None
get_response().set_header('Content-type', 'text/plain')
return "%s:\n%s\n\n" % (pformat(session), pformat(vars(session)))
def client:xml(self):
title = "Client Information"
header(title)
''
'pid = %s
' % os.getpid()
real, virtual = get_memory_usage()
'real = %s
' % real
'virtual = %s
' % virtual
'Cache target size = %s
' % get_connection().get_cache_size()
'
'
by_class = {}
for x in get_connection().get_cache():
name = x.__class__.__name__
if name not in by_class:
by_class[name] = [0, 0, 0]
by_class[name][x._p_status + 1] += 1
table = Table()
table.column(class_name='Class')
table.column(ghost='Ghost')
table.column(saved='Saved')
table.column(unsaved='Unsaved')
table.column(total="Total")
ghost_total = 0
saved_total = 0
unsaved_total = 0
total_total = 0
for name, items in sorted(by_class.items()):
ghost_total += items[0]
saved_total += items[1]
unsaved_total += items[2]
total_total += sum(items)
table.row(class_name=name, ghost=items[0], saved=items[1],
unsaved=items[2], total=sum(items))
table.set_title(ghost="Ghost
Σ=%s" % ghost_total)
table.set_title(saved="Saved
Σ=%s" % saved_total)
table.set_title(unsaved="Unsaved
Σ=%s" % unsaved_total)
table.set_title(total="Total
Σ=%s" % total_total)
def tfooter:xml():
''
''
'Totals | '
'%s | ' % ghost_total
'%s | ' % saved_total
'%s | ' % unsaved_total
'%s | ' % total_total
'
'
''
table.set_footer(tfooter())
''
''
footer(title)
def durusfile:xml(self):
title = 'Durus File'
header(title)
durus_file_name = get_publisher().get_site().get_durus_file()
'' # popup
footer(title)
def dumpenv:xml(self):
title = 'Enviroment Dump'
header(title)
lines = []
''
footer(title=title)
def exception(self):
assert 0, 'relax, just testing'
def browse_sessions:xml(self):
header('Open Sessions')
table = Table()
table.column(id='Id')
table.column(user='User')
table.column(last_user='Authenticated Owner')
table.column(address='Address')
table.column(authentication="Authentication")
table.column(access="Access")
table.column(action="Action")
sessions = get_publisher().get_sessions()
to_remove = []
form = Form(use_tokens=False)
form.add_submit('delete_old',
'Delete all sessions not accessed in 24 hours.')
form.render()
'
'
if form.get('delete_old'):
cutoff = get_session().get_access_time() - timedelta(hours=24)
to_remove = [key for key, s in sessions.items()
if s.get_access_time() < cutoff]
for key in to_remove:
del sessions[key]
redirect('')
for id, session in sessions.iteritems():
access = session.get_access_time()
form = Form(use_tokens=False, css_class="nopopup")
form.add_submit(id, value="Delete")
if form.get(id):
to_remove.append(id)
continue
table.row(
id=id,
last_user=session.get_owner().get_id(),
address=session.get_remote_address(),
authentication=session.get_authentication_time(),
access=session.get_access_time(),
action=form.render(),
user=session.get_effective_user().get_id())
for id in to_remove:
if id in sessions:
del sessions[id]
''
''
footer(title='Open Sessions')
def view_publisher_configuration:xml(self):
title = 'Publisher Configuration'
header(title)
''
footer(title)
def log(self):
filename = get_publisher().get_site().get_logfile()
get_response().set_content_type('text/plain', 'utf-8')
return get_output("tail -1000 %s" % filename)
def browse:xml(self):
title = 'Browse'
header(title)
'' # popup
footer(title)
durus = DurusDirectory()