Staging
v0.5.1
https://github.com/python/cpython
Raw File
Tip revision: 550e4673be538d98b6ddf5550b3922539cf5c4b2 authored by Victor Stinner on 08 December 2020, 23:32:54 UTC
bpo-32381: Add _PyRun_SimpleFileObject() (GH-23709)
Tip revision: 550e467
fsutil.py
import fnmatch
import glob
import os
import os.path
import shutil
import stat

from .iterutil import iter_many


# Sentinel for "relroot" parameters in this module: it means "use the
# current working directory" rather than an explicit root directory.
USE_CWD = object()


# The filename suffixes of C source files and C header files.
C_SOURCE_SUFFIXES = ('.c', '.h')


def create_backup(old, backup=None):
    """Copy a file to a backup location and return the backup filename.

    "old" is either a filename or an object with a "name" attribute.
    If no filename can be determined then nothing is copied and None
    is returned.

    If "backup" is not provided (or is True) then the backup is written
    to "<filename>.bak".

    None is returned if the file to back up does not exist.  Any other
    error from the copy is propagated.
    """
    filename = old if isinstance(old, str) else getattr(old, 'name', None)
    if not filename:
        return None

    if backup is True or not backup:
        backup = f'{filename}.bak'

    try:
        shutil.copyfile(filename, backup)
    except FileNotFoundError as exc:
        # Only a missing *source* file is tolerated.
        if exc.filename == filename:
            return None
        raise   # re-raise
    return backup


##################################
# filenames

def fix_filename(filename, relroot=USE_CWD, *,
                 fixroot=True,
                 _badprefix=f'..{os.path.sep}',
                 ):
    """Return a normalized, absolute-path copy of the given filename.

    If "relroot" is USE_CWD (or is otherwise falsy) then the filename
    is resolved against the current working directory.  Otherwise it
    is resolved against "relroot", which is itself made absolute first
    unless "fixroot" is false.
    """
    if relroot and relroot is not USE_CWD:
        root = os.path.abspath(relroot) if fixroot else relroot
        return _fix_filename(filename, root)
    return os.path.abspath(filename)


def _fix_filename(filename, relroot, *,
                  _badprefix=f'..{os.path.sep}',
                  ):
    """Return the normalized filename, made absolute under "relroot".

    A relative filename is joined onto "relroot".  An absolute filename
    is returned (normalized) only if joining "relroot" with its path
    relative to "relroot" reproduces it exactly.

    ValueError is raised if the normalized filename starts by climbing
    to a parent directory (including when it is exactly "..") or if
    an absolute filename does not pass the "relroot" check.
    """
    orig = filename

    # First we normalize.
    filename = os.path.normpath(filename)
    # A bare ".." has no trailing separator, so the prefix check alone
    # would let it through and it would resolve above relroot.
    if filename == os.path.pardir or filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root)')

    # Now make sure it is absolute (relative to relroot).
    if not os.path.isabs(filename):
        filename = os.path.join(relroot, filename)
    else:
        relpath = os.path.relpath(filename, relroot)
        if os.path.join(relroot, relpath) != filename:
            raise ValueError(f'expected {relroot!r} as relroot, got {orig!r}')

    return filename


def fix_filenames(filenames, relroot=USE_CWD):
    """Return (filenames, relroot) with absolute, normalized paths.

    The returned filenames are a lazy iterable.  If "relroot" is
    USE_CWD (or is otherwise falsy) then each filename is resolved
    against the current working directory and "relroot" is returned
    unchanged.  Otherwise "relroot" is made absolute and each filename
    is resolved against it.
    """
    if relroot and relroot is not USE_CWD:
        relroot = os.path.abspath(relroot)
        fixed = (_fix_filename(name, relroot) for name in filenames)
    else:
        fixed = (os.path.abspath(name) for name in filenames)
    return fixed, relroot


def format_filename(filename, relroot=USE_CWD, *,
                    fixroot=True,
                    normalize=True,
                    _badprefix=f'..{os.path.sep}',
                    ):
    """Return a consistent relative-path representation of the filename.

    The filename is first normalized, unless "normalize" is false.

    If "relroot" is None then the (normalized) filename is returned
    without being made relative.  If it is USE_CWD then the result is
    relative to the current working directory.  Otherwise the result
    is relative to "relroot", which is made absolute first unless
    "fixroot" is false.

    ValueError is raised if "fixroot" is false and "relroot" is empty,
    or if the relative result starts by climbing to a parent directory
    (including when it is exactly "..").
    """
    orig = filename
    if normalize:
        filename = os.path.normpath(filename)
    if relroot is None:
        # There is no root to be relative to, so leave it as-is.
        return filename
    elif relroot is USE_CWD:
        # Make it relative to CWD.
        filename = os.path.relpath(filename)
    else:
        # Make it relative to "relroot".
        if fixroot:
            relroot = os.path.abspath(relroot)
        elif not relroot:
            raise ValueError('missing relroot')
        filename = os.path.relpath(filename, relroot)
    # A bare ".." has no trailing separator, so the prefix check alone
    # would miss a filename that is the root's parent directory.
    if filename == os.path.pardir or filename.startswith(_badprefix):
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root)')
    return filename


##################################
# find files

def match_glob(filename, pattern):
    """Return True if the filename matches the glob pattern.

    This is fnmatch.fnmatch() plus a fallback that lets the first
    "**/" in the pattern match zero directories.
    """
    matched = fnmatch.fnmatch(filename, pattern)

    # fnmatch doesn't handle ** quite right.  It will not match the
    # following:
    #
    #  ('x/spam.py', 'x/**/*.py')
    #  ('spam.py', '**/*.py')
    #
    # though it *will* match the following:
    #
    #  ('x/y/spam.py', 'x/**/*.py')
    #  ('x/spam.py', '**/*.py')

    if not matched and '**/' in pattern:
        # We only accommodate the single-"**" case.
        matched = fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
    return matched


def process_filenames(filenames, *,
                      start=None,
                      include=None,
                      exclude=None,
                      relroot=USE_CWD,
                      ):
    """Yield (filename, relfile, check, solo) for each given filename.

    "filename" is the absolute form of the file's name and "relfile"
    is its form relative to "relroot".  "check" is a callable that
    returns None if the file should be processed or else a string
    giving the reason it should not be.  "solo" is passed through
    from iter_many().

    "start", "include", and "exclude" are resolved against "relroot"
    in the same way as the filenames are.
    """
    if relroot and relroot is not USE_CWD:
        relroot = os.path.abspath(relroot)

    def resolve(name):
        # "relroot" was already made absolute above (if applicable).
        return fix_filename(name, relroot, fixroot=False)

    if start:
        start = resolve(start)
    if include:
        include = {resolve(v) for v in include}
    if exclude:
        exclude = {resolve(v) for v in exclude}

    onempty = Exception('no filenames provided')
    for filename, solo in iter_many(filenames, onempty):
        filename = resolve(filename)
        relfile = format_filename(filename, relroot,
                                  fixroot=False, normalize=False)
        # "start" is cleared once the starting file has been reached.
        check, start = _get_check(filename, start, include, exclude)
        yield filename, relfile, check, solo


def expand_filenames(filenames):
    """Yield every existing path matched by each of the glob patterns.

    For a pattern containing "**/" the matches for the pattern with
    every "**/" removed are yielded first, followed by the matches
    for the pattern itself.
    """
    for pattern in filenames:
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
        patterns = [pattern]
        if '**/' in pattern:
            patterns.insert(0, pattern.replace('**/', ''))
        for each in patterns:
            yield from glob.glob(each)


def _get_check(filename, start, include, exclude):
    # Return (check, start) for the filename.
    #
    # "check" is a callable returning None if the file should be
    # processed or else a marker string saying why not.  The returned
    # "start" is None once the starting file (if any) has been reached;
    # until then it is returned unchanged.
    if not start or filename == start:
        def check():
            excluded = _is_excluded(filename, exclude, include)
            return '<excluded>' if excluded else None
        return check, None
    # We have not reached the starting file yet.
    return (lambda: '<skipped>'), start


def _is_excluded(filename, exclude, include):
    # Return True if the filename should be left out.
    #
    # If there are any "include" patterns then only they are consulted
    # ("exclude" is ignored): the file is excluded unless it matches
    # one of them.  Otherwise the file is excluded only if it matches
    # one of the "exclude" patterns.
    if include:
        return not any(match_glob(filename, pattern)
                       for pattern in include)
    if exclude:
        return any(match_glob(filename, pattern)
                   for pattern in exclude)
    return False


def _walk_tree(root, *,
               _walk=os.walk,
               ):
    # A wrapper around os.walk that yields the full name of each file
    # (the containing directory joined with the file's own name).
    join = os.path.join
    for dirname, _subdirs, basenames in _walk(root):
        for basename in basenames:
            yield join(dirname, basename)


def walk_tree(root, *,
              suffix=None,
              walk=_walk_tree,
              ):
    """Yield each file in the tree under the given directory name.

    The files are found by calling "walk" with the root.

    If "suffix" is provided then it must be a string (otherwise
    ValueError is raised) and only files with that suffix will
    be included.
    """
    filtered = bool(suffix)
    if filtered and not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for filename in walk(root):
        if not filtered or filename.endswith(suffix):
            yield filename


def glob_tree(root, *,
              suffix=None,
              _glob=glob.iglob,
              ):
    """Yield each file matched by globbing under the given directory.

    Two patterns are tried, in order: "<root>/*<suffix>" and then
    "<root>/**/*<suffix>".

    If "suffix" is provided then it must be a string (otherwise
    ValueError is raised) and only files with that suffix will
    be included.
    """
    if not suffix:
        suffix = ''
    elif not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for pattern in (f'{root}/*{suffix}', f'{root}/**/*{suffix}'):
        for filename in _glob(pattern):
            yield filename


def iter_files(root, suffix=None, relparent=None, *,
               get_files=os.walk,
               _glob=glob_tree,
               _walk=walk_tree,
               ):
    """Yield each file in the tree under the given directory name.

    If "root" is a non-string iterable then do the same for each of
    those trees.

    If "suffix" is provided then only files with that suffix will
    be included.  A non-string "suffix" is treated as a collection
    of acceptable suffixes.

    if "relparent" is provided then it is used to resolve each
    filename as a relative path.
    """
    if not isinstance(root, str):
        for subroot in root:
            yield from iter_files(subroot, suffix, relparent,
                                  get_files=get_files,
                                  _glob=_glob, _walk=_walk)
        return

    # Use the right "walk" function.
    if get_files in (glob.glob, glob.iglob, glob_tree):
        list_files = _glob
    else:
        if get_files in (os.walk, walk_tree):
            walker = _walk_tree
        else:
            walker = get_files

        def list_files(*args, **kwargs):
            return _walk(*args, walk=walker, **kwargs)

    # A single suffix is handled by the "walk" function.  Multiple
    # suffixes are filtered here instead.
    if suffix and not isinstance(suffix, str):
        filenames = list_files(root)
        suffixes = tuple(suffix)
    else:
        filenames = list_files(root, suffix=suffix)
        suffixes = None

    for filename in filenames:
        if suffixes and not filename.endswith(suffixes):
            continue
        if relparent:
            filename = os.path.relpath(filename, relparent)
        yield filename


def iter_files_by_suffix(root, suffixes, relparent=None, *,
                         walk=walk_tree,
                         _iter_files=iter_files,
                         ):
    """Yield each file in the tree that has the given suffixes.

    Unlike iter_files(), the results are in the original suffix order:
    all the files for the first suffix, then all the files for the
    second one, and so on.  A single string is treated as one suffix.
    """
    if isinstance(suffixes, str):
        suffixes = (suffixes,)
    # XXX Ignore repeated suffixes?
    for each in suffixes:
        yield from _iter_files(root, each, relparent)


##################################
# file info

# XXX posix-only?

# Combined permission masks: the owner, group, and "other" bits
# for read, write, and execute respectively.
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH


def is_readable(file, *, user=None, check=False):
    """Return True if the file is readable by the user.

    "file" is whatever _get_file_info() accepts.  "user" is passed
    on to the mode check (None means the current user).

    If "check" is true then an actual attempt to open the file is
    tried first.  If that is not supported then the answer is derived
    from the file's mode bits, as it is when "check" is false.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IRANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IRANY, user)


def is_writable(file, *, user=None, check=False):
    """Return True if the file is writable by the user.

    "file" is whatever _get_file_info() accepts.  "user" is passed
    on to the mode check (None means the current user).

    If "check" is true then an actual attempt to open the file is
    tried first.  If that is not supported then the answer is derived
    from the file's mode bits, as it is when "check" is false.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IWANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IWANY, user)


def is_executable(file, *, user=None, check=False):
    """Return True if the file is executable by the user.

    "file" is whatever _get_file_info() accepts.  "user" is passed
    on to the mode check (None means the current user).

    If "check" is true then _check_file() is consulted first.  If it
    does not give an answer then the result is derived from the file's
    mode bits, as it is when "check" is false.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IXANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IXANY, user)


def _get_file_info(file):
    # Return (filename, stat result, mode) for the given "file".
    #
    # "file" may be any of the following:
    #  * an int: used as the mode (filename and stat result are None)
    #  * an os.stat_result: the mode is taken from it (filename is None)
    #  * a str: treated as a filename, which is stat'ed
    #  * an object whose "name" attribute is an existing path
    #
    # NotImplementedError is raised for anything else.
    filename = st = mode = None
    if isinstance(file, int):
        mode = file
    elif isinstance(file, os.stat_result):
        st = file
    else:
        if isinstance(file, str):
            filename = file
        elif hasattr(file, 'name') and os.path.exists(file.name):
            filename = file.name
        else:
            raise NotImplementedError(file)
        st = os.stat(filename)
    # Compare against None explicitly: a mode of 0 (no permissions) is
    # a valid value and there is no stat result to fall back on.
    if mode is None:
        mode = st.st_mode
    return filename, st, mode


def _check_file(filename, check):
    # Try opening the file to see if the requested access is permitted.
    #
    # "check" is a permission mask.  If it has any read bits then the
    # file is opened read-only; otherwise, if it has any write bits,
    # the file is opened write-only.  True is returned if the open
    # succeeds and False if it fails with PermissionError.
    # NotImplemented is returned for an execute-only check.
    if not isinstance(filename, str):
        raise Exception(f'filename required to check file, got {filename}')

    if check & S_IRANY:
        flags = os.O_RDONLY
    elif check & S_IWANY:
        flags = os.O_WRONLY
    else:
        if check & S_IXANY:
            # We can worry about S_IXANY later
            return NotImplemented
        raise NotImplementedError(check)

    try:
        fd = os.open(filename, flags)
    except PermissionError:
        return False
    # We do not ignore other exceptions.
    os.close(fd)
    return True


def _get_user_info(user):
    # Return (username, uid, gid, groups) for the given user.
    #
    # "user" may be None (the user running the current process),
    # an int (a user ID), or a str (a user name).  NotImplementedError
    # is raised for anything else.  "groups" is the list of IDs of the
    # groups to which the user belongs.
    import pwd
    username = uid = gid = groups = None
    if user is None:
        uid = os.geteuid()
        #username = os.getlogin()
        username = pwd.getpwuid(uid)[0]
        gid = os.getgid()
        groups = os.getgroups()
    else:
        if isinstance(user, int):
            uid = user
            entry = pwd.getpwuid(uid)
            username = entry.pw_name
        elif isinstance(user, str):
            username = user
            entry = pwd.getpwnam(username)
            uid = entry.pw_uid
        else:
            raise NotImplementedError(user)
        gid = entry.pw_gid
        # Keep the result; otherwise callers get None for "groups".
        groups = os.getgrouplist(username, gid)
    return username, uid, gid, groups


def _check_mode(st, mode, check, user):
    # Return True if the mode bits grant the user the requested access.
    #
    # "st" is the file's stat result (used for the owner and group),
    # "mode" is the file's permission bits, "check" is a combination
    # of S_IRANY, S_IWANY, and S_IXANY, and "user" is whatever
    # _get_user_info() accepts.
    #
    # Each requested kind of access must be granted by at least one of
    # the owner, group, or "other" bits that applies to the user.
    # NotImplementedError is raised if "check" has any unrecognized bits.
    #
    # NOTE(review): "st" is None when only an int mode was provided, in
    # which case the owner/group bits cannot be evaluated (the attribute
    # access fails) -- confirm what the intended behavior is.
    orig = check
    _, uid, gid, groups = _get_user_info(user)
    groups = groups or ()

    def granted(usrbit, grpbit, othbit):
        if mode & othbit:
            return True
        if mode & usrbit and st.st_uid == uid:
            return True
        # The group bits apply based on the file's *group* (st_gid),
        # not on its owner.
        if mode & grpbit and (st.st_gid == gid or st.st_gid in groups):
            return True
        return False

    kinds = (
        (S_IRANY, stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH),
        (S_IWANY, stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH),
        (S_IXANY, stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH),
    )
    for anymask, usrbit, grpbit, othbit in kinds:
        if not check & anymask:
            continue
        # Clear the bits (rather than subtract the mask) so a partial
        # mask cannot leave garbage behind.
        check &= ~anymask
        if not granted(usrbit, grpbit, othbit):
            return False
    if check:
        raise NotImplementedError((orig, check))
    return True
back to top