Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: 426b022776672fdf3d71ddd98d89af341c88080f authored by Larry Hastings on 05 September 2020, 07:22:07 UTC
Version bump for 3.5.10.
Tip revision: 426b022
test_locks.py
from . import util as test_util

init = test_util.import_importlib('importlib')

import sys
import time
import unittest
import weakref

from test import support

try:
    import threading
except ImportError:
    threading = None
else:
    from test import lock_tests

if threading is not None:
    class ModuleLockAsRLockTests:
        locktype = classmethod(lambda cls: cls.LockType("some_lock"))

        # _is_owned() unsupported
        test__is_owned = None
        # acquire(blocking=False) unsupported
        test_try_acquire = None
        test_try_acquire_contended = None
        # `with` unsupported
        test_with = None
        # acquire(timeout=...) unsupported
        test_timeout = None
        # _release_save() unsupported
        test_release_save_unacquired = None
        # lock status in repr unsupported
        test_repr = None
        test_locked_repr = None

    LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
                  for kind, splitinit in init.items()}

    (Frozen_ModuleLockAsRLockTests,
     Source_ModuleLockAsRLockTests
     ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
                             LockType=LOCK_TYPES)
else:
    LOCK_TYPES = {}

    class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
        pass

    class Source_ModuleLockAsRLockTests(unittest.TestCase):
        pass


if threading is not None:
    class DeadlockAvoidanceTests:

        def setUp(self):
            try:
                self.old_switchinterval = sys.getswitchinterval()
                sys.setswitchinterval(0.000001)
            except AttributeError:
                self.old_switchinterval = None

        def tearDown(self):
            if self.old_switchinterval is not None:
                sys.setswitchinterval(self.old_switchinterval)

        def run_deadlock_avoidance_test(self, create_deadlock):
            NLOCKS = 10
            locks = [self.LockType(str(i)) for i in range(NLOCKS)]
            pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
            if create_deadlock:
                NTHREADS = NLOCKS
            else:
                NTHREADS = NLOCKS - 1
            barrier = threading.Barrier(NTHREADS)
            results = []

            def _acquire(lock):
                """Try to acquire the lock. Return True on success,
                False on deadlock."""
                try:
                    lock.acquire()
                except self.DeadlockError:
                    return False
                else:
                    return True

            def f():
                a, b = pairs.pop()
                ra = _acquire(a)
                barrier.wait()
                rb = _acquire(b)
                results.append((ra, rb))
                if rb:
                    b.release()
                if ra:
                    a.release()
            lock_tests.Bunch(f, NTHREADS).wait_for_finished()
            self.assertEqual(len(results), NTHREADS)
            return results

        def test_deadlock(self):
            results = self.run_deadlock_avoidance_test(True)
            # At least one of the threads detected a potential deadlock on its
            # second acquire() call.  It may be several of them, because the
            # deadlock avoidance mechanism is conservative.
            nb_deadlocks = results.count((True, False))
            self.assertGreaterEqual(nb_deadlocks, 1)
            self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)

        def test_no_deadlock(self):
            results = self.run_deadlock_avoidance_test(False)
            self.assertEqual(results.count((True, False)), 0)
            self.assertEqual(results.count((True, True)), len(results))


    DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
                       for kind, splitinit in init.items()}

    (Frozen_DeadlockAvoidanceTests,
     Source_DeadlockAvoidanceTests
     ) = test_util.test_both(DeadlockAvoidanceTests,
                             LockType=LOCK_TYPES,
                             DeadlockError=DEADLOCK_ERRORS)
else:
    DEADLOCK_ERRORS = {}

    class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
        pass

    class Source_DeadlockAvoidanceTests(unittest.TestCase):
        pass


class LifetimeTests:

    @property
    def bootstrap(self):
        return self.init._bootstrap

    def test_lock_lifetime(self):
        name = "xyzzy"
        self.assertNotIn(name, self.bootstrap._module_locks)
        lock = self.bootstrap._get_module_lock(name)
        self.assertIn(name, self.bootstrap._module_locks)
        wr = weakref.ref(lock)
        del lock
        support.gc_collect()
        self.assertNotIn(name, self.bootstrap._module_locks)
        self.assertIsNone(wr())

    def test_all_locks(self):
        support.gc_collect()
        self.assertEqual(0, len(self.bootstrap._module_locks),
                         self.bootstrap._module_locks)


(Frozen_LifetimeTests,
 Source_LifetimeTests
 ) = test_util.test_both(LifetimeTests, init=init)


@support.reap_threads
def test_main():
    support.run_unittest(Frozen_ModuleLockAsRLockTests,
                         Source_ModuleLockAsRLockTests,
                         Frozen_DeadlockAvoidanceTests,
                         Source_DeadlockAvoidanceTests,
                         Frozen_LifetimeTests,
                         Source_LifetimeTests)


if __name__ == '__main__':
    test_main()
back to top