Staging
v0.5.0
https://foss.heptapod.net/mercurial/hgview
Raw File
Tip revision: 1850a087c1b6cb026470c738b2c7f82c6fada06d authored by Mads Kiilerich on 04 April 2020, 02:27:44 UTC
qt5: renaming of references to hgviewlib/qt5, finishing up the Qt5 port
Tip revision: 1850a08
graphlog.py
# -*- coding: utf-8 -*-
# Copyright (c) 2003-2012 LOGILAB S.A. (Paris, FRANCE).
# http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.
'''
Contains a listbox definition that walk the repo log and display an ascii graph
'''

try:
    from itertools import zip_longest as zzip
except ImportError:  # Python 2
    from itertools import izip_longest as zzip
zzip(()) # force check over lazy import

from logging import warn

from mercurial.node import short
from urwid import AttrMap, Text, ListWalker, Columns, emit_signal

from hgviewlib.hgpatches.graphmod import (_fixlongrightedges,
                                          _getnodelineedgestail,
                                          _drawedges, _getpaddingline)

from hgviewlib.util import tounicode
from hgviewlib.hggraph import getlog, gettags, getdate, HgRepoListWalker
from hgviewlib.curses import connect_command, SelectableText


def getnodelineedgestail(echars, idx, pidx, ncols, coldiff, pdiff, fix_tail):
    try:
        return _getnodelineedgestail(echars, idx, pidx, ncols, coldiff, pdiff, fix_tail)
    except TypeError: # Mercurial < 3.8: _getnodelineedgestail() takes exactly 6 arguments (7 given)
        return _getnodelineedgestail(idx, pidx, ncols, coldiff, pdiff, fix_tail)


def drawedges(echars, edges, nodeline, interline):
    try:
        return _drawedges(echars, edges, nodeline, interline)
    except TypeError: # Mercurial < 3.8: _drawedges() takes exactly 3 arguments (4 given)
        return _drawedges(edges, nodeline, interline)


# __________________________________________________________________ constants

COLORS = ["brown", "dark red", "dark magenta", "dark blue", "dark cyan",
          "dark green", "yellow", "light red", "light magenta", "light blue",
          "light cyan", "light green"]


_COLUMNMAP = {
    'ID': lambda m, c, g: c.rev() is not None and str(c.rev()) or "",
    'Log': getlog,
    'Author': lambda m, c, g: tounicode(c.user()).split('<', 1)[0] if c.node() else '',
    'Date': getdate,
    'Tags': gettags,
    'Bookmarks': lambda m, c, g: ', '.join(tounicode(b) for b in c.bookmarks() or ()),
    'Branch': lambda m, c, g: "" if c.branch() == b'default' else tounicode(c.branch()),
    'Filename': lambda m, c, g: g.extra[0],
    'Phase': lambda model, ctx, gnode: ctx.phasestr(),
    }
GRAPH_MIN_WIDTH = 6

# ____________________________________________________________________ classes

class RevisionsWalker(ListWalker):
    """ListWalker-compatible class for browsing log changeset.
    """

    signals = ['focus changed']
    _columns = HgRepoListWalker._columns

    _allfields = (('Bookmarks', 'Branch', 'Tags', 'Log'),)
    _allcolumns = (('Date', 16), ('Author', 20), ('ID', 6),)

    def __init__(self, walker, branch='', fromhead=None, follow=False,
                 *args, **kwargs):
        self._data_cache = {}
        self._focus = 0
        self.walker = walker
        super(RevisionsWalker, self).__init__(*args, **kwargs)
        self.asciistate = [0, 0, {}, []] # coldiff, idx, edgemap, seen

    def connect_commands(self):
        """Connect usefull commands to callbacks"""
        connect_command('goto', self.set_rev)

    def _modified(self):
        """obsolete widget content"""
        super(RevisionsWalker, self)._modified()

    def _invalidate(self):
        """obsolete rendering cache"""
        self._data_cache.clear()
        super(RevisionsWalker, self)._modified()

    @staticmethod
    def get_color(idx, ignore=()):
        """
        Return a color at index 'n' rotating in the available
        colors. 'ignore' is a list of colors not to be chosen.
        """
        colors = [x for x in COLORS if x not in ignore]
        if not colors: # ghh, no more available colors...
            colors = COLORS
        return colors[idx % len(colors)]

    def data(self, pos):
        """Return a widget and the position passed."""
        # cache may be very huge on very big repo
        # (cpython for instance: >1.5GB)
        if pos in self._data_cache: # speed up rendering
            return self._data_cache[pos], pos
        widget = self.get_widget(pos)
        if widget is None:
            return None, None
        self._data_cache[pos] = widget
        return widget, pos

    def get_widget(self, pos):
        """Return a widget for the node"""
        if pos in self._data_cache: # speed up rendering
            return self._data_cache[pos], pos

        try:
            self.walker.ensureBuilt(row=pos)
        except ValueError:
            return None
        gnode = self.walker.graph[pos]
        ctx = self.walker.repo[gnode.rev]
        # prepare the last columns content
        txts = []
        for graph, fields in zzip(self.graphlog(gnode, ctx), self._allfields):
            graph = graph or ''
            fields = fields or ()
            txts.append(graph)
            txts.append(' ')
            for field in fields:
                if field not in self._columns:
                    continue
                txt = _COLUMNMAP[field](self.walker, ctx, gnode)
                if not txt:
                    continue
                txts.append((field, txt))
                txts.append(('default', ' '))
            txts.pop() # remove pending space
            txts.append('\n')
        txts.pop() # remove pending newline
        # prepare other columns
        txter = lambda col, sz: Text(
                 (col, _COLUMNMAP[col](self.walker, ctx, gnode)[:sz]),
                                       align='right', wrap='clip')
        columns = [('fixed', sz, txter(col, sz))
                   for col, sz in self._allcolumns
                   if col in self._columns]
        columns.append(SelectableText(txts, wrap='clip'))
        # tune style
        spec_style = {} # style modifier for normal
        foc_style = {} # style modifier for focused
        all_styles = set(self._columns) | set(['GraphLog', 'GraphLog.node', None])
        important_styles = set(['ID', 'GraphLog.node'])
        if ctx.obsolete():
            spec_style.update(dict.fromkeys(all_styles, 'obsolete'))
        # normal style: use special styles for working directory and tip
        style = None
        if gnode.rev is None:
            style = 'modified' # pending changes
        elif gnode.rev in self.walker.wd_revs:
            style = 'current'
        if style is not None:
            spec_style.update(dict.fromkeys(important_styles, style))
        # focused style: use special styles for working directory and tip
        foc_style.update(dict.fromkeys(all_styles, style or 'focus'))
        foc_style.update(dict.fromkeys(important_styles, 'focus.alternate'))
        # wrap widget with style modified
        widget = AttrMap(Columns(columns, 1), spec_style, foc_style)
        return widget

    def graphlog(self, gnode, ctx):
        """Return a generator that get lines of graph log for the node
        """
        # define node symbol
        if gnode.rev is None:
            char = '!' # pending changes
        elif not getattr(ctx, 'applied', True):
            char = ' '
        elif set(ctx.tags()).intersection(self.walker.mqueues):
            char = '*'
        else:
            phase = ctx.phase()
            try:
                char = 'o#^'[phase]
            except IndexError:
                warn('"%(node)s" has an unknown phase: %(phase)i',
                     {'node':short(ctx.node()), 'phase':phase})
                char = '?'

        # build the column data for the graphlogger from data given by hgview
        curcol = gnode.x
        curedges = [(start, end)
                    for start, end, color, fill in gnode.bottomlines
                    if start == curcol]
        try:
            prv, nxt, _, _ = zip(*gnode.bottomlines)
            prv, nxt = len(set(prv)), len(set(nxt))
        except ValueError: # last
            prv, nxt = 1, 0
        coldata = (curcol, curedges, prv, nxt - prv)
        self.asciistate = self.asciistate or [0, 0, {}, []]
        return hgview_ascii(self.asciistate, char, len(self._allfields),
                            coldata)

    def get_focus(self):
        """Get focused widget"""
        try:
            return self.data(self._focus)
        except IndexError:
            if self._focus > 0:
                self._focus = 0
            else:
                self._focus = 0
        try:
            return self.data(self._focus)
        except:
            return None, None

    def set_focus(self, focus=None):
        """change focused widget"""
        self._focus = focus or 0
        emit_signal(self, 'focus changed', self.get_ctx())

    focus = property(lambda self: self._focus, set_focus, None,
                     'focused widget index')

    def get_rev(self):
        """Return revision of the focused changeset"""
        if self._focus >= 0:
            return self.walker.graph[self._focus].rev

    def set_rev(self, rev=None):
        """change focused widget to the given revision ``rev``."""
        if rev is None:
            self.set_focus(0)
        else:
            self.set_focus(self.walker.graph.index(rev or 0))
        self._invalidate()

    rev = property(get_rev, set_rev, None, 'current revision')

    def get_ctx(self):
        """return context of the focused changeset"""
        if self.focus >= 0:
            return self.walker.repo[self.rev]

    def get_next(self, start_from):
        """get the next widget to display"""
        focus = start_from + 1
        try:
            return self.data(focus)
        except IndexError:
            return None, None

    def get_prev(self, start_from):
        """get the previous widget to display"""
        focus = start_from - 1
        if focus < 0:
            return None, None
        try:
            return self.data(focus)
        except IndexError:
            return None, None

# __________________________________________________________________ functions

def hgview_ascii(state, char, height, coldata):
    """prints an ASCII graph of the DAG

    takes the following arguments (one call per node in the graph):

    :param state: Somewhere to keep the needed state in (init to [0, 0])
    :param char: character to use as node's symbol.
    :param height: minimal line number to use for this node
    :param coldata: (idx, edges, ncols, coldiff)
        * idx: column index for the current changeset
        * edges: a list of (col, next_col) indicating the edges between
          the current node and its parents.
        * ncols: number of columns (ongoing edges) in the current revision.
        * coldiff: the difference between the number of columns (ongoing edges)
          in the next revision and the number of columns (ongoing edges)
          in the current revision. That is: -1 means one column removed;
          0 means no columns added or removed; 1 means one column added.


    :note: it is a Modified version of Joel Rosdahl <joel@rosdahl.net> 
           graphlog extension for mercurial
    """
    idx, edges, ncols, coldiff = coldata
    edgemap = state[2]
    seen = state[3]
    echars = [c for p in seen for c in (edgemap.get(p, '|'), ' ')]
    echars.extend(('|', ' ') * max(ncols + coldiff - len(seen), 0))

    # graphlog is broken with multiple parent. But we have ignore that to allow
    # some support of obsolete relation display
    # assert -2 < coldiff < 2
    assert height > 0, height
    if coldiff == -1:
        _fixlongrightedges(edges)
    # add_padding_line says whether to rewrite
    add_padding_line = (height > 2 and coldiff == -1 and
                        [x for (x, y) in edges if x + 1 < y])
    # fix_nodeline_tail says whether to rewrite
    fix_nodeline_tail = height <= 2 and not add_padding_line

    # nodeline is the line containing the node character (typically o)
    nodeline = ["|", " "] * idx
    nodeline.extend([('GraphLog.node', char), " "])
    nodeline.extend(getnodelineedgestail(echars, idx, state[1], ncols, coldiff,
                                         state[0], fix_nodeline_tail))
    # shift_interline is the line containing the non-vertical
    # edges between this entry and the next
    shift_interline = ["|", " "] * idx
    if coldiff == -1:
        n_spaces = 1
        edge_ch = "/"
    elif coldiff == 0:
        n_spaces = 2
        edge_ch = "|"
    else:
        n_spaces = 3
        edge_ch = "\\"
    shift_interline.extend(n_spaces * [" "])
    shift_interline.extend([edge_ch, " "] * (ncols - idx - 1))
    # draw edges from the current node to its parents
    drawedges(echars, edges, nodeline, shift_interline)
    # lines is the list of all graph lines to print
    lines = [nodeline]
    if add_padding_line:
        lines.append(_getpaddingline(idx, ncols, edges))
    if not set(shift_interline).issubset(set([' ', '|'])): # compact
        lines.append(shift_interline)
    # make sure that there are as many graph lines as there are
    # log strings
    if len(lines) < height:
        extra_interline = ["|", " "] * (ncols + coldiff)
        while len(lines) < height:
            lines.append(extra_interline)
    # print lines
    indentation_level = max(ncols, ncols + coldiff)
    for line in lines:
        # justify to GRAPH_MIN_WIDTH for convenience
        if len(line) < GRAPH_MIN_WIDTH:
            line.append(' ' * (GRAPH_MIN_WIDTH - len(line)))
        yield [
            item if isinstance(item, tuple) else
            ('GraphLog', item) if isinstance(item, str) else
            ('GraphLog', tounicode(item))  # probably from Mercurial _drawedges
            for item in line]
    # ... and start over
    state[0] = coldiff
    state[1] = idx

back to top