Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: d93605de7232da5e6a182fd1d5c220639e900159 authored by Łukasz Langa on 29 August 2019, 21:59:20 UTC
Python 3.8.0b4
Tip revision: d93605d
streams.py
# Names exported by ``from ... import *`` for this module.  The tuple is
# extended further down with the UNIX-domain-socket variants when the
# platform's socket module provides AF_UNIX.
__all__ = (
    'Stream', 'StreamMode',
    'open_connection', 'start_server',
    'connect', 'connect_read_pipe', 'connect_write_pipe',
    'StreamServer')

import enum
import socket
import sys
import warnings
import weakref

# The UNIX-domain-socket helpers are only defined (further down in this
# module) when the socket module exposes AF_UNIX, so they are only
# exported under the same condition.
if hasattr(socket, 'AF_UNIX'):
    __all__ += ('open_unix_connection', 'start_unix_server',
                'connect_unix',
                'UnixStreamServer')

from . import coroutines
from . import events
from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
from . import tasks


# Default value of the ``limit`` argument accepted by the stream
# constructors, servers and factory functions defined in this module.
_DEFAULT_LIMIT = 2 ** 16  # 64 KiB


class StreamMode(enum.Flag):
    """Bit flags naming the directions in which a stream may be used."""

    # Single-direction capabilities; the flag values are auto-assigned.
    READ = enum.auto()
    WRITE = enum.auto()
    # Convenience combination for bidirectional streams.
    READWRITE = WRITE | READ


def _ensure_can_read(mode):
    """Raise RuntimeError unless *mode* includes the READ flag."""
    if mode & StreamMode.READ:
        return
    raise RuntimeError("The stream is write-only")


def _ensure_can_write(mode):
    """Raise RuntimeError unless *mode* includes the WRITE flag."""
    if mode & StreamMode.WRITE:
        return
    raise RuntimeError("The stream is read-only")


class _ContextManagerHelper:
    """Let one awaitable be used with both ``await`` and ``async with``.

    ``await helper`` simply awaits the wrapped awaitable and yields its
    result.  ``async with helper as value`` awaits the wrapped awaitable,
    treats its result as an asynchronous context manager, enters it and
    binds *value* to whatever its ``__aenter__()`` returned; on exit the
    same context manager's ``__aexit__()`` is awaited.
    """

    __slots__ = ('_awaitable', '_result')

    def __init__(self, awaitable):
        self._awaitable = awaitable
        # The async context manager produced by the awaitable; set only
        # after it has been entered successfully.
        self._result = None

    def __await__(self):
        return self._awaitable.__await__()

    async def __aenter__(self):
        manager = await self._awaitable
        entered = await manager.__aenter__()
        # Remember the context manager itself, not the value returned by
        # its __aenter__(): the two may be different objects, and
        # __aexit__() must be invoked on the manager that was entered.
        self._result = manager
        return entered

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self._result.__aexit__(exc_type, exc_val, exc_tb)


def connect(host=None, port=None, *,
            limit=_DEFAULT_LIMIT,
            ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
    """Return a helper that connects a read/write stream.

    The returned object wraps the `_connect()` coroutine so that it can
    be either awaited or used in an ``async with`` statement.
    """
    # Design note:
    # This is deliberately an explicit non-async function rather than a
    # decorated coroutine, so that a call whose arguments don't match
    # the signature fails fast and explicitly.
    connecting = _connect(host, port, limit,
                          ssl, family, proto,
                          flags, sock, local_addr,
                          server_hostname,
                          ssl_handshake_timeout,
                          happy_eyeballs_delay,
                          interleave)
    return _ContextManagerHelper(connecting)


async def _connect(host, port,
                  limit,
                  ssl, family, proto,
                  flags, sock, local_addr,
                  server_hostname,
                  ssl_handshake_timeout,
                  happy_eyeballs_delay, interleave):
    """Create a READWRITE Stream and connect it on the running loop.

    The connection is made with loop.create_connection(); the stream
    is returned once that call completes.
    """
    running_loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.READWRITE,
                    limit=limit,
                    loop=running_loop,
                    _asyncio_internal=True)

    def protocol_factory():
        return _StreamProtocol(stream, loop=running_loop,
                               _asyncio_internal=True)

    await running_loop.create_connection(
        protocol_factory,
        host, port,
        ssl=ssl, family=family, proto=proto,
        flags=flags, sock=sock, local_addr=local_addr,
        server_hostname=server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
    return stream


def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    """Return a helper that attaches a read-only stream to *pipe*.

    The returned object wraps the `_connect_read_pipe()` coroutine so
    that it can be either awaited or used in ``async with``.
    """
    # Design note:
    # This is deliberately an explicit non-async function rather than a
    # decorated coroutine, so that a call whose arguments don't match
    # the signature fails fast and explicitly.
    connecting = _connect_read_pipe(pipe, limit)
    return _ContextManagerHelper(connecting)


async def _connect_read_pipe(pipe, limit):
    """Create a READ-mode Stream and connect it to *pipe*."""
    running_loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.READ,
                    limit=limit,
                    loop=running_loop,
                    _asyncio_internal=True)

    def protocol_factory():
        return _StreamProtocol(stream, loop=running_loop,
                               _asyncio_internal=True)

    await running_loop.connect_read_pipe(protocol_factory, pipe)
    return stream


def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    """Return a helper that attaches a write-only stream to *pipe*.

    The returned object wraps the `_connect_write_pipe()` coroutine so
    that it can be either awaited or used in ``async with``.
    """
    # Design note:
    # This is deliberately an explicit non-async function rather than a
    # decorated coroutine, so that a call whose arguments don't match
    # the signature fails fast and explicitly.
    connecting = _connect_write_pipe(pipe, limit)
    return _ContextManagerHelper(connecting)


async def _connect_write_pipe(pipe, limit):
    """Create a WRITE-mode Stream and connect it to *pipe*."""
    running_loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.WRITE,
                    limit=limit,
                    loop=running_loop,
                    _asyncio_internal=True)

    def protocol_factory():
        return _StreamProtocol(stream, loop=running_loop,
                               _asyncio_internal=True)

    await running_loop.connect_write_pipe(protocol_factory, pipe)
    return stream


async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Connect with loop.create_connection() and return (reader, writer).

    Deprecated: a DeprecationWarning pointing at connect() is issued on
    every call, and a second one when *loop* is passed explicitly.

    *reader* is a StreamReader created with the given *limit*; *writer*
    is a StreamWriter wrapping the resulting transport.  *host*, *port*
    and any extra keyword arguments are forwarded unchanged to
    loop.create_connection(); the protocol factory is supplied here.

    When *loop* is omitted, the loop returned by get_event_loop() is
    used.
    """
    warnings.warn("open_connection() is deprecated since Python 3.8 "
                  "in favor of connect(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()

    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True)

    def protocol_factory():
        # Always hand back the single protocol instance created above.
        return protocol

    transport, _ = await loop.create_connection(
        protocol_factory, host, port, **kwds)
    return reader, StreamWriter(transport, protocol, reader, loop)


async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server that reports each new client to a callback.

    Deprecated: a DeprecationWarning pointing at StreamServer() is
    issued on every call, and a second one when *loop* is passed
    explicitly.

    For every accepted connection a StreamReader (created with *limit*)
    and a StreamReaderProtocol are built; *client_connected_cb* is handed
    to that protocol, which invokes it with a (reader, writer) pair.
    The callback may be a plain function or a coroutine function.

    *host*, *port* and any extra keyword arguments are forwarded
    unchanged to loop.create_server(), whose return value is returned.
    When *loop* is omitted, the loop returned by get_event_loop() is
    used.
    """
    warnings.warn("start_server() is deprecated since Python 3.8 "
                  "in favor of StreamServer(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()

    def factory():
        # A fresh reader/protocol pair per accepted connection.
        return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                    client_connected_cb,
                                    loop=loop,
                                    _asyncio_internal=True)

    server = await loop.create_server(factory, host, port, **kwds)
    return server


class _BaseStreamServer:
    """Shared implementation of the stream server classes.

    Subclasses provide ``_bind()``, a coroutine returning the underlying
    server object; everything else (lifecycle, bookkeeping of connected
    streams and their handler tasks, shutdown) lives here.
    """
    # Design notes.
    # StreamServer and UnixStreamServer are exposed as FINAL classes,
    # not function factories.
    # async with serve(host, port) as server:
    #      server.start_serving()
    # looks ugly.
    # The class doesn't provide an API for enumerating connected streams.
    # It can be a subject for improvements in Python 3.9

    # Underlying server object; None while the server is not bound.
    _server_impl = None

    def __init__(self, client_connected_cb,
                 /,
                 limit=_DEFAULT_LIMIT,
                 shutdown_timeout=60,
                 _asyncio_internal=False):
        if not _asyncio_internal:
            raise RuntimeError("_ServerStream is a private asyncio class")
        self._client_connected_cb = client_connected_cb
        self._limit = limit
        self._loop = events.get_running_loop()
        # Maps each attached stream to the task registered with it.
        self._streams = {}
        self._shutdown_timeout = shutdown_timeout

    def __init_subclass__(cls):
        # Only classes defined inside the asyncio package may subclass.
        if cls.__module__.startswith('asyncio.'):
            return
        raise TypeError(f"asyncio.{cls.__name__} "
                        "class cannot be inherited from")

    async def bind(self):
        """Create the underlying server unless it already exists."""
        if self._server_impl is None:
            self._server_impl = await self._bind()

    def is_bound(self):
        """Return True if the underlying server has been created."""
        return self._server_impl is not None

    @property
    def sockets(self):
        """Sockets of the underlying server, or () when not bound."""
        # multiple values for a socket bound to both IPv4 and IPv6 families
        impl = self._server_impl
        return () if impl is None else impl.sockets

    def is_serving(self):
        """Return the underlying server's serving state (False if unbound)."""
        impl = self._server_impl
        return impl is not None and impl.is_serving()

    async def start_serving(self):
        """Bind if necessary, then start accepting connections."""
        await self.bind()
        await self._server_impl.start_serving()

    async def serve_forever(self):
        """Start serving and delegate to the server's serve_forever()."""
        await self.start_serving()
        await self._server_impl.serve_forever()

    async def close(self):
        """Close the server and every attached stream via stream.close()."""
        await self._terminate(lambda stream: stream.close())

    async def abort(self):
        """Close the server and every attached stream via stream.abort()."""
        await self._terminate(lambda stream: stream.abort())

    async def _terminate(self, stop_stream):
        # Common body of close() and abort(); *stop_stream* maps a stream
        # to the awaitable that shuts it down.
        if self._server_impl is None:
            return
        self._server_impl.close()
        # Snapshot the registry: it may be mutated while we are waiting.
        streams = list(self._streams)
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stop_stream(stream) for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def __aenter__(self):
        await self.bind()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self.close()

    def _attach(self, stream, task):
        # Register *stream* together with its task.
        self._streams[stream] = task

    def _detach(self, stream, task):
        # Forget *stream*; *task* is accepted but not used here.
        del self._streams[stream]

    async def _shutdown_active_tasks(self, active_tasks):
        # Wait for the given tasks, cancel the ones that outlive the
        # shutdown timeout, and report those that ignore cancellation.
        if not active_tasks:
            return
        # NOTE: tasks finished with exception are reported
        # by the Task.__del__() method.
        _, unfinished = await tasks.wait(active_tasks,
                                         timeout=self._shutdown_timeout)
        if not unfinished:
            return
        for task in unfinished:
            task.cancel()
        _, stubborn = await tasks.wait(unfinished,
                                       timeout=self._shutdown_timeout)
        for task in stubborn:
            context = {
                "message": (f'{task!r} ignored cancellation request '
                            f'from a closing {self!r}'),
                "stream_server": self
            }
            self._loop.call_exception_handler(context)

    def __repr__(self):
        parts = [f'{self.__class__.__name__}']
        if self.is_serving():
            parts.append('serving')
        if self.sockets:
            parts.append(f'sockets={self.sockets!r}')
        return f"<{' '.join(parts)}>"

    def __del__(self, _warn=warnings.warn):
        # Nothing to report when the server was never bound or was closed.
        if self._server_impl is None:
            return
        _warn(f"unclosed stream server {self!r}",
              ResourceWarning, source=self)
        self._server_impl.close()


class StreamServer(_BaseStreamServer):
    """Stream server whose listener is made by loop.create_server()."""

    def __init__(self, client_connected_cb, /, host=None, port=None, *,
                 limit=_DEFAULT_LIMIT,
                 family=socket.AF_UNSPEC,
                 flags=socket.AI_PASSIVE, sock=None, backlog=100,
                 ssl=None, reuse_address=None, reuse_port=None,
                 ssl_handshake_timeout=None,
                 shutdown_timeout=60):
        super().__init__(client_connected_cb,
                         limit=limit,
                         shutdown_timeout=shutdown_timeout,
                         _asyncio_internal=True)
        # Remembered verbatim and passed to create_server() in _bind().
        self._host = host
        self._port = port
        self._family = family
        self._flags = flags
        self._sock = sock
        self._backlog = backlog
        self._ssl = ssl
        self._reuse_address = reuse_address
        self._reuse_port = reuse_port
        self._ssl_handshake_timeout = ssl_handshake_timeout

    async def _bind(self):
        # Create the server without starting it (start_serving=False).
        def make_protocol():
            return _ServerStreamProtocol(self,
                                         self._limit,
                                         self._client_connected_cb,
                                         loop=self._loop,
                                         _asyncio_internal=True)

        server = await self._loop.create_server(
            make_protocol,
            self._host,
            self._port,
            start_serving=False,
            family=self._family,
            flags=self._flags,
            sock=self._sock,
            backlog=self._backlog,
            ssl=self._ssl,
            reuse_address=self._reuse_address,
            reuse_port=self._reuse_port,
            ssl_handshake_timeout=self._ssl_handshake_timeout)
        return server


if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """UNIX Domain Socket counterpart of `open_connection`.

        Deprecated in favor of connect_unix(); returns a
        (reader, writer) pair built on loop.create_unix_connection().
        """
        warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
                      "in favor of connect_unix(), and scheduled for removal "
                      "in Python 3.10",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()

        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop,
                                        _asyncio_internal=True)

        def protocol_factory():
            # Always hand back the single protocol instance created above.
            return protocol

        transport, _ = await loop.create_unix_connection(
            protocol_factory, path, **kwds)
        return reader, StreamWriter(transport, protocol, reader, loop)


    def connect_unix(path=None, *,
                     limit=_DEFAULT_LIMIT,
                     ssl=None, sock=None,
                     server_hostname=None,
                     ssl_handshake_timeout=None):
        """UNIX Domain Socket counterpart of `connect()`.

        The returned object wraps the `_connect_unix()` coroutine so
        that it can be either awaited or used in ``async with``.
        """
        # Design note:
        # This is deliberately an explicit non-async function rather
        # than a decorated coroutine, so that a call whose arguments
        # don't match the signature fails fast and explicitly.
        connecting = _connect_unix(path,
                                   limit,
                                   ssl, sock,
                                   server_hostname,
                                   ssl_handshake_timeout)
        return _ContextManagerHelper(connecting)


    async def _connect_unix(path,
                           limit,
                           ssl, sock,
                           server_hostname,
                           ssl_handshake_timeout):
        """Create a READWRITE Stream connected over a UNIX Domain Socket.

        The connection is made with loop.create_unix_connection() on the
        running loop; the stream is returned once that call completes.
        """
        running_loop = events.get_running_loop()
        stream = Stream(mode=StreamMode.READWRITE,
                        limit=limit,
                        loop=running_loop,
                        _asyncio_internal=True)

        def protocol_factory():
            return _StreamProtocol(stream,
                                   loop=running_loop,
                                   _asyncio_internal=True)

        await running_loop.create_unix_connection(
            protocol_factory,
            path,
            ssl=ssl,
            sock=sock,
            server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return stream


    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """UNIX Domain Socket counterpart of `start_server`.

        Deprecated in favor of UnixStreamServer(); returns whatever
        loop.create_unix_server() returns.
        """
        warnings.warn("start_unix_server() is deprecated since Python 3.8 "
                      "in favor of UnixStreamServer(), and scheduled "
                      "for removal in Python 3.10",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()

        def factory():
            # A fresh reader/protocol pair per accepted connection.
            return StreamReaderProtocol(StreamReader(limit=limit, loop=loop),
                                        client_connected_cb,
                                        loop=loop,
                                        _asyncio_internal=True)

        server = await loop.create_unix_server(factory, path, **kwds)
        return server

    class UnixStreamServer(_BaseStreamServer):
        """Stream server whose listener is made by loop.create_unix_server()."""

        def __init__(self, client_connected_cb, /, path=None, *,
                     limit=_DEFAULT_LIMIT,
                     sock=None,
                     backlog=100,
                     ssl=None,
                     ssl_handshake_timeout=None,
                     shutdown_timeout=60):
            super().__init__(client_connected_cb,
                             limit=limit,
                             shutdown_timeout=shutdown_timeout,
                             _asyncio_internal=True)
            # Remembered verbatim and passed to create_unix_server()
            # in _bind().
            self._path = path
            self._sock = sock
            self._backlog = backlog
            self._ssl = ssl
            self._ssl_handshake_timeout = ssl_handshake_timeout

        async def _bind(self):
            # Create the server without starting it (start_serving=False).
            def make_protocol():
                return _ServerStreamProtocol(self,
                                             self._limit,
                                             self._client_connected_cb,
                                             loop=self._loop,
                                             _asyncio_internal=True)

            server = await self._loop.create_unix_server(
                make_protocol,
                self._path,
                start_serving=False,
                sock=self._sock,
                backlog=self._backlog,
                ssl=self._ssl,
                ssl_handshake_timeout=self._ssl_handshake_timeout)
            return server


class FlowControlMixin(protocols.Protocol):
    """Write-side flow control shared by the stream protocols.

    Provides pause_writing(), resume_writing() and connection_lost();
    a subclass that overrides any of them must call the super method.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None, *, _asyncio_internal=False):
        self._loop = events.get_event_loop() if loop is None else loop
        if not _asyncio_internal:
            # NOTE:
            # Avoid inheritance from FlowControlMixin
            # Copy-paste the code to your project
            # if you need flow control helpers
            warnings.warn(f"{self.__class__} should be instaniated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning)
        self._paused = False
        # Future awaited by _drain_helper() while writing is paused.
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Release a drain() call that is blocked on the waiter, if any.
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if not waiter.done():
            waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer if currently paused.
        waiter = self._drain_waiter
        if not self._paused or waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is not None:
            waiter.set_exception(exc)
        else:
            waiter.set_result(None)

    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if self._paused:
            previous = self._drain_waiter
            assert previous is None or previous.cancelled()
            # Block until resume_writing() or connection_lost() resolves
            # the freshly installed waiter.
            fresh = self._loop.create_future()
            self._drain_waiter = fresh
            await fresh

    def _get_close_waiter(self, stream):
        raise NotImplementedError


# begin legacy stream APIs

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)

    Transport events are forwarded to *stream_reader*.  When
    *client_connected_cb* is given, it is called from connection_made()
    with a (reader, writer) pair; if the call returns a coroutine, that
    coroutine is scheduled as a task on the protocol's loop.
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        # Resolved (with a result or an exception) by connection_lost();
        # awaited by StreamWriter.wait_closed().
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        # Drop the references to the reader and the writer.
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._closed
        except AttributeError:
            # __init__() failed before the future was created (for
            # example the base constructor raised); there is nothing
            # to retrieve, and the finalizer must not raise itself.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()


class StreamWriter:
    """Thin wrapper around a Transport.

    write(), writelines(), write_eof(), can_write_eof(), close(),
    is_closing() and get_extra_info() are forwarded to the transport.
    On top of that the class offers drain() for flow control,
    wait_closed(), and a ``transport`` property giving direct access to
    the wrapped Transport.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__, f'transport={self._transport!r}']
        reader = self._reader
        if reader is not None:
            parts.append(f'reader={reader!r}')
        return f"<{' '.join(parts)}>"

    @property
    def transport(self):
        """The wrapped transport object."""
        return self._transport

    def write(self, data):
        """Forward *data* to the transport's write()."""
        self._transport.write(data)

    def writelines(self, data):
        """Forward *data* to the transport's writelines()."""
        self._transport.writelines(data)

    def write_eof(self):
        """Delegate to the transport's write_eof()."""
        return self._transport.write_eof()

    def can_write_eof(self):
        """Delegate to the transport's can_write_eof()."""
        return self._transport.can_write_eof()

    def close(self):
        """Delegate to the transport's close()."""
        return self._transport.close()

    def is_closing(self):
        """Delegate to the transport's is_closing()."""
        return self._transport.is_closing()

    async def wait_closed(self):
        """Wait on the protocol's ``_closed`` awaitable."""
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        """Delegate to the transport's get_extra_info()."""
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Wait until it is appropriate to resume writing.

        Typical usage:

          w.write(data)
          await w.drain()

        An exception recorded on the reader, if any, is raised first.
        """
        reader = self._reader
        pending_exc = None if reader is None else reader.exception()
        if pending_exc is not None:
            raise pending_exc
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await tasks.sleep(0, loop=self._loop)
        await self._protocol._drain_helper()


class StreamReader:

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        """Create an empty read buffer bounded by *limit*.

        Raises ValueError when *limit* is not positive.  When *loop* is
        None the loop comes from events.get_event_loop().
        """
        # Besides capping the line length (a security feature), the limit
        # also serves as half of the buffer limit.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False        # becomes True once feed_eof() is called
        self._waiter = None      # future awaited inside _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        """Return a debug string; falsy fields and a default limit are omitted."""
        parts = ['StreamReader']
        buffered = self._buffer
        if buffered:
            parts.append(f'{len(buffered)} bytes')
        if self._eof:
            parts.append('eof')
        limit = self._limit
        if limit != _DEFAULT_LIMIT:
            parts.append(f'limit={limit}')
        waiter = self._waiter
        if waiter:
            parts.append(f'waiter={waiter!r}')
        exc = self._exception
        if exc:
            parts.append(f'exception={exc!r}')
        transport = self._transport
        if transport:
            parts.append(f'transport={transport!r}')
        if self._paused:
            parts.append('paused')
        return '<' + ' '.join(parts) + '>'

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read one line, i.e. everything up to and including a newline byte.

        Returns the line with its trailing newline.  If EOF arrives first,
        the partial data carried by IncompleteReadError is returned
        instead (possibly an empty bytes object).

        When readuntil() reports a limit overrun, ValueError is raised.
        Before raising, the offending line (including the newline) is
        dropped from the buffer if the newline sits right at the reported
        position; otherwise the whole buffer is cleared.  The transport
        is then resumed if it was paused and the buffer allows it.
        """
        newline = b'\n'
        try:
            return await self.readuntil(newline)
        except exceptions.IncompleteReadError as exc:
            return exc.partial
        except exceptions.LimitOverrunError as exc:
            buf = self._buffer
            if buf.startswith(newline, exc.consumed):
                del buf[:exc.consumed + len(newline)]
            else:
                buf.clear()
            self._maybe_resume_transport()
            raise ValueError(exc.args[0])

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        A ValueError is raised for an empty separator, and an exception
        previously stored on the reader is re-raised before any reading.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this would require
        #   analyzing the bytes of the buffer that match a partial
        #   separator.  That is slow and/or requires an FSM. For this
        #   case our implementation is not optimal, since it rescans
        #   data that is known not to belong to the separator. In the
        #   real world, the separator will not be long enough for this
        #   to cause noticeable performance problems. Even when reading
        #   MIME-encoded messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the limit,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # See the comment above the loop: the last seplen - 1 bytes
                # may be the start of a separator, so keep them for rescanning.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        # The separator was found, but the data in front of it may still be
        # longer than the limit; in that case nothing is consumed.
        if isep > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all read
        bytes. If the EOF was received and the internal buffer is empty, return
        an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may return
        less or equal bytes than requested, but at least one byte. If EOF was
        received before any byte is read, this function returns empty byte
        object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can be
        read. The IncompleteReadError.partial attribute of the exception will
        contain the partial read bytes.

        if n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val


# end legacy stream APIs


class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and a stream object.

    (This is a helper class instead of making the stream itself a
    Protocol subclass, because the stream has other potential uses,
    and to prevent the user of the stream from accidentally calling
    inappropriate methods of the protocol.)

    Derived classes provide ``_stream``; every transport callback
    forwards to it when it is not None.
    """

    _stream = None  # initialized in derived classes

    def __init__(self, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._transport = None
        self._over_ssl = False
        # Completed by connection_lost(); awaited through
        # _get_close_waiter().
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        """Remember *transport* and whether it carries an SSL context."""
        self._transport = transport
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

    def connection_lost(self, exc):
        """Propagate the loss to the stream and complete the close future.

        With *exc* None the stream gets EOF and the close future a None
        result; otherwise both receive *exc*.
        """
        stream = self._stream
        if stream is not None:
            if exc is None:
                stream.feed_eof()
            else:
                stream.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._transport = None

    def data_received(self, data):
        """Forward received *data* to the stream, if there is one."""
        stream = self._stream
        if stream is not None:
            stream.feed_data(data)

    def eof_received(self):
        """Feed EOF to the stream; return False over SSL, True otherwise."""
        stream = self._stream
        if stream is not None:
            stream.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        """Return the future that completes when the connection is lost."""
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than self._closed._log_traceback = False hack
        try:
            closed = self._get_close_waiter(self._stream)
        except AttributeError:
            # __del__ also runs for an instance whose __init__ raised
            # before the attributes read here were assigned; there is
            # nothing to clean up in that case.
            return
        if closed.done() and not closed.cancelled():
            closed.exception()


class _StreamProtocol(_BaseStreamProtocol):
    """Protocol for a stream that it references only through a weakref.

    If the stream is garbage collected while still open, the loop's
    exception handler is called and the transport is aborted (right
    away, or as soon as connection_made() runs).
    """

    _source_traceback = None

    def __init__(self, stream, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._source_traceback = stream._source_traceback
        # _on_gc is the weakref callback, invoked when the stream dies.
        self._stream_wr = weakref.ref(stream, self._on_gc)
        self._reject_connection = False

    def _on_gc(self, wr):
        """Weakref callback: the stream was collected without close()."""
        transport = self._transport
        if transport is None:
            # connection_made() has not run yet; make it refuse the
            # connection when it does.
            self._reject_connection = True
        else:
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        self._stream_wr = None

    @property
    def _stream(self):
        """The live stream, or None once it is gone or was detached."""
        ref = self._stream_wr
        return None if ref is None else ref()

    def connection_made(self, transport):
        """Bind *transport* to the stream, or abort it if the stream died."""
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        super().connection_made(transport)
        stream = self._stream
        if stream is not None:
            stream.set_transport(transport)
            stream._protocol = self

    def connection_lost(self, exc):
        """Run the base handling, then drop the weak reference."""
        super().connection_lost(exc)
        self._stream_wr = None


class _ServerStreamProtocol(_BaseStreamProtocol):
    """Server-side protocol: one Stream and one callback task per connection."""

    def __init__(self, server, limit, client_connected_cb, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        assert self._closed
        self._server = server
        self._limit = limit
        self._client_connected_cb = client_connected_cb
        self._task = None

    def connection_made(self, transport):
        """Create the Stream, start the client callback, attach to the server."""
        super().connection_made(transport)
        stream = Stream(mode=StreamMode.READWRITE,
                        transport=transport,
                        protocol=self,
                        limit=self._limit,
                        loop=self._loop,
                        is_server_side=True,
                        _asyncio_internal=True)
        self._stream = stream
        # Should the client callback fail, the exception is logged by
        # the transport.
        task = self._loop.create_task(self._client_connected_cb(stream))
        self._task = task
        self._server._attach(stream, task)

    def connection_lost(self, exc):
        """Run the base handling, detach from the server, drop the stream."""
        super().connection_lost(exc)
        self._server._detach(self._stream, self._task)
        self._stream = None


class _OptionalAwait:
    """Awaitable wrapper that calls *method* only when actually awaited.

    Because no coroutine object is created unless the wrapper is
    awaited, discarding it does not produce a "coroutine was never
    awaited" warning.
    """

    # The attribute name must be exactly ``__slots__``; a misspelling is
    # just an ordinary class attribute and leaves every instance with a
    # __dict__.
    __slots__ = ('_method',)

    def __init__(self, method):
        self._method = method

    def __await__(self):
        return self._method().__await__()


class Stream:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    _source_traceback = None

    def __init__(self, mode, *,
                 transport=None, protocol=None, loop=None,
                 limit=_DEFAULT_LIMIT, is_server_side=False,
                 _asyncio_internal=False):
        """Set up the stream state; only asyncio itself may instantiate it.

        Raises RuntimeError unless *_asyncio_internal* is true, and
        ValueError when *limit* is not positive.  When *loop* is None the
        loop comes from events.get_event_loop().
        """
        if not _asyncio_internal:
            raise RuntimeError(f"{self.__class__} should be instantiated "
                               "by asyncio internals only")
        self._mode = mode
        self._transport = transport
        self._protocol = protocol
        self._is_server_side = is_server_side

        # Besides capping the line length (a security feature), the limit
        # also serves as half of the buffer limit.
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')
        self._limit = limit

        self._loop = events.get_event_loop() if loop is None else loop
        self._buffer = bytearray()
        self._eof = False        # becomes True once feed_eof() is called
        self._waiter = None      # future awaited inside _wait_for_data()
        self._exception = None
        self._paused = False

        # An already completed future that _fast_drain() can hand out.
        done = self._loop.create_future()
        done.set_result(None)
        self._complete_fut = done

        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    def __repr__(self):
        """Return a debug string; falsy fields and a default limit are omitted."""
        parts = [self.__class__.__name__, f'mode={self._mode}']
        buffered = self._buffer
        if buffered:
            parts.append(f'{len(buffered)} bytes')
        if self._eof:
            parts.append('eof')
        limit = self._limit
        if limit != _DEFAULT_LIMIT:
            parts.append(f'limit={limit}')
        waiter = self._waiter
        if waiter:
            parts.append(f'waiter={waiter!r}')
        exc = self._exception
        if exc:
            parts.append(f'exception={exc!r}')
        transport = self._transport
        if transport:
            parts.append(f'transport={transport!r}')
        if self._paused:
            parts.append('paused')
        return '<' + ' '.join(parts) + '>'

    @property
    def mode(self):
        return self._mode

    def is_server_side(self):
        return self._is_server_side

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        """Write *data* to the transport and return a drain awaitable.

        _ensure_can_write() raises RuntimeError for a stream that lacks
        write mode.
        """
        _ensure_can_write(self._mode)
        transport = self._transport
        transport.write(data)
        return self._fast_drain()

    def writelines(self, data):
        """Pass *data* to the transport's writelines(); return a drain awaitable.

        _ensure_can_write() raises RuntimeError for a stream that lacks
        write mode.
        """
        _ensure_can_write(self._mode)
        transport = self._transport
        transport.writelines(data)
        return self._fast_drain()

    def _fast_drain(self):
        # The helper tries to use fast-path to return already existing
        # complete future object if underlying transport is not paused
        #and actual waiting for writing resume is not needed
        exc = self.exception()
        if exc is not None:
            fut = self._loop.create_future()
            fut.set_exception(exc)
            return fut
        if not self._transport.is_closing():
            if self._protocol._connection_lost:
                fut = self._loop.create_future()
                fut.set_exception(ConnectionResetError('Connection lost'))
                return fut
            if not self._protocol._paused:
                # fast path, the stream is not paused
                # no need to wait for resume signal
                return self._complete_fut
        return _OptionalAwait(self.drain)

    def write_eof(self):
        """Call the transport's write_eof() and return its result.

        _ensure_can_write() raises RuntimeError for a stream that lacks
        write mode.
        """
        _ensure_can_write(self._mode)
        transport = self._transport
        return transport.write_eof()

    def can_write_eof(self):
        """Return True if the stream is writable and the transport can write EOF.

        A stream whose mode lacks StreamMode.WRITE always reports False;
        otherwise the transport's can_write_eof() result is returned.
        """
        # StreamMode is an enum.Flag without an is_write() method, so test
        # the WRITE bit directly, the same way _ensure_can_write() does.
        if not self._mode & StreamMode.WRITE:
            return False
        return self._transport.can_write_eof()

    def close(self):
        """Close the transport; awaiting the result runs wait_closed()."""
        self._transport.close()
        closer = _OptionalAwait(self.wait_closed)
        return closer

    def is_closing(self):
        return self._transport.is_closing()

    async def abort(self):
        self._transport.abort()
        await self.wait_closed()

    async def wait_closed(self):
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Wait until the protocol allows writing to continue.

        Typical usage:

          w.write(data)
          await w.drain()

        _ensure_can_write() raises RuntimeError for a stream that lacks
        write mode, and an exception stored on the stream is re-raised.
        """
        _ensure_can_write(self._mode)
        pending_exc = self.exception()
        if pending_exc is not None:
            raise pending_exc
        closing = self._transport.is_closing()
        if closing:
            # Yield once so that protocol.connection_lost() can run; the
            # drain helper then raises the connection's closing error if
            # there is one, ConnectionResetError otherwise.
            await tasks.sleep(0)
        await self._protocol._drain_helper()

    async def sendfile(self, file, offset=0, count=None, *, fallback=True):
        """Drain the stream, then delegate to the loop's sendfile().

        Returns whatever loop.sendfile() returns.
        """
        # drain() also checks the stream mode and any stored exception.
        await self.drain()
        result = await self._loop.sendfile(self._transport, file,
                                           offset, count, fallback=fallback)
        return result

    async def start_tls(self, sslcontext, *,
                        server_hostname=None,
                        ssl_handshake_timeout=None):
        """Upgrade the connection through the loop's start_tls().

        The transport returned by the loop replaces the current one on
        both the stream and its protocol, and the protocol is flagged as
        running over SSL.
        """
        # drain() also checks the stream mode and any stored exception.
        await self.drain()
        tls_transport = await self._loop.start_tls(
            self._transport, self._protocol, sslcontext,
            server_side=self._is_server_side,
            server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        self._transport = tls_transport
        protocol = self._protocol
        protocol._transport = tls_transport
        protocol._over_ssl = True

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        if transport is self._transport:
            return
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append *data* to the buffer and wake a pending reader.

        _ensure_can_read() raises RuntimeError for a stream that lacks
        read mode.  Empty *data* is ignored.  When the buffer grows past
        twice the limit, the transport (if any) is asked to pause reading.
        """
        _ensure_can_read(self._mode)
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        transport = self._transport
        if transport is None or self._paused:
            return
        if len(self._buffer) <= 2 * self._limit:
            return

        try:
            transport.pause_reading()
        except NotImplementedError:
            # This transport cannot be paused, so everything has to be
            # buffered.  Drop the reference to avoid retrying each time.
            self._transport = None
        else:
            self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read one line, i.e. everything up to and including a newline byte.

        Returns the line with its trailing newline.  If EOF arrives first,
        the partial data carried by IncompleteReadError is returned
        instead (possibly an empty bytes object).

        When readuntil() reports a limit overrun, ValueError is raised.
        Before raising, the offending line (including the newline) is
        dropped from the buffer if the newline sits right at the reported
        position; otherwise the whole buffer is cleared.  The transport
        is then resumed if it was paused and the buffer allows it.

        _ensure_can_read() raises RuntimeError for a stream that lacks
        read mode.
        """
        _ensure_can_read(self._mode)
        newline = b'\n'
        try:
            return await self.readuntil(newline)
        except exceptions.IncompleteReadError as exc:
            return exc.partial
        except exceptions.LimitOverrunError as exc:
            buf = self._buffer
            if buf.startswith(newline, exc.consumed):
                del buf[:exc.consumed + len(newline)]
            else:
                buf.clear()
            self._maybe_resume_transport()
            raise ValueError(exc.args[0])

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        Configured stream limit is used to check result. Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.

        A ValueError is raised for an empty separator, and an exception
        previously stored on the stream is re-raised before any reading.
        _ensure_can_read() raises RuntimeError for a stream that lacks
        read mode.
        """
        _ensure_can_read(self._mode)
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen. Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte). i.e buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this would require
        #   analyzing the bytes of the buffer that match a partial
        #   separator.  That is slow and/or requires an FSM. For this
        #   case our implementation is not optimal, since it rescans
        #   data that is known not to belong to the separator. In the
        #   real world, the separator will not be long enough for this
        #   to cause noticeable performance problems. Even when reading
        #   MIME-encoded messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the limit,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # See the comment above the loop: the last seplen - 1 bytes
                # may be the start of a separator, so keep them for rescanning.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set. This may happen when the last chunk
            # adds data which makes separator be found. That's why we check for
            # EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            await self._wait_for_data('readuntil')

        # The separator was found, but the data in front of it may still be
        # longer than the limit; in that case nothing is consumed.
        if isep > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    async def read(self, n=-1):
        """Read at most `n` bytes from the stream.

        When `n` is omitted or negative, keep reading until EOF and return
        everything that was received. If EOF was already received and the
        internal buffer is empty, an empty bytes object is returned.

        When `n` is zero, an empty bytes object is returned immediately.

        When `n` is positive, return the buffered data, up to `n` bytes.
        If the buffer is empty and EOF has not been received, wait for
        data first; the result may be shorter than `n` but holds at least
        one byte, unless EOF was received before any byte could be read,
        in which case an empty bytes object is returned.

        The size of the returned value is not bounded by the limit
        configured at stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.

        Raises RuntimeError if the stream mode does not allow reading, and
        re-raises the exception stored on the stream, if any.
        """
        _ensure_can_read(self._mode)

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # Collecting everything through a single waiter on the internal
            # buffer would deadlock once the peer (e.g. a subprocess) sends
            # more than self._limit bytes.  Instead, pull limit-sized
            # pieces with self.read(self._limit) until EOF yields an empty
            # one.
            pieces = []
            piece = await self.read(self._limit)
            while piece:
                pieces.append(piece)
                piece = await self.read(self._limit)
            return b''.join(pieces)

        # Nothing buffered and no EOF yet: block until something arrives.
        if not (self._buffer or self._eof):
            await self._wait_for_data('read')

        # Slicing copes with a buffer that holds fewer than n bytes.
        head = self._buffer[:n]
        del self._buffer[:n]
        result = bytes(head)

        self._maybe_resume_transport()
        return result

    async def readexactly(self, n):
        """Read exactly `n` bytes from the stream.

        If EOF is reached before `n` bytes are available, raise an
        IncompleteReadError; its `partial` attribute carries the bytes
        that were read, which are removed from the internal buffer.

        When `n` is zero, an empty bytes object is returned.

        The size of the returned value is not bounded by the limit
        configured at stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.

        Raises RuntimeError if the stream mode does not allow reading,
        ValueError if `n` is negative, and re-raises the exception stored
        on the stream, if any.
        """
        _ensure_can_read(self._mode)
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        # Keep waiting until the buffer holds at least n bytes; give up
        # with whatever was collected as soon as EOF is seen.
        while len(self._buffer) < n:
            if not self._eof:
                await self._wait_for_data('readexactly')
                continue

            partial = bytes(self._buffer)
            self._buffer.clear()
            raise exceptions.IncompleteReadError(partial, n)

        if len(self._buffer) > n:
            # More than requested is buffered: take only the first n bytes.
            result = bytes(self._buffer[:n])
            del self._buffer[:n]
        else:
            # The buffer holds exactly n bytes: hand over all of it.
            result = bytes(self._buffer)
            self._buffer.clear()
        self._maybe_resume_transport()
        return result

    def __aiter__(self):
        """Return the stream itself as its own asynchronous iterator.

        Raises RuntimeError if the stream mode does not include
        StreamMode.READ.
        """
        _ensure_can_read(self._mode)
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val

    async def __aenter__(self):
        """Enter an `async with` block; the stream itself is the value bound
        by `as`."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave an `async with` block by awaiting `self.close()`.

        The exception details are ignored and None is returned, so an
        exception raised inside the block is not suppressed.
        """
        await self.close()
back to top