# Staging
# v0.8.1
# https://github.com/python/cpython
# Raw File
# Tip revision: 4893861ab58544091feab2946648245ac1271e2f authored by Ned Deily on 13 October 2018, 07:01:32 UTC
# 3.6.7rc2
# Tip revision: 4893861
# test_pep492.py
"""Tests support for new syntax introduced by PEP 492."""

import collections.abc
import types
import unittest

try:
    from test import support
except ImportError:
    from asyncio import test_support as support
from unittest import mock

import asyncio
from asyncio import test_utils


class BaseTest(test_utils.TestCase):
    """Shared fixture that gives every test a stubbed-out event loop.

    The loop is a bare ``asyncio.BaseEventLoop`` whose ``_process_events``
    hook and ``_selector`` attribute are replaced with mocks; the mocked
    selector's ``select()`` always returns an empty tuple.
    """

    def setUp(self):
        super().setUp()

        # Prepare the fake selector up front so the loop can be wired in
        # one place below.
        fake_selector = mock.Mock()
        fake_selector.select.return_value = ()

        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = fake_selector

        # Expose the loop to the tests and register it with the base class.
        self.loop = loop
        self.set_event_loop(loop)


class LockTests(BaseTest):
    """Context-manager behaviour of the asyncio synchronization primitives.

    Lock, Condition, Semaphore and BoundedSemaphore are each exercised with
    ``async with`` and with the ``with await`` form.
    """

    def test_context_manager_async_with(self):
        # ``async with primitive`` must hold the primitive inside the body,
        # bind None to the ``as`` target, and release it on exit.
        factories = (
            asyncio.Lock,
            asyncio.Condition,
            asyncio.Semaphore,
            asyncio.BoundedSemaphore,
        )
        candidates = [make(loop=self.loop) for make in factories]

        async def exercise(primitive):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(primitive.locked())
            async with primitive as bound:
                self.assertIs(bound, None)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(primitive.locked())
            self.assertFalse(primitive.locked())

        for candidate in candidates:
            self.loop.run_until_complete(exercise(candidate))
            self.assertFalse(candidate.locked())

    def test_context_manager_with_await(self):
        # Same checks as above, but through ``with await primitive``.
        factories = (
            asyncio.Lock,
            asyncio.Condition,
            asyncio.Semaphore,
            asyncio.BoundedSemaphore,
        )
        candidates = [make(loop=self.loop) for make in factories]

        async def exercise(primitive):
            await asyncio.sleep(0.01, loop=self.loop)
            self.assertFalse(primitive.locked())
            with await primitive as bound:
                self.assertIs(bound, None)
                self.assertTrue(primitive.locked())
                await asyncio.sleep(0.01, loop=self.loop)
                self.assertTrue(primitive.locked())
            self.assertFalse(primitive.locked())

        for candidate in candidates:
            self.loop.run_until_complete(exercise(candidate))
            self.assertFalse(candidate.locked())


class StreamReaderTests(BaseTest):
    """Checks that ``asyncio.StreamReader`` can be consumed with ``async for``."""

    def test_readline(self):
        # Feed three lines (the last one unterminated) followed by EOF and
        # verify that ``async for`` hands them back one at a time.
        payload = b'line1\nline2\nline3'
        expected = [b'line1\n', b'line2\n', b'line3']

        source = asyncio.StreamReader(loop=self.loop)
        source.feed_data(payload)
        source.feed_eof()

        async def drain():
            received = []
            async for chunk in source:
                received.append(chunk)
            return received

        received = self.loop.run_until_complete(drain())
        self.assertEqual(received, expected)


class CoroutineTests(BaseTest):
    """Tests for how asyncio treats ``async def`` coroutines and awaitables.

    Several assertions below match regular expressions against reprs and
    code names, so the names of the nested functions (``foo``, ``afunc``)
    are part of what is being tested and must not be renamed.
    """

    # asyncio.iscoroutine() must accept both a native coroutine object and
    # an arbitrary object implementing the coroutine protocol.
    def test_iscoroutine(self):
        async def foo(): pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            f.close() # close the never-awaited coroutine to silence the warning

        # Test that asyncio.iscoroutine() uses collections.abc.Coroutine
        # (FakeCoro only provides send/throw/close/__await__ and does not
        # inherit from anything).
        class FakeCoro:
            def send(self, value): pass
            def throw(self, typ, val=None, tb=None): pass
            def close(self): pass
            def __await__(self): yield

        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    # asyncio.iscoroutinefunction() must recognise an ``async def`` function.
    def test_iscoroutinefunction(self):
        async def foo(): pass
        self.assertTrue(asyncio.iscoroutinefunction(foo))

    # A function decorated with @asyncio.coroutine that returns an object
    # with __await__ must produce a coroutine that drives that awaitable.
    def test_function_returning_awaitable(self):
        class Awaitable:
            def __await__(self):
                # The returned tuple supplies the single value 'spam'.
                return ('spam',)

        @asyncio.coroutine
        def func():
            return Awaitable()

        coro = func()
        # The first send() must surface the value provided by __await__.
        self.assertEqual(coro.send(None), 'spam')
        coro.close()

    # One ``async def`` coroutine awaiting another must return the inner
    # result, both with the loop's debug flag off and on.
    def test_async_def_coroutines(self):
        async def bar():
            return 'spam'
        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    # With the loop in debug mode, checks the repr of a coroutine object and
    # the diagnostics produced when it is dropped without being awaited.
    # asyncio.coroutines.logger is patched so the logged error can be
    # inspected through ``m_log``.
    @mock.patch('asyncio.coroutines.logger')
    def test_async_def_wrapped(self, m_log):
        async def foo():
            pass
        async def start():
            foo_coro = foo()
            self.assertRegex(
                repr(foo_coro),
                r'<CoroWrapper .*\.foo\(\) running at .*pep492.*>')

            # Expect a RuntimeWarning mentioning foo while the coroutine is
            # being discarded.
            with support.check_warnings((r'.*foo.*was never',
                                         RuntimeWarning)):
                # Drop the only reference, then force a collection so the
                # diagnostics are emitted inside this ``with`` block.
                foo_coro = None
                support.gc_collect()
                self.assertTrue(m_log.error.called)
                message = m_log.error.call_args[0][0]
                self.assertRegex(message,
                                 r'CoroWrapper.*foo.*was never')

        self.loop.set_debug(True)
        self.loop.run_until_complete(start())

        # Second scenario (intentionally rebinds ``start``): scheduling the
        # coroutine via ensure_future() must give a Task whose repr mentions
        # foo and "running".
        async def start():
            foo_coro = foo()
            task = asyncio.ensure_future(foo_coro, loop=self.loop)
            self.assertRegex(repr(task), r'Task.*foo.*running')

        self.loop.run_until_complete(start())


    # A @types.coroutine function returning a generator must yield a
    # types._GeneratorWrapper that can be awaited from ``async def`` code.
    def test_types_coroutine(self):
        def gen():
            yield from ()
            return 'spam'

        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapper = func()
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    # Task.get_stack() called from inside the task's own coroutine must
    # report that coroutine's frame.
    def test_task_print_stack(self):
        # Filled in by runner() so that foo() can reach its own task.
        T = None

        async def foo():
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # NOTE(review): presumably clears the frame reference to
                # avoid keeping a reference cycle alive -- confirm.
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    # In debug mode, awaiting a coroutine object that is already wrapped in
    # a Task must raise a RuntimeError naming the coroutine.
    def test_double_await(self):
        async def afunc():
            await asyncio.sleep(0.1, loop=self.loop)

        async def runner():
            coro = afunc()
            t = asyncio.Task(coro, loop=self.loop)
            try:
                # NOTE(review): presumably this yield lets the task start so
                # that afunc is suspended in sleep() (the regex below expects
                # "sleep") -- confirm.
                await asyncio.sleep(0, loop=self.loop)
                await coro
            finally:
                t.cancel()

        self.loop.set_debug(True)
        # The pattern depends on the names of this test method, afunc and
        # sleep appearing in the error message.
        with self.assertRaisesRegex(
            RuntimeError,
            r'Cannot await.*test_double_await.*\bafunc\b.*while.*\bsleep\b'):

            self.loop.run_until_complete(runner())


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
# back to top