Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: 3556b2db57ab3b17b3d4c6b28cdc604a3e0168e8 authored by Anthony Baxter on 27 April 2006, 02:13:13 UTC
2.5a2
Tip revision: 3556b2d
fork_wait.py
"""This test case provides support for checking forking and wait behavior.

To test different wait behavior, overrise the wait_impl method.

We want fork1() semantics -- only the forking thread survives in the
child after a fork().

On some systems (e.g. Solaris without posix threads) we find that all
active threads survive in the child after a fork(); this is an error.

While BeOS doesn't officially support fork and native threading in
the same application, the present example should work just fine.  DC
"""

import os, sys, time, thread, unittest
from test.test_support import TestSkipped

LONGSLEEP = 2
SHORTSLEEP = 0.5
NUM_THREADS = 4

class ForkWait(unittest.TestCase):

    def setUp(self):
        self.alive = {}
        self.stop = 0

    def f(self, id):
        while not self.stop:
            self.alive[id] = os.getpid()
            try:
                time.sleep(SHORTSLEEP)
            except IOError:
                pass

    def wait_impl(self, cpid):
        spid, status = os.waitpid(cpid, 0)
        self.assertEquals(spid, cpid)
        self.assertEquals(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))

    def test_wait(self):
        for i in range(NUM_THREADS):
            thread.start_new(self.f, (i,))

        time.sleep(LONGSLEEP)

        a = self.alive.keys()
        a.sort()
        self.assertEquals(a, range(NUM_THREADS))

        prefork_lives = self.alive.copy()

        if sys.platform in ['unixware7']:
            cpid = os.fork1()
        else:
            cpid = os.fork()

        if cpid == 0:
            # Child
            time.sleep(LONGSLEEP)
            n = 0
            for key in self.alive:
                if self.alive[key] != prefork_lives[key]:
                    n += 1
            os._exit(n)
        else:
            # Parent
            self.wait_impl(cpid)
            # Tell threads to die
            self.stop = 1
            time.sleep(2*SHORTSLEEP) # Wait for threads to die
back to top