Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: b484871ba707ae2f9898e22effcd297c19d4c5ea authored by Łukasz Langa on 09 June 2020, 18:52:10 UTC
Python 3.9.0b3
Tip revision: b484871
win_utils.py
import _winapi
import math
import msvcrt
import os
import subprocess
import uuid
import winreg
from test import support
from test.libregrtest.utils import print_warning


# Upper bound, in bytes, of a single asynchronous read from the pipe; also
# used as the in/out buffer size of the named pipe itself
BUFSIZE = 8192

# Interval between two typeperf measurements, in seconds
SAMPLING_INTERVAL = 1

# Damping factor of the exponentially weighted moving average computed
# over 1 minute (60 seconds)
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)

# Number of leading Processor Queue Length samples whose arithmetic mean
# is used as the initial value of the load
NVALUE = 5

# Subkey of HKEY_LOCAL_MACHINE under which the (localized) counter names
# used by typeperf are registered
COUNTER_REGISTRY_KEY = (
    r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\Perflib"
    r"\CurrentLanguage"
)


class WindowsLoadTracker():
    """
    This class asynchronously interacts with the `typeperf` command to read
    the system load on Windows. Multiprocessing and threads can't be used
    here because they interfere with the test suite's cases for those
    modules.

    `typeperf` is started when the tracker is constructed. Call
    getloadavg() to consume the output written so far and get the current
    load estimate, and close() to stop `typeperf` and release the pipe.
    """

    def __init__(self):
        # Samples collected until the moving average can be initialized
        self._values = []
        # Current load estimate; None until NVALUE samples have been read
        self._load = None
        # Trailing, possibly incomplete, line of the last read
        self._buffer = ''
        self._popen = None
        # Read end of the named pipe; set by start(), released by close()
        self.pipe = None
        self.start()

    def start(self):
        """Create the named pipe and spawn `typeperf` writing into it.

        If any step fails, the resources acquired so far are released and
        the exception is re-raised.
        """
        # Create a named pipe which allows for asynchronous IO in Windows
        pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())

        open_mode = _winapi.PIPE_ACCESS_INBOUND
        open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        open_mode |= _winapi.FILE_FLAG_OVERLAPPED

        # This is the read end of the pipe, where we will be grabbing output
        self.pipe = _winapi.CreateNamedPipe(
            pipe_name, open_mode, _winapi.PIPE_WAIT,
            1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
        )
        try:
            # The write end of the pipe which is passed to the created
            # process
            pipe_write_end = _winapi.CreateFile(
                pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
                _winapi.OPEN_EXISTING, 0, _winapi.NULL
            )
            # Open up the handle as a C runtime file descriptor so we can
            # pass it to subprocess
            command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
            try:
                # Connect to the read end of the pipe in overlap/async mode
                overlap = _winapi.ConnectNamedPipe(self.pipe,
                                                   overlapped=True)
                overlap.GetOverlappedResult(True)

                # Spawn off the load monitor. The command is passed as a
                # single string because the counter name already carries
                # its own double quotes.
                counter_name = self._get_counter_name()
                command = ['typeperf', counter_name,
                           '-si', str(SAMPLING_INTERVAL)]
                self._popen = subprocess.Popen(' '.join(command),
                                               stdout=command_stdout,
                                               cwd=support.SAVEDCWD)
            finally:
                # Close our copy of the write end of the pipe, whether or
                # not the process could be spawned
                os.close(command_stdout)
        except BaseException:
            # Don't leak the pipe (or a half-started process) on failure
            self.close()
            raise

    def _get_counter_name(self):
        """Return the quoted, localized typeperf counter path of the
        Processor Queue Length counter, read from the registry."""
        # accessing the registry to get the counter localization name
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                            COUNTER_REGISTRY_KEY) as perfkey:
            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]

        # Convert [key1, value1, key2, value2, ...] list
        # to {key1: value1, key2: value2, ...} dict
        counters = iter(counters)
        counters_dict = dict(zip(counters, counters))

        # System counter has key '2' and Processor Queue Length has key '44'
        system = counters_dict['2']
        process_queue_length = counters_dict['44']
        return f'"\\{system}\\{process_queue_length}"'

    def _close_pipe(self):
        # Release the read end of the named pipe if it is still open
        pipe, self.pipe = self.pipe, None
        if pipe is not None:
            _winapi.CloseHandle(pipe)

    def close(self, kill=True):
        """Stop tracking: wait for `typeperf` and release the pipe.

        If *kill* is true (the default), the process is killed before
        being waited for. Calling close() more than once is harmless.
        """
        self._load = None

        try:
            if self._popen is not None:
                if kill:
                    self._popen.kill()
                self._popen.wait()
                self._popen = None
        finally:
            # The pipe handle is not owned by the child process: it has
            # to be closed explicitly or it leaks.
            self._close_pipe()

    def __del__(self):
        self.close()

    def _parse_line(self, line):
        """Return the process queue length found in one line of typeperf
        output as a float.

        Raise ValueError if *line* is not a complete, valid sample.
        """
        # typeperf outputs in a CSV format like this:
        # "07/19/2018 01:32:26.605","3.000000"
        # (date, process queue length)
        tokens = line.split(',')
        if len(tokens) != 2:
            raise ValueError

        value = tokens[1]
        if not value.startswith('"') or not value.endswith('"'):
            raise ValueError
        value = value[1:-1]
        return float(value)

    def _read_lines(self):
        """Return the lines of typeperf output that are ready to be parsed.

        Performs one overlapped read of at most BUFSIZE bytes without
        waiting for it to complete. An empty sequence is returned when no
        data is available. A trailing line which is not a valid sample is
        kept in self._buffer and prepended to the next read.
        """
        overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
        _, res = overlapped.GetOverlappedResult(False)
        if res != 0:
            return ()

        output = overlapped.getbuffer()
        output = output.decode('oem', 'replace')
        output = self._buffer + output
        lines = output.splitlines(True)
        if not lines:
            # Nothing was read and nothing was buffered: there is no last
            # line to inspect
            return lines

        # bpo-36670: typeperf only writes a newline *before* writing a value,
        # not after. Sometimes, the written line is incomplete (ex: only
        # timestamp, without the process queue length). Only pass the last line
        # to the parser if it's a valid value, otherwise store it in
        # self._buffer.
        try:
            self._parse_line(lines[-1])
        except ValueError:
            self._buffer = lines.pop(-1)
        else:
            self._buffer = ''

        return lines

    def getloadavg(self):
        """Consume the pending typeperf output and return the load.

        Return None if the tracker is closed, if `typeperf` has exited or
        its pipe is broken (the tracker is then closed), or as long as
        fewer than NVALUE samples have been read.
        """
        if self._popen is None:
            return None

        returncode = self._popen.poll()
        if returncode is not None:
            self.close(kill=False)
            return None

        try:
            lines = self._read_lines()
        except BrokenPipeError:
            self.close()
            return None

        for line in lines:
            line = line.rstrip()

            # Ignore the initial header:
            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
            if 'PDH-CSV' in line:
                continue

            # Ignore blank lines
            if not line:
                continue

            try:
                processor_queue_length = self._parse_line(line)
            except ValueError:
                print_warning("Failed to parse typeperf output: %a" % line)
                continue

            # We use an exponentially weighted moving average, imitating the
            # load calculation on Unix systems.
            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
            if self._load is not None:
                self._load = (self._load * LOAD_FACTOR_1
                              + processor_queue_length * (1.0 - LOAD_FACTOR_1))
            else:
                # Seed the average with the arithmetic mean of the first
                # NVALUE samples. The sample completing the seed is part
                # of it, so no measurement is thrown away.
                self._values.append(processor_queue_length)
                if len(self._values) >= NVALUE:
                    self._load = sum(self._values) / len(self._values)

        return self._load
        return self._load
back to top