# Copyright 2017 datawire. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet, sys
from eventlet.corolocal import local
from eventlet.green import time
from contextlib import contextmanager
from .sentinel import Sentinel
# These modules are loaded through eventlet's import_patched() rather than
# a plain import statement (presumably so they pick up eventlet's
# cooperative replacements for the blocking stdlib modules -- confirm
# against the eventlet patcher documentation).
traceback = eventlet.import_patched('traceback')
output = eventlet.import_patched('forge.output')

#: A sentinel value used to indicate that the task is not yet complete.
PENDING = Sentinel("PENDING")

#: A sentinel value used to indicate that the task terminated with an error of some kind.
ERROR = Sentinel("ERROR")
class ChildError(Exception):
    """
    Signals that one or more background child tasks of a task failed.

    Each failure is reported in detail at the place where it occurred,
    so the message carried by this exception is deliberately terse: it
    only states how many children were supplied.
    """

    def __init__(self, parent, *children):
        """
        :param parent: the object the failed children belong to
        :param children: the failed children, one positional argument each
        """
        message = "%s child task(s) errored" % len(children)
        super(ChildError, self).__init__(message)
        self.parent = parent
        self.children = children
[docs]class Result(object):
def __init__(self, executor, parent):
self.executor = executor
self.parent = parent
self.children = []
if self.parent:
self.parent.children.append(self)
self.child_errors = 0
self.value = PENDING
self.exc_info = None
self.thread = None
self.stack = None
self._recovered = False
@property
def exception(self):
return self.exc_info
@exception.setter
def exception(self, exc_info):
self.exc_info = exc_info
if self.parent:
self.parent.child_errors += 1
# XXX: deprecated
@property
def result(self):
return self.value
def _capture_stack(self):
self.stack = traceback.extract_stack()
[docs] def wait(self):
if self.value is PENDING:
if self.thread not in (None, eventlet.getcurrent()):
self.thread.wait()
for ch in self.children:
ch.wait()
if self.child_errors > 0 and self.value is not ERROR:
errors = [e for e in self.leaf_errors if not e._recovered]
if errors:
self.value = ERROR
self.exception = (ChildError, ChildError(self, self.leaf_errors), None)
[docs] def get(self):
self.wait()
if self.value is ERROR:
raise self.exception[0], self.exception[1], self.exception[2]
else:
return self.value
[docs] def recover(self):
for r in self.traversal:
r._recovered = True
@property
def traversal(self):
yield self
for c in self.children:
for d in c.traversal:
yield d
@property
def errors(self):
return [r for r in self.traversal if r.is_leaf_error()]
@property
def leaf_errors(self):
return [ch for ch in self.traversal if ch is not self and ch.is_leaf_error()]
[docs] def is_leaf_error(self):
if self.result is ERROR:
if issubclass(self.exception[0], ChildError):
return False
for ch in self.children:
if ch.value is ERROR and ch.exception[1] == self.exception[1]:
return False
return True
else:
return False
[docs] def is_signal(self, (filename, lineno, funcname, text)):
noise = {"forge/executor.py": ("run", "do_run", "_capture_stack"),
"forge/tasks.py": ("go", "__call__"),
"eventlet/greenthread.py": ("main",)}
for k, v in noise.items():
if filename.endswith(k) and funcname in v:
return False
return True
[docs] def get_traceback(self):
if not self.is_leaf_error():
return None
stack = []
result = self
while result:
if not stack:
stack = traceback.extract_tb(result.exception[2])
stack[:0] = result.stack
elif result.parent and result.executor.async:
stack[:0] = result.stack
result = result.parent
# Noise is considered to be dispatch/glue code that clutters
# stack traces due to bugs in the actual business logic of
# tasks. We only filter out noise if the last line of the
# stack is business logic. That way if there is a bug in the
# dispatch/glue code it doesn't get filtered out.
if self.is_signal(stack[-1]):
stack = filter(self.is_signal, stack)
return "".join(["Traceback (most recent call last):\n"] + traceback.format_list(stack) +
traceback.format_exception_only(*self.exception[:2]))
@property
def terminal(self):
return executor.MUXER.terminal
[docs] def report(self, autocolor=True):
total = 0
errors = []
for r in self.traversal:
total += 1
if r.is_leaf_error():
exc = r.exception[1]
indent = " "
if getattr(exc, "report_traceback", True):
tb = "\n\n" + r.get_traceback().strip()
errors.append("%s%s: unexpected error%s" % (indent, r.executor.name,
tb.replace("\n", "\n " + indent)))
else:
errors.append("%s%s: %s" % (indent, r.executor.context, exc))
if autocolor:
if errors:
color = self.terminal.bold_red
else:
color = self.terminal.green
else:
color = lambda x: x
result = "\n".join(["%s tasks run, %s errors" % (total, len(errors))] + errors)
return "\n".join([color(line) for line in result.splitlines()])
def __repr__(self):
if self.exception is None:
return "Value(%r)" % self.value
else:
return repr(self.exception[1])
class _Muxer(object):
    """
    File-like wrapper that labels output with the executor producing it.

    Before a write whose executor context differs from the context of
    the previous write, a colored header line naming the new context is
    written to the wrapped stream (no header is written for a context
    of None).
    """

    def __init__(self, stream):
        # wrapping a muxer in another muxer is not allowed
        assert not isinstance(stream, _Muxer)
        self.stream = stream
        self.previous = None
        self.terminal = output.Terminal()
        self.default_color = lambda x: x

    def write(self, bytes):
        """Write to the wrapped stream, emitting a header on context change."""
        exe = executor.current()
        if exe is None:
            context, color = None, self.default_color
        else:
            context, color = exe.context, exe.color

        if self.previous != context and context is not None:
            header = color(u"\u2554\u2550") + color(unicode(context)) + u"\n"
            self.stream.write(header.encode("UTF-8"))

        # unicode text is encoded; anything else is passed through untouched
        payload = bytes.encode("UTF-8") if isinstance(bytes, unicode) else bytes
        self.stream.write(payload)
        self.previous = context

    def flush(self):
        """Flush the wrapped stream."""
        self.stream.flush()

    def isatty(self):
        """Report whether the wrapped stream is a tty."""
        return self.stream.isatty()
# Module-wide green thread pool: async executors spawn their tasks on it
# (executor.run) and executor.resize() changes its size.
_POOL = eventlet.greenpool.GreenPool()
[docs]class executor(object):
"""
An executor provides some useful utilities for safely running and
coordinating code::
# an executor can run stuff safely:
exe = executor("my-executor")
result = exe.run(lambda x: x/0, 1)
# a result can be an error or a value
if result.value is ERROR:
print result.exception
else:
print result.value
# you can retrieve the result just as if you had run the
# function
try:
x = result.get()
print x
except ZeroDivisionError, e:
print e
An executor can also be used to run asynchronous tasks::
exe = executor("my-async-executor", async=True)
result = exe.run(lambda x: x/0, 1)
# the result is pending
if result.value is PENDING:
print "still waiting..."
# block until the result is available
result.wait()
if result.value is ERROR:
print result.exception
else:
print result.value
When executors are nested, any errors occuring in asynchronous
tasks are tracked:
def my_code():
exe = executor("sub-executor", async=True)
# lets launch a background task and ignore the result
exe.run(lambda: 1/0)
exe = executor("root-executor")
result = exe.run(my_code)
The executor tracks all background tasks and should any errors
occur, the executor constructs a full stack trace that includes
not only the line of code in the background thread, but the stack
for the code that launched the background thread::
print result.report() -->
root-executor: 1 child task(s) errored
sub-executor: unexpected error
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in my_code
File "<stdin>", line 4, in <lambda>
ZeroDivisionError: integer division or modulo by zero
"""
CURRENT = local()
MUXER = _Muxer(sys.stdout)
COLORS = [getattr(MUXER.terminal, n) for n in ("white",
"cyan",
"magenta",
"blue",
"bold_cyan",
"bold_magenta",
"bold_blue",
"bold_white",
"black_on_white",
"bold_white_on_blue",
"white_on_blue",
"white_on_magenta",
"bold_white_on_magenta")]
ALLOCATED = {}
[docs] @classmethod
def allocate_color(cls, name):
if name in cls.ALLOCATED:
return cls.ALLOCATED[name]
else:
color = cls.COLORS[len(cls.ALLOCATED) % len(cls.COLORS)]
cls.ALLOCATED[name] = color
return color
[docs] @classmethod
def current(cls):
return getattr(cls.CURRENT, "executor", None)
[docs] @classmethod
def current_result(cls):
return getattr(cls.CURRENT, "result", None)
[docs] @classmethod
def setup(cls):
eventlet.sleep() # workaround for import cycle: https://github.com/eventlet/eventlet/issues/401
eventlet.monkey_patch(thread=False)
if 'pytest' not in sys.modules:
import getpass
getpass.os = eventlet.patcher.original('os') # workaround for https://github.com/eventlet/eventlet/issues/340
sys.stdout = cls.MUXER
[docs] @classmethod
def resize(cls, size):
_POOL.resize(size)
def __init__(self, name = None, async=False):
self.name = name
self.results = []
self.async = async
self.messages = []
self.parent = self.current()
if self.parent is None:
self.verbose = False
else:
self.verbose = self.parent.verbose
if self.name is None:
if self.parent:
self.context = self.parent.context
else:
self.context = None
else:
self.context = self.name
if not self.parent:
self.context_colors = {}
self.color = self.allocate_color(self.context)
@contextmanager
def _make_current(self, result):
saved_executor = self.current()
saved_result = self.current_result()
self.CURRENT.executor = self
self.CURRENT.result = result
yield
self.CURRENT.executor = saved_executor
self.CURRENT.result = saved_result
[docs] def echo(self, text=u"", prefix=u"\u2551 ", newline=True):
with self._make_current(None):
msg = self.color(prefix) + text.replace(u"\n", u"\n" + self.color(prefix))
if newline:
print msg
else:
sys.stdout.write(msg)
[docs] def info(self, text):
if self.verbose:
self.echo(text)
[docs] def warn(self, text):
if self.verbose:
self.echo(text)
else:
self.messages.append(text)
[docs] def error(self, text):
if self.verbose:
self.echo(text)
else:
self.messages.append(text)
[docs] def do_run(self, result, fun, args, kwargs):
with self._make_current(result):
try:
result.value = fun(*args, **kwargs)
except:
result.value = ERROR
result.exception = sys.exc_info()
result.thread = None
result.wait()
[docs] def run(self, fun, *args, **kwargs):
result = Result(self, self.current_result())
result._capture_stack()
self.results.append(result)
if self.async:
result.thread = _POOL.spawn(self.do_run, result, fun, args, kwargs)
else:
self.do_run(result, fun, args, kwargs)
return result
[docs] def wait(self):
for r in self.results:
r.wait()
[docs] def report(self):
return u"\n".join([r.report() for r in self.results])