Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: 550e4673be538d98b6ddf5550b3922539cf5c4b2 authored by Victor Stinner on 08 December 2020, 23:32:54 UTC
bpo-32381: Add _PyRun_SimpleFileObject() (GH-23709)
Tip revision: 550e467
tables.py
import csv

from . import NOT_SET, strutil, fsutil


# Canonical placeholders that fix_row() substitutes for any value
# matching one of the "empty" or "unknown" markers, respectively.
EMPTY = '-'
UNKNOWN = '???'


def parse_markers(markers, default=None):
    """Normalize a markers spec into a collection of markers.

    NOT_SET yields "default"; any other falsy value yields None.
    A value that is not exactly a str is returned unchanged.  A str
    made up of a single repeated character is treated as one marker
    (returned in a one-item list); any other str is split into its
    individual characters.
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    # Exact type check: str subclasses are returned unchanged below.
    if type(markers) is str:
        if len(set(markers)) == 1:
            # Every character is the same, so it is one marker.
            return [markers]
        return list(markers)
    return markers


def fix_row(row, **markers):
    """Return the row as a tuple with its values normalized.

    Falsy values become None.  A value found in the "empty" markers
    (default: '-') becomes EMPTY and one found in the "unknown"
    markers (default: '???') becomes UNKNOWN; "empty" is checked
    first.  Both keyword arguments go through parse_markers().
    Any other keyword arguments are ignored.

    A str row is not supported (NotImplementedError).
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))

    def normalize(value):
        value = value if value else None
        # A falsy marker collection disables that replacement.
        if empty and value in empty:
            return EMPTY
        if unknown and value in unknown:
            return UNKNOWN
        return value

    return tuple(normalize(value) for value in row)


def _fix_read_default(row):
    for value in row:
        yield value.strip()


def _fix_write_default(row, empty=''):
    for value in row:
        yield empty if value is None else str(value)


def _normalize_fix_read(fix):
    if fix is None:
        fix = ''
    if callable(fix):
        def fix_row(row):
            values = fix(row)
            return _fix_read_default(values)
    elif isinstance(fix, str):
        def fix_row(row):
            values = _fix_read_default(row)
            return (None if v == fix else v
                    for v in values)
    else:
        raise NotImplementedError(fix)
    return fix_row


def _normalize_fix_write(fix, empty=''):
    if fix is None:
        fix = empty
    if callable(fix):
        def fix_row(row):
            values = fix(row)
            return _fix_write_default(values, empty)
    elif isinstance(fix, str):
        def fix_row(row):
            return _fix_write_default(row, fix)
    else:
        raise NotImplementedError(fix)
    return fix_row


def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given delimiter-separated (e.g. tab) file.

    "infile" is either a filename (opened here with newline='', as
    the csv module expects) or an already open file / iterable of
    lines.  The lines are first filtered through
    strutil._iter_significant_lines().

    "header" is the expected first line, either as a str or as a
    sequence of column names (joined with the delimiter).  The first
    significant line, stripped, must match it exactly; otherwise
    ValueError is raised.  A file with no significant lines is treated
    as having an empty header.

    "sep" is the delimiter; a falsy value means tab.

    "fix" controls how each row is cleaned up before it is yielded
    (as a tuple); see _normalize_fix_read() for the supported values.

    "_open" and "_get_reader" are hooks for replacing open() and
    csv.reader().
    """
    # Resolve the delimiter once so that the header check and the
    # reader always agree (and so a non-str header can be joined even
    # when sep is None).
    sep = sep or '\t'

    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    # The header line has been consumed, so only data rows remain.
    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))


def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write each of the rows to the given delimiter-separated (e.g. tab) file.

    "outfile" is either a filename (opened here for writing with
    newline='', as the csv module expects) or an already open file.

    "header" is written as the first row.  A str header is split on
    the delimiter first.

    "sep" is the delimiter; a falsy value means tab.

    "fix" controls how each row is converted before it is written;
    see _normalize_fix_write() for the supported values.

    If "backup" is truthy then fsutil.create_backup() is called with
    "outfile" and "backup" before anything is written.

    "_open" and "_get_writer" are hooks for replacing open() and
    csv.writer().
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup (if any) was already requested above, while
                # the old content was still intact.  Opening the file
                # for writing has since truncated it, so asking again
                # here would operate on the emptied file.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    sep = sep or '\t'
    if isinstance(header, str):
        header = header.split(sep)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=sep)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )


def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line of the given table entries.

    The (line, filename) pairs come from strutil.parse_entries().

    If a header is given then, for lines that have a filename, the
    first line seen after the filename changes must be that header.
    It is skipped; any other line there is an error
    (NotImplementedError).

    For lines without a filename, "rawsep" (if given) is used to split
    the line whenever "sep" does not appear in it.

    With "strict", every row must have the same number of columns as
    the header or, lacking a header, as the first row.  See
    _parse_row() for how "default" applies to short rows.

    ValueError is raised if no separator was given or could be found.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    # In strict mode the header (if any) fixes the column count.
    ncols = len(header.split(sep)) if header and strict else None
    cur_file = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        rowsep = sep
        if not filename:
            if rawsep and sep not in line:
                rowsep = rawsep
        elif header and cur_file != filename:
            cur_file = filename
            if line.strip() != header:
                # We expected the header.
                raise NotImplementedError((header, line))
            # Skip the header line itself.
            continue

        row = _parse_row(line, rowsep, ncols, default)
        if strict and not ncols:
            # Without a header, the first row sets the column count.
            ncols = len(row)
        yield row, filename


def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split the line on "sep" and return the stripped values as a tuple.

    A separator is required (ValueError otherwise).  See _parse_row()
    for the meaning of "ncols" and "default".
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')


def _parse_row(line, sep, ncols, default):
    row = tuple(v.strip() for v in line.split(sep))
    if (ncols or 0) > 0:
        diff = ncols - len(row)
        if diff:
            if default is NOT_SET or diff < 0:
                raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
            row += (default,) * diff
    return row


def _normalize_table_file_props(header, sep):
    if not header:
        return None, sep

    if not isinstance(header, str):
        if not sep:
            raise NotImplementedError(header)
        header = sep.join(header)
    elif not sep:
        for sep in ('\t', ',', ' '):
            if sep in header:
                break
        else:
            sep = None
    return header, sep
back to top