Source code for forge.match

# Copyright 2015 datawire. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect, collections, textwrap

[docs]class MatchError(Exception): pass
EPSILON = object()
[docs]class State: sequence = 0 def __init__(self): self.id = State.sequence State.sequence += 1 self.matches = {} self.epsilons = () self.action = None self.match_value = True @property def transitions(self): for s in self.epsilons: yield (EPSILON, s) for k, v in self.matches.items(): for s in v: yield (k, s) @property def nodes(self): done = set() todo = [self] while todo: state = todo.pop() if state not in done: yield state done.add(state) for k, s in state.transitions: if s not in done: todo.append(s) @property def edges(self): done = set() for state in self.nodes: for k, s in state.transitions: edge = (state, k, s) if edge not in done: yield (state, k, s) done.add(edge) @property def epsilon_closure(self): todo = [self] done = set() while todo: s = todo.pop() if s not in done: done.add(s) for e in s.epsilons: yield e if e not in done: todo.append(e)
[docs] def force(self): todo = [self] done = set() while todo: s = todo.pop() if s not in done: done.add(s) for k, v in s.matches.items(): if isinstance(k, delay): key = k.force() s.matches[key] = v del s.matches[k] for s2 in v: if s2 not in done: todo.append(s2) for e in s.epsilons: if e not in done: todo.append(e)
[docs] def compile(self): if self.epsilons: assert self.action is None actions = set() for e in self.epsilon_closure: if e.action: actions.add(e.action) for k, v in e.matches.items(): for s in v: self.edge(k, s) assert len(actions) <= 1, str(self) if actions: self.action = actions.pop() self.epsilons = ()
[docs] def edge(self, *args): if len(args) == 1: assert isinstance(args[0], State) self.epsilons += args elif len(args) == 2: key, state = args self.matches[key] = self.matches.get(key, ()) + (state,) else: assert False, "wrong number of args"
def __setitem__(self, key, value): self.matches[key] = (value,) def __getitem__(self, key): return self.matches.get(key, ()) def __repr__(self): transitions = [] actions = [] for start, key, end in self.edges: if key is EPSILON: transitions.append("S%s -> S%s" % (start.id, end.id)) else: transitions.append("S%s %s -> S%s" % (start.id, key, end.id)) for state in self.nodes: if state.action: actions.append("S%s(%s)" % (state.id, state.action)) if transitions: return "State(S%s\n %s\n)" % (self.id, ",\n ".join(transitions + actions)) else: return "State(S%s)" % self.id
[docs] def match(self, *args, **kwargs): states = {self: ()} remaining = list(args) for value in flatten(args): next = {} for state, distance in states.items(): count = 0 for proj in projections(value, state.match_value): transitions = state[proj] if transitions: for s in transitions: if s not in next: next[s] = distance + (count,) count += 1 states = next nearest = {} minimum = None for state, distance in states.items(): if state.action: if minimum is None: nearest = {state.action: state} minimum = distance elif distance < minimum: nearest = {state.action: state} minimum = distance elif distance == minimum: nearest[state.action] = state if len(nearest) > 1: dfns = "\n".join([ppfun(n.action) for n in nearest.values()]) raise MatchError("arguments ({}) match multiple actions:\n\n{}".format(ppargs(args), dfns)) if not nearest: dfns = "\n".join([ppfun(n.action) for n in self.nodes if n.action]) raise MatchError("arguments ({}) do not match:\n\n{}".format(ppargs(args), dfns)) assert len(nearest) == 1, nearest state = nearest.popitem()[1] assert state.action, (state, remaining) return state.action
[docs] def apply(self, *args, **kwargs): return self.match(*args, **kwargs)(*args, **kwargs)
[docs]def deduplicate(items): deduped = [] for item in items: if not deduped: deduped.append([item, 1]) else: last = deduped[-1] if last[0] == item: last[1] += 1 else: deduped.append([item, 1]) return [(("%s*%s" % tuple(p)) if p[1] > 1 else "%s" % p[0]) for p in deduped]
[docs]def ppargs(args, dedup=False): result = [] for a in args: if isinstance(a, list): result.append("[%s]" % ppargs(a, True)) elif isinstance(a, tuple): result.append("(%s)" % ppargs(a, True)) else: result.append("%s.%s" % (a.__class__.__module__, a.__class__.__name__)) if dedup: result = deduplicate(result) return ", ".join(result)
[docs]def ppfun(fun): try: lines, number = inspect.getsourcelines(fun) return "%s:%s:\n%s" % (inspect.getsourcefile(fun), number, "".join(lines)) except TypeError: return repr(fun)
[docs]class Marker(object): pass
[docs]class Begin(Marker): def __init__(self, cls): self.cls = cls def __hash__(self): return hash(self.cls) def __eq__(self, other): if not isinstance(other, Begin): return False return self.cls == other.cls def __repr__(self): return "BEGIN(%s)" % self.cls.__name__
[docs]class End(Marker): def __repr__(self): return "END"
END = End()
[docs]def flatten(values): for value in values: if isinstance(value, (list, tuple)): yield Begin(value.__class__) for v in flatten(value): yield v yield END else: yield value
[docs]def projections(value, match_value=True): if match_value and isinstance(value, collections.Hashable): yield value traits = getattr(value, "MATCH_TRAITS", None) if traits is not None: if isinstance(traits, tuple): for t in traits: yield t else: yield traits if not isinstance(value, Marker): if isinstance(value, super): for cls in value.__self_class__.__mro__[1:]: yield cls else: for cls in value.__class__.__mro__: yield cls
[docs]class Fragment(object): def __init__(self, start, extend, doc): assert isinstance(start, State) self.start = start self.extend = extend self.doc = doc
def _value(value): start = State() if isinstance(value, type): doc = value.__name__ elif isinstance(value, lazy): doc = value.name else: doc = repr(value) return Fragment(start, lambda next: start.edge(value, next), doc)
[docs]def one(*pattern): return cat(pattern)
[docs]def cat(patterns): start = None extend = None docs = [] for p in flatten(patterns): if isinstance(p, Fragment): frag = p else: frag = _value(p) docs.append(frag.doc) if start is None: start = frag.start extend = frag.extend else: extend(frag.start) extend = frag.extend if start is None: start = State() extend = start.edge return Fragment(start, extend, ", ".join(docs))
[docs]def opt(*pattern): frag = cat(pattern) return Fragment(frag.start, lambda next: (frag.extend(next), frag.start.edge(next)), "opt(%s)" % frag.doc)
def _many(pattern): frag = cat(pattern) frag.extend(frag.start) return Fragment(frag.start, lambda next: (frag.extend(next), frag.start.edge(next)), "many(%s)" % frag.doc)
[docs]def many(*pattern, **kwargs): min = kwargs.pop("min", 0) if kwargs: raise TypeError("no such keyword argument(s): %s" % ", ".join(kwargs.keys())) patterns = [] for i in range(min): patterns.extend(pattern) patterns.append(_many(pattern)) return cat(patterns)
[docs]def when(pattern, action): frag = one(pattern) state = State() state.action = action frag.extend(state) if action.__doc__ is not None: wrapped = textwrap.fill(action.__doc__, initial_indent=" ", subsequent_indent=" ") else: wrapped = None return Fragment(frag.start, lambda next: state.edge(next), "%s:\n%s" % (frag.doc, wrapped))
[docs]def choice(*patterns): start = State() docs = [] fragments = [one(p) for p in patterns] for f in fragments: start.edge(f.start) docs.append(f.doc) return Fragment(start, lambda next: [f.extend(next) for f in fragments], "choice(%s)" % ", ".join(docs))
[docs]def ntuple(pattern, **kwargs): return (many(pattern, **kwargs),)
[docs]class delay(object): def __init__(self, thunk): self.thunk = thunk
[docs] def force(self): return self.thunk()
[docs]class lazy(delay): def __init__(self, name): self.name = name frame = inspect.currentframe() try: self.frame = frame.f_back finally: del frame
[docs] def force(self): frame = self.frame while frame: if self.name in frame.f_locals: return frame.f_locals[self.name] frame = frame.f_back raise NameError(self.name)
def __repr__(self): return "lazy(%r)" % self.name
[docs]def compile(fragment): fragment.start.force() todo = [fragment.start] done = set() while todo: state = todo.pop() state.compile() done.add(state) for v in state.matches.values(): for s in v: if s not in done: todo.append(s) fragment.start.doc = fragment.doc return fragment.start
class _BoundDispatcher(object): def __init__(self, clazz, object, dispatcher): self.clazz = clazz self.object = object self.dispatcher = dispatcher # XXX: This is here for inspect.ismethoddescriptor which needs to # return True for help() to work properly. There may be a better # way to do this. def __get__(self): assert False @property def __name__(self): return self.dispatcher.name @property def __doc__(self): return "\n\n".join([f.__doc__ for c, f in self._mro]) @property def _mro(self): for c in self.clazz.__mro__: if self.dispatcher.name in c.__dict__: disp = c.__dict__[self.dispatcher.name] if isinstance(disp, _Dispatcher): yield c, disp @property def _compiled(self): compiled = self.dispatcher.cache.get(self.clazz) if compiled is None: fragments = [] for c, disp in self._mro: fragments.append(one(c, disp.fragment)) compiled = compile(choice(*fragments)) compiled.match_value = False self.dispatcher.cache[self.clazz] = compiled return compiled def __call__(self, *args, **kwargs): compiled = self._compiled try: if self.object is None: return compiled.apply(*args, **kwargs) else: return compiled.apply(self.object, *args, **kwargs) except MatchError, e: raise TypeError("%s.%s() %s" % (self.clazz.__name__, self.dispatcher.name, e)) def match(self, *args, **kwargs): compiled = self._compiled if self.object is None: fun = compiled.match(*args, **kwargs) return lambda: fun(*args, **kwargs) else: fun = compiled.match(self.object, *args, **kwargs) return lambda: fun(self.object, *args, **kwargs) class _Dispatcher(object): def __init__(self, name): self.name = name self.fragment = None self.compiled = None self.cache = {} self.docs = [] def add(self, types, action): frag = when(cat(types), action) if self.fragment is None: self.fragment = frag else: self.fragment = choice(self.fragment, frag) self.compiled = None self.docs.append(frag.doc) self.__doc__ = "\n\n".join(self.docs) def __get__(self, object, clazz): return _BoundDispatcher(clazz, object, self) @property def _compiled(self): compiled = self.compiled if compiled is None: compiled = compile(self.fragment) self.compiled = compiled return compiled def __call__(self, *args, **kwargs): compiled = self._compiled try: return compiled.apply(*args, **kwargs) except MatchError, e: raise TypeError("%s() %s" % (self.name, e)) def match(self, *args, **kwargs): compiled = self._compiled fun = compiled.match(*args, **kwargs) return lambda: fun(*args, **kwargs) def _decorate(namespace, function, pattern): name = function.__name__ if name in namespace and isinstance(namespace[name], _Dispatcher): dispatcher = namespace[name] else: dispatcher = _Dispatcher(name) namespace[name] = dispatcher dispatcher.add(pattern, function) return dispatcher
[docs]def match(*pattern): def decorator(function): namespace = inspect.currentframe().f_back.f_locals return _decorate(namespace, function, pattern) return decorator
[docs]class trait(object): def __init__(self, value): self.value = value def __hash__(self): return hash(self.value) def __eq__(self, other): return isinstance(other, trait) and self.value == other.value