Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: de50cdfcf9e3ba415fa09a3c9203d4d95a14e5b3 authored by Benjamin Peterson on 31 May 2014, 18:22:47 UTC
fix news header
Tip revision: de50cdf
__init__.py
import os, sys, unittest, getopt, time

use_resources = []

class ResourceDenied(Exception):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not be enabled.  Resources are defined by test modules.
    """

def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    If the caller's module is __main__ then automatically return True."""
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return True
    result = use_resources is not None and \
           (resource in use_resources or "*" in use_resources)
    if not result:
        _unavail[resource] = None
    return result

_unavail = {}
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ then automatically return True."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)

def find_package_modules(package, mask):
    import fnmatch
    if (hasattr(package, "__loader__") and
            hasattr(package.__loader__, '_files')):
        path = package.__name__.replace(".", os.path.sep)
        mask = os.path.join(path, mask)
        for fnm in package.__loader__._files.iterkeys():
            if fnmatch.fnmatchcase(fnm, mask):
                yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
    else:
        path = package.__path__[0]
        for fnm in os.listdir(path):
            if fnmatch.fnmatchcase(fnm, mask):
                yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])

def get_tests(package, mask, verbosity, exclude=()):
    """Return a list of skipped test modules, and a list of test cases."""
    tests = []
    skipped = []
    for modname in find_package_modules(package, mask):
        if modname.split(".")[-1] in exclude:
            skipped.append(modname)
            if verbosity > 1:
                print >> sys.stderr, "Skipped %s: excluded" % modname
            continue
        try:
            mod = __import__(modname, globals(), locals(), ['*'])
        except (ResourceDenied, unittest.SkipTest) as detail:
            skipped.append(modname)
            if verbosity > 1:
                print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
            continue
        for name in dir(mod):
            if name.startswith("_"):
                continue
            o = getattr(mod, name)
            if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
                tests.append(o)
    return skipped, tests

def usage():
    print __doc__
    return 1

def test_with_refcounts(runner, verbosity, testcase):
    """Run testcase several times, tracking reference counts."""
    import gc
    import ctypes
    ptc = ctypes._pointer_type_cache.copy()
    cfc = ctypes._c_functype_cache.copy()
    wfc = ctypes._win_functype_cache.copy()

    # when searching for refcount leaks, we have to manually reset any
    # caches that ctypes has.
    def cleanup():
        ctypes._pointer_type_cache = ptc.copy()
        ctypes._c_functype_cache = cfc.copy()
        ctypes._win_functype_cache = wfc.copy()
        gc.collect()

    test = unittest.makeSuite(testcase)
    for i in range(5):
        rc = sys.gettotalrefcount()
        runner.run(test)
        cleanup()
    COUNT = 5
    refcounts = [None] * COUNT
    for i in range(COUNT):
        rc = sys.gettotalrefcount()
        runner.run(test)
        cleanup()
        refcounts[i] = sys.gettotalrefcount() - rc
    if filter(None, refcounts):
        print "%s leaks:\n\t" % testcase, refcounts
    elif verbosity:
        print "%s: ok." % testcase

class TestRunner(unittest.TextTestRunner):
    def run(self, test, skipped):
        "Run the given test case or test suite."
        # Same as unittest.TextTestRunner.run, except that it reports
        # skipped tests.
        result = self._makeResult()
        startTime = time.time()
        test(result)
        stopTime = time.time()
        timeTaken = stopTime - startTime
        result.printErrors()
        self.stream.writeln(result.separator2)
        run = result.testsRun
        if _unavail: #skipped:
            requested = _unavail.keys()
            requested.sort()
            self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
                                (run, run != 1 and "s" or "", timeTaken,
                                 len(skipped),
                                 len(skipped) != 1 and "s" or ""))
            self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
        else:
            self.stream.writeln("Ran %d test%s in %.3fs" %
                                (run, run != 1 and "s" or "", timeTaken))
        self.stream.writeln()
        if not result.wasSuccessful():
            self.stream.write("FAILED (")
            failed, errored = map(len, (result.failures, result.errors))
            if failed:
                self.stream.write("failures=%d" % failed)
            if errored:
                if failed: self.stream.write(", ")
                self.stream.write("errors=%d" % errored)
            self.stream.writeln(")")
        else:
            self.stream.writeln("OK")
        return result


def main(*packages):
    try:
        opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
    except getopt.error:
        return usage()

    verbosity = 1
    search_leaks = False
    exclude = []
    for flag, value in opts:
        if flag == "-q":
            verbosity -= 1
        elif flag == "-v":
            verbosity += 1
        elif flag == "-r":
            try:
                sys.gettotalrefcount
            except AttributeError:
                print >> sys.stderr, "-r flag requires Python debug build"
                return -1
            search_leaks = True
        elif flag == "-u":
            use_resources.extend(value.split(","))
        elif flag == "-x":
            exclude.extend(value.split(","))

    mask = "test_*.py"
    if args:
        mask = args[0]

    for package in packages:
        run_tests(package, mask, verbosity, search_leaks, exclude)


def run_tests(package, mask, verbosity, search_leaks, exclude):
    skipped, testcases = get_tests(package, mask, verbosity, exclude)
    runner = TestRunner(verbosity=verbosity)

    suites = [unittest.makeSuite(o) for o in testcases]
    suite = unittest.TestSuite(suites)
    result = runner.run(suite, skipped)

    if search_leaks:
        # hunt for refcount leaks
        runner = BasicTestRunner()
        for t in testcases:
            test_with_refcounts(runner, verbosity, t)

    return bool(result.errors)

class BasicTestRunner:
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result
back to top