""" open/dulcinea/lib/ui/page_loader.py """ from dulcinea.common import set_test_environment from durus.utils import BytesIO, as_bytes from pprint import pformat from qp.lib.util import get_output from qp.pub.common import get_request, get_session, get_response, get_publisher, get_hit from qp.pub.publish import RespondNow from sancho.utest import UTest class PageLoader (object): def set_test_environment(self, **kwargs): return set_test_environment(**kwargs) def __init__(self, user=None, site_name=None): self.set_test_environment(site_name=site_name) self.site_name = site_name session = get_session() for key, value in get_publisher().get_sessions().items(): if value is session: self.session_key = key break else: assert 0, "no session!" if user is None: self.user = self.get_admin_user() else: self.user = user self.cookies = get_request().get_environ('HTTP_COOKIE') session.set_authenticated(self.user) self.session = session def process(self, PATH_INFO='/', input_string = '', SCRIPT_NAME='', SERVER_NAME='test@example.org', HTTPS='on', **kwargs): env = dict(PATH_INFO=PATH_INFO, SCRIPT_NAME=SCRIPT_NAME, SERVER_NAME=SERVER_NAME, HTTPS=HTTPS, **kwargs) get_publisher().process(BytesIO(as_bytes(input_string)), env) request = get_request() return request def get_admin_user(self): for user in get_publisher().get_users().itervalues(): if user.is_admin(): return user return None def get_paths(self, user): def get_paths_inner(directory, base_path): get_request().environ['PATH_INFO'] = base_path + '/' get_request().environ['SERVER_NAME'] = "example.org" get_request().scheme = 'https' self.session.set_authenticated(user) get_hit().set_session(self.session) exports = [] try: for export in directory.get_exports(): exports.append(export[0]) except RespondNow: pass exports = sorted(exports) for name in exports: objname = directory._q_translate(name) if objname is None: obj = directory._q_lookup(name) else: obj = getattr(directory, objname) if hasattr(obj, '_q_traverse'): for path in get_paths_inner(obj, base_path + '/' + name): yield path else: yield base_path +'/' + name set_test_environment(site_name=self.site_name) self.session.set_authenticated(user) get_hit().set_session(self.session) return [path for path in get_paths_inner( get_publisher().get_root_directory(), '')] def get_processed_request(self, path, follow_redirects=(301,302), **kwargs): """ Returns a request after processing, after one redirect if necessary. """ if 'HTTP_COOKIE' not in kwargs: kwargs['HTTP_COOKIE'] = self.cookies self.session.set_authenticated(self.user) get_publisher().get_sessions()[self.session_key] = self.session request = self.process(path, **kwargs) n = 4 while n > 0 and get_response().get_status_code() in follow_redirects: n -= 1 location = get_response().get_header('location') path = '/' + location.split('/', 3)[-1] # strip http://server if '?' in path: path, query = path.split('?') kwargs['QUERY_STRING'] = query self.session.set_authenticated(self.user) get_hit().set_session(self.session) get_publisher().get_sessions()[self.session_key] = self.session request = self.process(path, **kwargs) return request def is_reportable(self, line): if not line: return False if 'Warning: ' in line: warning = line.split('Warning: ', 1)[1].strip() if warning in ( ' lacks "summary" attribute', ' attribute "href" lacks value', '
attribute "action" lacks value', ' attribute "name" lacks value', 'trimming empty