diff options
-rw-r--r-- | ttystatus/__init__.py | 2 | ||||
-rw-r--r-- | ttystatus/area.py | 75 | ||||
-rw-r--r-- | ttystatus/messager.py | 215 | ||||
-rw-r--r-- | ttystatus/messager_tests.py | 121 | ||||
-rw-r--r-- | ttystatus/status.py | 15 | ||||
-rw-r--r-- | ttystatus/status_tests.py | 12 | ||||
-rw-r--r-- | ttystatus/tty.py | 76 | ||||
-rw-r--r-- | without-tests | 14 |
8 files changed, 246 insertions, 284 deletions
diff --git a/ttystatus/__init__.py b/ttystatus/__init__.py index 2eab310..e88bdf5 100644 --- a/ttystatus/__init__.py +++ b/ttystatus/__init__.py @@ -13,6 +13,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from .tty import PhysicalTerminal +from .area import AreaManager from .messager import Messager from .status import TerminalStatus from .widget import Widget diff --git a/ttystatus/area.py b/ttystatus/area.py new file mode 100644 index 0000000..d32a41b --- /dev/null +++ b/ttystatus/area.py @@ -0,0 +1,75 @@ +# Copyright 2010-2011,2015 Lars Wirzenius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +class AreaManager(object): + + '''Manage the area on the terminal for displaying messages.''' + + def __init__(self): + self._terminal = None + + def set_terminal(self, terminal): + self._terminal = terminal + + def get_max_line_length(self): + return self._terminal.get_width() - 1 + + def make_space(self, num_lines): + '''Make space for a message needing a given number of lines. + + If the cursor is near the bottom of the terminal, scroll + things up so that there's space. Otherwise, effectively + nothing happens, except that the cursor is left at the last + line reserved for the message. + + ''' + + self._terminal.write('\n' * (num_lines - 1)) + + def display(self, message): + '''Display a message, which may be on multiple lines. + + The cursor is assumed to be at the last line of the message + area. Long lines are chopped at terminal width - 1. + + ''' + + max_chars = self.get_max_line_length() + up = self._terminal.get_up_sequence() + down = self._terminal.get_down_sequence() + erase = self._terminal.get_erase_line_sequence() + lines = message.split('\n') + + parts = [up * (len(lines) - 1)] + for i, line in enumerate(message.split('\n')): + if i > 0: + parts.append(down) + parts.append(erase) + parts.append(line[:max_chars]) + + self._terminal.write(''.join(parts)) + + def clear_area(self, num_lines): + '''Clear area reserved for message needing a given number of lines. + + The cursor is assumed to be at the last line of the message + area and is left at the top. + + ''' + + up = self._terminal.get_up_sequence() + erase = self._terminal.get_erase_line_sequence() + self._terminal.write((erase + up) * (num_lines - 1) + erase) diff --git a/ttystatus/messager.py b/ttystatus/messager.py index f4cd143..5f08690 100644 --- a/ttystatus/messager.py +++ b/ttystatus/messager.py @@ -14,162 +14,88 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import curses -import fcntl -import struct -import termios import time +import ttystatus + class Messager(object): - '''Write messages to the terminal.''' + '''Manages messages to the terminal. - def __init__(self, output=None, period=None, open_tty=None, - fake_width=False): - self._enabled = True - if output: - self.output = output - else: - try: - self.output = (open_tty or self._open_tty)() - except IOError: - self.output = None + This includes disabling messages, allowing notifications, and + becalming the flow of messages to avoid writing too fast. The + speed is a performance thing: writing too much message text can + slow an application down a lot (too much work for terminal + emulators), and doesn't actually help the user in any way. + + ''' + + def __init__(self, period=None): self._period = 1.0 if period is None else period - self._last_time = 0 # When did we write last? - self._cached_msg = '' # Last message from user, to write() method. - self._first_output = True # is our next output the first one? - self._fake_width = fake_width - self.set_width(self._get_terminal_width()) # Width of terminal - def _open_tty(self): # pragma: no cover - f = open('/dev/tty', 'wb') - curses.setupterm(None, f.fileno()) - return f + self._enabled = True + + self._cached_message = None # The latest message from caller. + self._displayed_message = None # The latest message displayed. + self._previous_write_at = 0 # When the latest message was written. + + self._terminal = ttystatus.PhysicalTerminal() + try: + self._terminal.open_tty() + except IOError: + self._enabled = False - def set_width(self, actual_width): - self.width = actual_width - 1 + self._area = ttystatus.AreaManager() + self._area.set_terminal(self._terminal) + + def disable(self): + '''Disable all output except notifications.''' + self._enabled = False + + def enable(self): + '''Enable output to happen.''' + self._enabled = True + + def time_to_write(self): + '''Is it time to write now?''' + return self._now() - self._previous_write_at >= self._period def _now(self): '''Return current time.''' # This is a wrapper around time.time(), for testing. return time.time() - def _get_terminal_width(self): # pragma: no cover - '''Return width of terminal in characters. + def get_max_line_length(self): + return self._area.get_max_line_length() - If this fails, assume 80. + def write(self, message): + '''Write message to terminal. - Borrowed and adapted from bzrlib. + Message may be multiple lines. ''' - width = 80 - if self._fake_width: - if hasattr(self, 'width'): - width = self.width - elif self.output is not None: - # StringIO might not have fileno. We use StringIO for tests. - fileno = getattr(self.output, 'fileno', None) - if fileno is not None: - try: - s = struct.pack('HHHH', 0, 0, 0, 0) - x = fcntl.ioctl(fileno(), termios.TIOCGWINSZ, s) - width = struct.unpack('HHHH', x)[1] - except IOError: - pass - return width - - def update_width(self): # pragma: no cover - self.set_width(self._get_terminal_width()) - - def _raw_write(self, string): - '''Write raw data if output is terminal.''' - - if self._enabled and self.output and self.output.isatty(): - try: - self.output.write(string) - self.output.flush() - except IOError: # pragma: no cover - self._enabled = False - - def time_to_write(self): - '''Is it time to write now?''' - return self._now() - self._last_time >= self._period - - def write(self, string): - '''Write raw data, always.''' - if not string: - return - self.update_width() - rows = string.split('\n') - - raw_parts = [] - - if self._first_output: - raw_parts.append('\n' * (len(rows) - 1)) - self._first_output = False - - if rows: - up = curses.tparm(curses.tigetstr('cuu'), 1) - down = curses.tparm(curses.tigetstr('cud'), 1) - cr = curses.tigetstr('cr') - el = curses.tigetstr('el') - - raw_parts.extend([ - up * (len(rows) - 1), - cr, - el, - rows[0][:self.width], - ]) - for row in rows[1:]: - raw_parts.extend([ - down, - cr, - el, - row[:self.width], - ]) - - raw = ''.join(raw_parts) - self._raw_write(raw) - self._cached_msg = string - self._last_time = self._now() + if self._enabled and self.time_to_write(): + self.clear() + num_lines = len(message.split('\n')) + self._area.make_space(num_lines) + self._area.display(message) + self._displayed_message = message + self._previous_write_at = self._now() + self._cached_message = message def clear(self): - '''Remove current message from terminal.''' - - if self._first_output: - return - - rows = self._cached_msg.split('\n') - - raw_parts = [] - - if rows: - up = curses.tparm(curses.tigetstr('cuu'), 1) - down = curses.tparm(curses.tigetstr('cud'), 1) - cr = curses.tigetstr('cr') - el = curses.tigetstr('el') - - raw_parts.extend([ - up * (len(rows) - 1), - cr, - el, - ]) - for row in rows[1:]: - raw_parts.extend([ - down, - cr, - el, - ]) - raw_parts.extend([ - up * (len(rows) - 1), - ]) - - raw = ''.join(raw_parts) - self._raw_write(raw) - - def notify(self, string, f, force=False): + '''Remove currently displayed message from terminal, if any.''' + + if self._displayed_message is not None: + num_lines = len(self._displayed_message.split('\n')) + self._area.clear_area(num_lines) + self._displayed_message = None + self._cached_message = None + self._previous_write_at = 0 # Next .write() should display. + + def notify(self, message, f, force=False): '''Show a notification message string to the user. Notifications are meant for error messages and other things @@ -185,24 +111,17 @@ class Messager(object): if self._enabled or force: self.clear() try: - f.write('%s\n' % string) + f.write(message) + f.write('\n') f.flush() except IOError: # We ignore these. No point in crashing if terminal is bad. pass - self._first_output = True - self.write(self._cached_msg) + if self._cached_message is not None: + self.write(self._cached_message) def finish(self): '''Finalize output.''' - if self._cached_msg: - self.write(self._cached_msg) - self._raw_write('\n') - - def disable(self): - '''Disable all output.''' - self._enabled = False - - def enable(self): - '''Enable output to happen.''' - self._enabled = True + if self._cached_message is not None: + self.write(self._cached_message) + self._terminal.write('\n') diff --git a/ttystatus/messager_tests.py b/ttystatus/messager_tests.py deleted file mode 100644 index 221be13..0000000 --- a/ttystatus/messager_tests.py +++ /dev/null @@ -1,121 +0,0 @@ -# Copyright 2010, 2011 Lars Wirzenius -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import StringIO -import unittest - -import ttystatus - - -class DummyTerminal(StringIO.StringIO): - - def isatty(self): - return True - - -class MessagerTests(unittest.TestCase): - - def setUp(self): - self.output = DummyTerminal() - self.messager = ttystatus.Messager(output=self.output, fake_width=True) - - def fast_time(self): - return self.messager._last_time + self.messager._period - - def test_sets_output(self): - self.assertEqual(self.messager.output, self.output) - - def test_handles_no_tty(self): - def notty(): - raise IOError() - m = ttystatus.Messager(open_tty=notty) - self.assertEqual(m.output, None) - - def test_raw_writes_nothing_if_output_is_not_a_terminal(self): - self.messager.output = StringIO.StringIO() - self.messager._raw_write('foo') - self.assertEqual(self.messager.output.getvalue(), '') - - def test_raw_writes_something_if_output_is_not_a_terminal(self): - self.messager._raw_write('foo') - self.assertEqual(self.output.getvalue(), 'foo') - - def test_knows_it_is_time_to_write_at_start(self): - self.assert_(self.messager.time_to_write()) - - def test_knows_it_is_not_time_to_write_right_after_previous_one(self): - self.messager._last_time = self.messager._now() - self.assertFalse(self.messager.time_to_write()) - - def test_knows_it_is_time_to_write_after_a_period(self): - self.messager._last_time = ( - self.messager._now() - self.messager._period * 2) - self.assert_(self.messager.time_to_write()) - - def test_cached_write_writes_first_thing(self): - self.messager.write('foo') - self.assertEqual(self.output.getvalue(), 'foo') - - def test_write_removes_old_message(self): - self.messager._now = self.fast_time - self.messager.write('foo') - self.messager.write('bar') - self.assertEqual(self.output.getvalue(), 'foo\r \rbar') - - def test_clear_removes_message(self): - self.messager._now = lambda: self.messager._period + 1 - self.messager.write('foo') - self.messager.clear() - self.assertEqual(self.output.getvalue(), 'foo\r \r') - - def test_notify_removes_message_and_puts_it_back_afterwards(self): - f = StringIO.StringIO() - self.messager.write('foo') - self.messager.notify('bar', f) - self.assertEqual(self.output.getvalue(), 'foo\r \rfoo') - self.assertEqual(f.getvalue(), 'bar\n') - - def test_notify_does_not_mind_ioerror(self): - f = open('/dev/full', 'w') - self.messager.write('foo') - self.messager.notify('bar', f) - self.assertEqual(self.output.getvalue(), 'foo\r \rfoo') - f.close() - - def test_finish_flushes_unwritten_message(self): - self.messager._now = lambda: 0 - self.messager.write('foo') - self.messager.finish() - self.assertEqual(self.output.getvalue(), 'foo\r \rfoo\n') - - def test_has_width(self): - self.assertEqual(self.messager.width, 79) - - def test_write_truncates_at_one_less_than_width(self): - self.messager.set_width(4) - self.messager.write('foobar') - self.assertEqual(self.output.getvalue(), 'foo') - - def test_disables_output(self): - self.messager.disable() - self.messager.write('foo') - self.assertEqual(self.output.getvalue(), '') - - def test_enables_output(self): - self.messager.disable() - self.messager.enable() - self.messager.write('foo') - self.assertEqual(self.output.getvalue(), 'foo') diff --git a/ttystatus/status.py b/ttystatus/status.py index 0f57f74..18b3057 100644 --- a/ttystatus/status.py +++ b/ttystatus/status.py @@ -32,16 +32,20 @@ class TerminalStatus(object): ''' - def __init__(self, output=None, period=None, messager=None): - self._m = messager or ttystatus.Messager(output=output, period=period) + def __init__(self, period=None, messager=None): + self._m = messager or ttystatus.Messager(period=period) self.clear() def add(self, widget): '''Add a new widget to the status display.''' + if not self._widget_rows: + self._widget_rows = [[]] self._widget_rows[-1].append(widget) def start_new_line(self): # pragma: no cover '''Start a new line of widgets.''' + if not self._widget_rows: + self._widget_rows = [[]] self._widget_rows.append([]) def format(self, format_string): @@ -69,7 +73,7 @@ class TerminalStatus(object): def clear(self): '''Remove all widgets.''' - self._widget_rows = [[]] + self._widget_rows = [] self._values = dict() self._m.clear() @@ -105,7 +109,8 @@ class TerminalStatus(object): return '\n'.join(self._render_row(row) for row in self._widget_rows) def _render_row(self, widget_row): - remaining = self._m.width + max_chars = self._m.get_max_line_length() + remaining = max_chars texts = [None] * len(widget_row) @@ -119,7 +124,7 @@ class TerminalStatus(object): texts[i] = w.render(remaining) remaining -= len(texts[i]) - return (''.join(texts))[:self._m.width] + return (''.join(texts))[:max_chars] def _write(self): '''Render and output current state of all widgets.''' diff --git a/ttystatus/status_tests.py b/ttystatus/status_tests.py index 1cadfda..ecf91f9 100644 --- a/ttystatus/status_tests.py +++ b/ttystatus/status_tests.py @@ -22,11 +22,13 @@ import ttystatus class DummyMessager(object): - width = 80 - def __init__(self): self.written = StringIO.StringIO() self.enabled = True + self.fake_width = 80 + + def get_max_line_length(self): + return self.fake_width def clear(self): pass @@ -144,14 +146,14 @@ class TerminalStatusTests(unittest.TestCase): self.ts.add(w1) self.ts.add(w2) text = self.ts._render() - self.assertEqual(len(text), self.ts._m.width) + self.assertEqual(len(text), self.ts._m.get_max_line_length()) def test_renders_from_beginning_if_there_is_not_enough_space(self): w1 = ttystatus.Literal('foo') w2 = ttystatus.Literal('bar') self.ts.add(w1) self.ts.add(w2) - self.ts._m.width = 4 + self.ts._m.fake_width = 4 text = self.ts._render() self.assertEqual(text, 'foob') @@ -162,6 +164,6 @@ class TerminalStatusTests(unittest.TestCase): self.ts.add(w1) self.ts.add(w2) self.ts.add(w3) - self.ts._m.width = 9 + self.ts._m.fake_width = 9 text = self.ts._render() self.assertEqual(text, 'foo---bar') diff --git a/ttystatus/tty.py b/ttystatus/tty.py new file mode 100644 index 0000000..094cee9 --- /dev/null +++ b/ttystatus/tty.py @@ -0,0 +1,76 @@ +# Copyright 2010-2011,2015 Lars Wirzenius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import curses +import fcntl +import struct +import termios + + +class PhysicalTerminal(object): + + '''Handle interaction with the physical terminal.''' + + def __init__(self): + self._terminal = None + + def open_tty(self): + self._terminal = open('/dev/tty', 'wb') + curses.setupterm(None, self._terminal.fileno()) + + def get_up_sequence(self): + return curses.tparm(curses.tigetstr('cuu'), 1) + + def get_down_sequence(self): + return curses.tparm(curses.tigetstr('cud'), 1) + + def get_erase_line_sequence(self): + cr = curses.tigetstr('cr') + el = curses.tigetstr('el') + return cr + el + + def get_width(self): + '''Return width of terminal in characters. + + If this fails, assume 80. + + Borrowed and adapted from bzrlib. + + ''' + + width = 80 + + try: + s = struct.pack('HHHH', 0, 0, 0, 0) + x = fcntl.ioctl(self._terminal.fileno(), termios.TIOCGWINSZ, s) + width = struct.unpack('HHHH', x)[1] + except IOError: + pass + + return width + + def write(self, raw_data): + '''Write raw data to terminal. + + We ignore IOErrors for terminal output. + + ''' + + try: + self._terminal.write(raw_data) + self._terminal.flush() + except IOError: + pass diff --git a/without-tests b/without-tests index 24b6a56..b5e49da 100644 --- a/without-tests +++ b/without-tests @@ -1,5 +1,9 @@ -./ttystatus/__init__.py -./example.py -./ttystatus/widget.py -./setup.py -./doc/conf.py +example.py +example2.py +ttystatus/__init__.py +ttystatus/area.py +ttystatus/messager.py +ttystatus/tty.py +ttystatus/widget.py +setup.py +doc/conf.py |