#!/usr/bin/env python
"""
open/DurusWorks/qp/bin/qpcensus.py

Scan a Durus database, report spec/validation problems found in the stored
instances, and print per-class count and record-size statistics.
"""
import sys
from datetime import datetime

from durus.connection import Connection, ROOT_DURUS_ID
from durus.file_storage import FileStorage
from durus.serialize import unpack_record, split_durus_ids
from durus.utils import loads
from qp.lib.site import Site
from qp.lib.spec import get_spec_problems, get_specs, get_spec_doc
from qp.lib.spec import persistent_vars


def list_validation_errors(obj):
    """(obj) -> [ string ]
    Return instance validation errors reported by the instance method
    named 'list_validation_errors'.  If present, this method should check
    higher level data constraints, not ordinary attribute problems that
    are already covered by get_spec_problems().
    Note that the qpcensus.py environment that runs this method may
    have multiple instances of the same PersistentObject, so "is" tests
    involving such instances should be avoided here.

    If obj has no such method, an empty list is returned.  A result that
    is not a list is reported as a single message holding its repr.
    """
    checker = getattr(obj, 'list_validation_errors', None)
    if checker is None:
        return []
    result = checker()
    if isinstance(result, list):
        return result
    # Wrap the repr in a one-element list.  Calling list() on the repr
    # string would break it into one "message" per character.
    return [repr(result)]


class ObjectLocator:
    """
    Builds a human-readable path from the database root to an object by
    walking backwards through an index of referring objects.
    """

    def __init__(self, connection, reference_index):
        """
        connection: used for get_root() and for get(durus_id).
        reference_index: mapping of a durus id to the list of durus ids
            of the objects that refer to it.
        """
        self.connection = connection
        self.reference_index = reference_index

    def where_is(self, obj):
        """(obj) -> string
        Return a path-like description of obj such as 'root.a[3].b'.
        Falls back to repr(obj) when no unvisited referring object is
        known, and to '...' once more than 50 referrers have been visited.
        """
        root = self.connection.get_root()

        def locate(obj, seen):
            # 'seen' holds the durus ids of referrers already followed, so
            # reference cycles terminate and the depth stays bounded.
            if len(seen) > 50:
                return '...'
            if obj is root:
                return 'root'
            durus_id = obj._p_durus_id
            for referring_durus_id in self.reference_index.get(durus_id, []):
                if referring_durus_id not in seen:
                    seen.add(referring_durus_id)
                    break
            else:
                return repr(obj) # give up
            referring = self.connection.get(referring_durus_id)
            ref = locate(referring, seen)
            for key, value in persistent_vars(referring).items():
                if obj is value:
                    return "%s.%s" % (ref, key)
                # The attribute may be a mapping that holds obj ...
                try:
                    for k, v in value.items():
                        if obj is v:
                            return "%s[%r]" % (ref, k)
                except AttributeError:
                    pass
                # ... or some other iterable that holds obj.
                try:
                    for k, v in enumerate(value):
                        if obj is v:
                            return "%s[%r]" % (ref, k)
                except TypeError:
                    pass
            # The referrer was found but not the exact slot holding obj.
            return '%s ... ' % ref

        return locate(obj, set())


def format_specs(klass):
    """(klass) -> string
    Return a class-statement-like text block for klass: a header naming
    the class and its bases, followed by the text from get_spec_doc(klass)
    (or 'pass' when that text is empty) and a blank line.
    """
    if klass.__bases__:
        bases = '(%s):' % ', '.join(
            [b.__name__ for b in klass.__bases__])
    else:
        bases = ":"
    s = "class %s%s\n " % (klass.__name__, bases)
    s += get_spec_doc(klass).replace('\n', '\n ') or 'pass'
    s += "\n\n"
    return s


def check_durus(site, schema, klasses, show_all, show_max, show_path,
                show_shadows):
    """
    Scan every record of a Durus file storage and print a report.

    site: either an object providing get_durus_file() and get_publisher(),
        or a string naming the durus file directly (as passed for --file).
    schema: if true, the name of a file to which format_specs() output is
        written for each class encountered.
    klasses: accepted for interface compatibility.
        NOTE(review): this value is not used to limit the scan; confirm
        whether --class filtering is still wanted.
    show_all: if true, record problems for every instance rather than
        only the first problematic instance of each class.
    show_max: if true, identify an instance of maximal record size for
        each class.
    show_path: if true, build a reference index while scanning and use
        ObjectLocator to describe where instances live.
    show_shadows: if true, call print_shadows() for all known classes.
    """
    # A plain string is treated as the path of the durus file; without
    # this, the --file option fails on site.get_durus_file().
    is_path = isinstance(site, str)
    if is_path:
        durus_file = site
    else:
        durus_file = site.get_durus_file()
    storage = FileStorage(durus_file, readonly=True)
    if not is_path:
        publisher = site.get_publisher()
    connection = Connection(storage, cache_size=100)
    if not is_path:
        publisher.connection = connection
    problem_record = {}
    specs = {}
    counts = {}
    size_min = {}
    size_max = {}
    maximal = {}
    size_sum = {}
    refs = {}
    instance_count = 0
    if schema:
        schema_output = open(schema, 'w')
    else:
        schema_output = None
    start = datetime.now()
    try:
        for durus_id, record in storage.gen_durus_id_record(
                start_durus_id=ROOT_DURUS_ID):
            obj = connection.cache.get(durus_id)
            record_durus_id, data, refdata = unpack_record(record)
            if show_path:
                # Index: referenced durus id -> ids of referring records.
                for ref_durus_id in split_durus_ids(refdata):
                    refs.setdefault(ref_durus_id, []).append(durus_id)
            if obj is None:
                klass = loads(data)
                obj = connection.cache.get_instance(
                    durus_id, klass, connection)
            if obj._p_is_ghost():
                # Load the state from the record already in hand.
                state = connection.reader.get_state(data, load=True)
                obj.__setstate__(state)
                obj._p_set_status_saved()
            size = len(record)
            klass = obj.__class__
            if klass not in counts:
                counts[klass] = 1
                size_min[klass] = size
                size_max[klass] = size
                size_sum[klass] = size
                maximal[klass] = obj
                specs[klass] = get_specs(obj)
                if schema_output is not None:
                    schema_output.write(format_specs(klass))
            else:
                counts[klass] += 1
                size_min[klass] = min(size, size_min[klass])
                if size > size_max[klass]:
                    maximal[klass] = obj
                    size_max[klass] = size
                size_sum[klass] += size
            if show_all or klass not in problem_record:
                problems = get_spec_problems(obj, specs=specs[klass])
                if not problems:
                    problems = list_validation_errors(obj)
                if problems:
                    problem_record.setdefault(klass, []).append(
                        (obj, problems))
            obj._p_set_status_ghost()
            instance_count += 1
            if instance_count % 10000 == 0:
                # Progress tick.
                sys.stdout.write('.')
                sys.stdout.flush()
                # NOTE(review): the original layout of this call was lost;
                # it is issued both here and after the loop so that either
                # reading of the original is covered.
                connection.abort()
    finally:
        # Make sure the schema text reaches disk even if the scan fails.
        if schema_output is not None:
            schema_output.close()
    connection.abort()
    end = datetime.now()
    print("")
    print(durus_file)
    print(end)
    # total_seconds() includes whole days; timedelta.seconds does not.
    seconds = int((end - start).total_seconds())
    if seconds > 60:
        minutes, seconds = divmod(seconds, 60)
        fmt = "%sm, %ss" % (minutes, seconds)
    else:
        fmt = "%ss" % seconds
    print("Scanned %s instances in %s." % (instance_count, fmt))
    # Classes actually encountered, in name order, for all reports below.
    found_klasses = sorted(counts, key=lambda x: x.__name__)
    if show_path:
        where_is = ObjectLocator(connection, refs).where_is
    else:
        def where_is(obj):
            return obj._p_format_durus_id()
    for klass in found_klasses:
        if klass in problem_record:
            for obj, problem in problem_record[klass]:
                print("")
                print("Problem with instance of %s:" % (klass.__name__))
                print(' %s' % where_is(obj))
                for line in problem:
                    if len(line) > 200:
                        line = line[:200] + ' ...'
                    print(' ' + line)
    print("")
    print("%7s %-35s %7s %7s %7s %12s" % (
        "Count", "Class", "Min", "Max", "Ave", "Sum"))
    for klass in found_klasses:
        print("%7s %-35s %7s %7s %7s %12s" % (
            counts[klass],
            klass.__name__,
            size_min[klass],
            size_max[klass],
            size_sum[klass] // counts[klass],
            size_sum[klass]))
    if show_max:
        print("")
        print('Maximal Size Instances:')
        for klass in found_klasses:
            print(klass.__name__)
            print(' %s' % where_is(maximal[klass]))
    if show_shadows:
        for klass in get_all_classes():
            print_shadows(klass)


def get_all_classes():
    """() -> set
    Return the set containing object and every class reachable from it
    through type.__subclasses__().
    """
    all_classes = set()
    todo = [object]
    while todo:
        klass = todo.pop()
        all_classes.add(klass)
        # type.__subclasses__(klass) is used instead of
        # klass.__subclasses__() so that it also works when klass is
        # 'type' itself or another metaclass.
        for subclass in type.__subclasses__(klass):
            if subclass not in all_classes:
                todo.append(subclass)
    return all_classes


def print_shadows(klass):
    """
    Report attributes of klass whose value depends on base class order.
    We try to avoid having any of these.

    Only names not defined directly on klass are examined.  A warning is
    recorded when a name is defined by the first providing class in the
    MRO and also by a later class that is not one of its ancestors.
    """
    warnings = []
    # There are a few names where shadows seem to pose no problem.
    skip_names = ['__slots__', '__dict__', '__weakref__']
    for name in dir(klass):
        if name not in skip_names and name not in vars(klass):
            first_parent = None
            for parent in klass.__mro__[1:]:
                if name in vars(parent):
                    if first_parent is None:
                        first_parent = parent
                    elif not issubclass(first_parent, parent):
                        warnings.append((name, first_parent, parent))
    if warnings:
        print('\nIn %s' % klass)
        for name, first, other in warnings:
            print(' %s from %s shadows the one from %s.' % (
                name, first.__name__, other.__name__))
        print("")


def check_durus_main():
    """
    Command-line entry point: parse the options and run check_durus() on
    the file given by --file, or else on each site named as an argument.
    """
    from optparse import OptionParser
    parser = OptionParser()
    parser.set_description(
        "Searches for spec and validation problems in a durus database. "
        "The method 'list_validation_errors()', when present, is called to "
        "identify validation errors other than spec-related errors. "
        "Specific site names may be given as extra arguments. ")
    parser.add_option(
        '--file', dest="file", default=None,
        help="If this is given, check this file only.")
    parser.add_option(
        '--schema', dest="schema", default=None,
        help="If given, a schema will be written to this file.")
    parser.add_option(
        '--class', dest="classes", default=[], action="append",
        help="If given, limit study to this class.")
    parser.add_option(
        '--all', dest="all", action='store_true',
        help="If given, all errors are shown not just the first in each class.")
    parser.add_option(
        '--showmax', dest="show_max", action='store_true',
        help=("If given, an instance of maximal size is identified for"
              " each class."))
    parser.add_option(
        '--shadows', dest="show_shadows", action='store_true',
        help=("If given, also report known class attributes whose value "
              "depends on the base class order."))
    parser.add_option(
        '--showpath', dest="show_path", action='store_true',
        help="If given, work harder to show a path to instances.")
    (options, args) = parser.parse_args()
    if options.file:
        check_durus(options.file, options.schema, options.classes,
                    options.all, options.show_max, options.show_path,
                    options.show_shadows)
    elif args:
        for name in args:
            print("%s:" % name)
            site = Site(name)
            check_durus(site, options.schema, options.classes,
                        options.all, options.show_max, options.show_path,
                        options.show_shadows)


if __name__ == '__main__':
    check_durus_main()