# Staging
# v0.5.1
# https://github.com/python/cpython
# Raw File
# Tip revision: 8f51bb436f8adfd139cad046b91cd462c7f27f6c authored by Ned Deily on 19 September 2017, 05:09:03 UTC
# Bump to 3.6.0a1
# Tip revision: 8f51bb4
# test_windows_utils.py
"""Tests for windows_utils"""

import socket
import sys
import unittest
import warnings
from unittest import mock

# Everything in this module is Windows only, so skip the whole module on
# other platforms before the remaining imports below are attempted.
if sys.platform != 'win32':
    raise unittest.SkipTest('Windows only')

import _winapi

from asyncio import _overlapped
from asyncio import windows_utils
try:
    from test import support
except ImportError:
    from asyncio import test_support as support


class WinsocketpairTests(unittest.TestCase):
    """Tests for windows_utils.socketpair()."""

    def check_winsocketpair(self, ssock, csock):
        """Check that data sent on csock is received on ssock.

        Both sockets are closed on exit, even if the send, the receive or
        the assertion fails, so a failing test does not leak them.
        """
        # Leaving the with block closes csock first, then ssock.
        with ssock, csock:
            csock.send(b'xxx')
            self.assertEqual(b'xxx', ssock.recv(1024))

    def test_winsocketpair(self):
        # Default arguments: the pair must be connected to each other.
        ssock, csock = windows_utils.socketpair()
        self.check_winsocketpair(ssock, csock)

    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
    def test_winsocketpair_ipv6(self):
        # Same round-trip check, but with an explicit IPv6 family.
        ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
        self.check_winsocketpair(ssock, csock)

    @unittest.skipIf(hasattr(socket, 'socketpair'),
                     'socket.socketpair is available')
    @mock.patch('asyncio.windows_utils.socket')
    def test_winsocketpair_exc(self, m_socket):
        # The socket module seen by windows_utils is replaced by a mock whose
        # sockets fail on connect(); the OSError must reach the caller.
        m_socket.AF_INET = socket.AF_INET
        m_socket.SOCK_STREAM = socket.SOCK_STREAM
        m_socket.socket.return_value.getsockname.return_value = ('', 12345)
        m_socket.socket.return_value.accept.return_value = object(), object()
        m_socket.socket.return_value.connect.side_effect = OSError()

        self.assertRaises(OSError, windows_utils.socketpair)

    def test_winsocketpair_invalid_args(self):
        # Each of these argument values must be rejected with ValueError.
        self.assertRaises(ValueError,
                          windows_utils.socketpair, family=socket.AF_UNSPEC)
        self.assertRaises(ValueError,
                          windows_utils.socketpair, type=socket.SOCK_DGRAM)
        self.assertRaises(ValueError,
                          windows_utils.socketpair, proto=1)

    @unittest.skipIf(hasattr(socket, 'socketpair'),
                     'socket.socketpair is available')
    @mock.patch('asyncio.windows_utils.socket')
    def test_winsocketpair_close(self, m_socket):
        # When bind() fails, the socket that was created must still be closed.
        m_socket.AF_INET = socket.AF_INET
        m_socket.SOCK_STREAM = socket.SOCK_STREAM
        sock = mock.Mock()
        m_socket.socket.return_value = sock
        sock.bind.side_effect = OSError
        self.assertRaises(OSError, windows_utils.socketpair)
        self.assertTrue(sock.close.called)


class PipeTests(unittest.TestCase):
    """Tests for windows_utils.pipe() and windows_utils.PipeHandle."""

    def test_pipe_overlapped(self):
        # Start an overlapped read on one end of the pipe, write to the other
        # end, and check the state of both Overlapped objects at each step.
        ERROR_IO_INCOMPLETE = 996

        h1, h2 = windows_utils.pipe(overlapped=(True, True))
        try:
            ov1 = _overlapped.Overlapped()
            self.assertFalse(ov1.pending)
            self.assertEqual(ov1.error, 0)

            ov1.ReadFile(h1, 100)
            self.assertTrue(ov1.pending)
            self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING)
            # Nothing has been written yet, so asking for the result must
            # fail.  assertRaises reports a missing exception as a regular
            # test failure instead of an unrelated RuntimeError.
            with self.assertRaises(OSError) as cm:
                ov1.getresult()
            self.assertEqual(cm.exception.winerror, ERROR_IO_INCOMPLETE)

            ov2 = _overlapped.Overlapped()
            self.assertFalse(ov2.pending)
            self.assertEqual(ov2.error, 0)

            ov2.WriteFile(h2, b"hello")
            self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})

            res = _winapi.WaitForMultipleObjects([ov2.event], False, 100)
            self.assertEqual(res, _winapi.WAIT_OBJECT_0)

            self.assertFalse(ov1.pending)
            self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE)
            self.assertFalse(ov2.pending)
            self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
            self.assertEqual(ov1.getresult(), b"hello")
        finally:
            # Always release both pipe handles, whatever the outcome.
            _winapi.CloseHandle(h1)
            _winapi.CloseHandle(h2)

    def test_pipe_handle(self):
        ERROR_INVALID_HANDLE = 6

        # Only one end of the pipe is needed; close the other immediately.
        h, unused_handle = windows_utils.pipe(overlapped=(True, True))
        _winapi.CloseHandle(unused_handle)
        p = windows_utils.PipeHandle(h)
        self.assertEqual(p.fileno(), h)
        self.assertEqual(p.handle, h)

        # check garbage collection of p closes handle
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "",  ResourceWarning)
            del p
            support.gc_collect()
        # The handle should already be closed, so closing it again must fail.
        with self.assertRaises(OSError) as cm:
            _winapi.CloseHandle(h)
        self.assertEqual(cm.exception.winerror, ERROR_INVALID_HANDLE)


class PopenTests(unittest.TestCase):
    # Exercises windows_utils.Popen with stdin, stdout and stderr all piped.

    def test_popen(self):
        payload = b"blah\n"
        # Child program: read one line from stdin, write it upper-cased to
        # stdout and write a fixed marker to stderr.
        child_source = r"""if 1:
            import sys
            s = sys.stdin.readline()
            sys.stdout.write(s.upper())
            sys.stderr.write('stderr')
            """

        proc = windows_utils.Popen(
            [sys.executable, '-c', child_source],
            stdin=windows_utils.PIPE,
            stdout=windows_utils.PIPE,
            stderr=windows_utils.PIPE,
        )

        for stream in (proc.stdin, proc.stdout, proc.stderr):
            self.assertIsInstance(stream, windows_utils.PipeHandle)

        ov_stdin = _overlapped.Overlapped()
        ov_stdout = _overlapped.Overlapped()
        ov_stderr = _overlapped.Overlapped()

        # Start all three overlapped operations before waiting on any of them.
        ov_stdin.WriteFile(proc.stdin.handle, payload)
        ov_stdout.ReadFile(proc.stdout.handle, 100)
        ov_stderr.ReadFile(proc.stderr.handle, 100)

        # Super-long timeout for slow buildbots.
        wait_result = _winapi.WaitForMultipleObjects(
            [ov_stdin.event, ov_stdout.event, ov_stderr.event], True, 10000)
        self.assertEqual(wait_result, _winapi.WAIT_OBJECT_0)
        for ov in (ov_stdout, ov_stderr, ov_stdin):
            self.assertFalse(ov.pending)

        self.assertEqual(ov_stdin.getresult(), len(payload))
        stdout_data = ov_stdout.getresult().rstrip()
        stderr_data = ov_stderr.getresult().rstrip()

        self.assertGreater(len(stdout_data), 0)
        self.assertGreater(len(stderr_data), 0)
        # allow for partial reads...
        expected_stdout = payload.upper().rstrip()
        self.assertTrue(expected_stdout.startswith(stdout_data))
        self.assertTrue(b"stderr".startswith(stderr_data))

        # The context manager calls wait() and closes resources
        with proc:
            pass


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
# back to top