Staging
v0.5.1
https://github.com/python/cpython
Revision ded18d634f2e1615b746dca2698d6a98ff63045d authored by Guido van Rossum on 28 March 2002, 20:21:21 UTC, committed by Guido van Rossum on 28 March 2002, 20:21:21 UTC
this is what Zope 2 will be using in the foreseeable future).

Fix an issue that was reported in but unrelated to the main problem of
SF bug 535905 (Evil Trashcan and GC interaction).

The SETLOCAL() macro should not DECREF the local variable in-place and
then store the new value; it should copy the old value to a temporary
value, then store the new value, and then DECREF the temporary value.
This is because it is possible that during the DECREF the frame is
accessed by other code (e.g. a __del__ method or gc.collect()) and the
variable would be pointing to already-freed memory.

BUGFIX CANDIDATE!
1 parent 00be96e
Raw File
Tip revision: ded18d634f2e1615b746dca2698d6a98ff63045d authored by Guido van Rossum on 28 March 2002, 20:21:21 UTC
Backport for 2.1.3 (if we ever release it; we may have to because
Tip revision: ded18d6
threading.py
"""Proposed new threading module, emulating a subset of Java's threading model."""

import sys
import time
import thread
import traceback
import StringIO

# Rename some stuff so "from threading import *" is safe

_sys = sys
del sys

_time = time.time
_sleep = time.sleep
del time

_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread

_print_exc = traceback.print_exc
del traceback

_StringIO = StringIO.StringIO
del StringIO


# Debug support (adapted from ihooks.py)

_VERBOSE = 0

if __debug__:

    class _Verbose:

        def __init__(self, verbose=None):
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            if self.__verbose:
                format = format % args
                format = "%s: %s\n" % (
                    currentThread().getName(), format)
                _sys.stderr.write(format)

else:
    # Disable this when using "python -O"
    class _Verbose:
        def __init__(self, verbose=None):
            pass
        def _note(self, *args):
            pass


# Synchronization classes

Lock = _allocate_lock

def RLock(*args, **kwargs):
    return apply(_RLock, args, kwargs)

class _RLock(_Verbose):

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()
        self.__owner = None
        self.__count = 0

    def __repr__(self):
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                self.__owner and self.__owner.getName(),
                self.__count)

    def acquire(self, blocking=1):
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial succes", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    def release(self):
        me = currentThread()
        assert self.__owner is me, "release() of un-acquire()d lock"
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    # Internal methods used by condition variables

    def _acquire_restore(self, (count, owner)):
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        return self.__owner is currentThread()


def Condition(*args, **kwargs):
    return apply(_Condition, args, kwargs)

class _Condition(_Verbose):

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        if self.__lock.acquire(0):
            self.__lock.release()
            return 0
        else:
            return 1

    def wait(self, timeout=None):
        me = currentThread()
        assert self._is_owned(), "wait() of un-acquire()d lock"
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                endtime = _time() + timeout
                delay = 0.000001 # 1 usec
                while 1:
                    gotit = waiter.acquire(0)
                    if gotit or _time() >= endtime:
                        break
                    _sleep(delay)
                    if delay < 1.0:
                        delay = delay * 2.0
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        me = currentThread()
        assert self._is_owned(), "notify() of un-acquire()d lock"
        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")
        for waiter in waiters:
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        self.notify(len(self.__waiters))


def Semaphore(*args, **kwargs):
    return apply(_Semaphore, args, kwargs)

class _Semaphore(_Verbose):

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        assert value >= 0, "Semaphore initial value must be >= 0"
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        rc = 0
        self.__cond.acquire()
        while self.__value == 0:
            if not blocking:
                break
            self.__cond.wait()
        else:
            self.__value = self.__value - 1
            rc = 1
        self.__cond.release()
        return rc

    def release(self):
        self.__cond.acquire()
        self.__value = self.__value + 1
        self.__cond.notify()
        self.__cond.release()


def Event(*args, **kwargs):
    return apply(_Event, args, kwargs)

class _Event(_Verbose):

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = 0

    def isSet(self):
        return self.__flag

    def set(self):
        self.__cond.acquire()
        self.__flag = 1
        self.__cond.notifyAll()
        self.__cond.release()

    def clear(self):
        self.__cond.acquire()
        self.__flag = 0
        self.__cond.release()

    def wait(self, timeout=None):
        self.__cond.acquire()
        if not self.__flag:
            self.__cond.wait(timeout)
        self.__cond.release()


# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):
    global _counter
    _counter = _counter + 1
    return template % _counter

# Active thread administration
_active_limbo_lock = _allocate_lock()
_active = {}
_limbo = {}


# Main class for threads

class Thread(_Verbose):

    __initialized = 0

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, verbose=None):
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__started = 0
        self.__stopped = 0
        self.__block = Condition(Lock())
        self.__initialized = 1

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started:
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status = status + " daemon"
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        assert self.__initialized, "Thread.__init__() not called"
        assert not self.__started, "thread already started"
        if __debug__:
            self._note("%s.start(): starting thread", self)
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        _start_new_thread(self.__bootstrap, ())
        self.__started = 1
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)

    def run(self):
        if self.__target:
            apply(self.__target, self.__args, self.__kwargs)

    def __bootstrap(self):
        try:
            self.__started = 1
            _active_limbo_lock.acquire()
            _active[_get_ident()] = self
            del _limbo[self]
            _active_limbo_lock.release()
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)
            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                s = _StringIO()
                _print_exc(file=s)
                _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                 (self.getName(), s.getvalue()))
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
        finally:
            self.__stop()
            try:
                self.__delete()
            except:
                pass

    def __stop(self):
        self.__block.acquire()
        self.__stopped = 1
        self.__block.notifyAll()
        self.__block.release()

    def __delete(self):
        _active_limbo_lock.acquire()
        del _active[_get_ident()]
        _active_limbo_lock.release()

    def join(self, timeout=None):
        assert self.__initialized, "Thread.__init__() not called"
        assert self.__started, "cannot join thread before it is started"
        assert self is not currentThread(), "cannot join current thread"
        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        if timeout is None:
            while not self.__stopped:
                self.__block.wait()
            if __debug__:
                self._note("%s.join(): thread stopped", self)
        else:
            deadline = _time() + timeout
            while not self.__stopped:
                delay = deadline - _time()
                if delay <= 0:
                    if __debug__:
                        self._note("%s.join(): timed out", self)
                    break
                self.__block.wait(delay)
            else:
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
        self.__block.release()

    def getName(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    def setName(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    def isAlive(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started and not self.__stopped

    def isDaemon(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    def setDaemon(self, daemonic):
        assert self.__initialized, "Thread.__init__() not called"
        assert not self.__started, "cannot set daemon status of active thread"
        self.__daemonic = daemonic


# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        self._Thread__started = 1
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()
        import atexit
        atexit.register(self.__exitfunc)

    def _set_daemon(self):
        return 0

    def __exitfunc(self):
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()

def _pickSomeNonDaemonThread():
    for t in enumerate():
        if not t.isDaemon() and t.isAlive():
            return t
    return None


# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die,
# nor can they be waited for.
# Their purpose is to return *something* from currentThread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))
        self._Thread__started = 1
        _active_limbo_lock.acquire()
        _active[_get_ident()] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
        return 1

    def join(self):
        assert 0, "cannot join a dummy thread"


# Global API functions

def currentThread():
    try:
        return _active[_get_ident()]
    except KeyError:
        ##print "currentThread(): no current thread for", _get_ident()
        return _DummyThread()

def activeCount():
    _active_limbo_lock.acquire()
    count = len(_active) + len(_limbo)
    _active_limbo_lock.release()
    return count

def enumerate():
    _active_limbo_lock.acquire()
    active = _active.values() + _limbo.values()
    _active_limbo_lock.release()
    return active


# Create the main thread object

_MainThread()


# Self-test code

def _test():

    import random

    class BoundedQueue(_Verbose):

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            self.rc = Condition(self.mon)
            self.wc = Condition(self.mon)
            self.limit = limit
            self.queue = []

        def put(self, item):
            self.mon.acquire()
            while len(self.queue) >= self.limit:
                self._note("put(%s): queue full", item)
                self.wc.wait()
            self.queue.append(item)
            self._note("put(%s): appended, length now %d",
                       item, len(self.queue))
            self.rc.notify()
            self.mon.release()

        def get(self):
            self.mon.acquire()
            while not self.queue:
                self._note("get(): queue empty")
                self.rc.wait()
            item = self.queue[0]
            del self.queue[0]
            self._note("get(): got %s, %d left", item, len(self.queue))
            self.wc.notify()
            self.mon.release()
            return item

    class ProducerThread(Thread):

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.getName(), counter))
                _sleep(random() * 0.00001)


    class ConsumerThread(Thread):

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print item
                self.count = self.count - 1

    import time

    NP = 3
    QL = 4
    NI = 5

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.setName("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()

if __name__ == '__main__':
    _test()
back to top