Source code for forge.output
# Copyright 2017 datawire. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet, sys
reload(sys)
sys.setdefaultencoding('utf8')
re = eventlet.import_patched('re')
blessed = eventlet.import_patched('blessed')
logging = eventlet.import_patched('logging')
logger = logging.getLogger("tasks")
[docs]class Terminal(blessed.Terminal):
def __init__(self, *args, **kwargs):
blessed.Terminal.__init__(self, *args, **kwargs)
# for tokenizer, the '.lastgroup' is the primary lookup key for
# 'self.caps', unless 'MISMATCH'; then it is an unmatched character.
self._caps_compiled_any = re.compile('|'.join(
cap.named_pattern for name, cap in self.caps.items()
) + '|(?P<MISMATCH>\w+|\W)')
self._caps_unnamed_any = re.compile('|'.join(
'({0})'.format(cap.pattern) for name, cap in self.caps.items()
) + '|(\w+|\W)')
self._wrap_cache = {}
[docs] def wrap(self, text):
lines = []
for line in text.splitlines():
lines.extend(self.wrap_line(line))
return lines
[docs] def wrap_line(self, text):
if text in self._wrap_cache:
lines = self._wrap_cache[text]
else:
lines = []
line = ''
width = 0
for token, cap in blessed.sequences.iter_parse(self, text):
if cap:
delta = cap.horizontal_distance(token)
assert delta < self.width, (repr(token), cap)
if width + delta >= self.width:
lines.append(line)
line = ''
width = 0
else:
line += token
width += delta
else:
while token:
fragment = token[:self.width-width]
token = token[len(fragment):]
line += fragment
width += len(fragment)
if width >= self.width:
lines.append(line)
line = ''
width = 0
if line:
lines.append(line)
self._wrap_cache[text] = lines
return lines
[docs]class Drawer(object):
def __init__(self):
self.previous = []
self.terminal = Terminal()
[docs] def draw(self, lines, trim=True):
if trim:
# we subtract one for the initial shell line, and one for
# the last line
screenful = lines[-(self.terminal.height-2):]
else:
screenful = lines[:]
old = None
new = None
common_head = 0
for old, new in zip(self.previous, screenful):
if old == new:
common_head += 1
else:
break
sys.stdout.write(self.terminal.move_up*(len(self.previous)-common_head))
for line in screenful[common_head:]:
# XXX: should really wrap this properly somehow, but
# writing out more than the terminal width will mess up
# the movement logic
delta = len(line) - self.terminal.length(line)
sys.stdout.write(line[:self.terminal.width+delta])
sys.stdout.write(self.terminal.clear_eol + self.terminal.move_down)
sys.stdout.write(self.terminal.clear_eos)
self.previous = screenful