summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2015-09-27 12:18:50 +0300
committerLars Wirzenius <liw@liw.fi>2015-09-27 13:00:38 +0300
commitdf2691178ddf2d9de9ecd65c4307da17885f61c6 (patch)
treeea5adb63422f244dd18b571b13ec1ad3994bef42
parentab009314c2acd9e520f2d23a085a41556d424df5 (diff)
downloadttystatus-df2691178ddf2d9de9ecd65c4307da17885f61c6.tar.gz
Refactor how messages are displayed
This should be much clearer code now. However, the old unit tests for Messager became obsolete. Did not write new ones, due to it being tedious. I am lazy. Mock me.
-rw-r--r--ttystatus/__init__.py2
-rw-r--r--ttystatus/area.py75
-rw-r--r--ttystatus/messager.py215
-rw-r--r--ttystatus/messager_tests.py121
-rw-r--r--ttystatus/status.py15
-rw-r--r--ttystatus/status_tests.py12
-rw-r--r--ttystatus/tty.py76
-rw-r--r--without-tests14
8 files changed, 246 insertions, 284 deletions
diff --git a/ttystatus/__init__.py b/ttystatus/__init__.py
index 2eab310..e88bdf5 100644
--- a/ttystatus/__init__.py
+++ b/ttystatus/__init__.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from .tty import PhysicalTerminal
+from .area import AreaManager
from .messager import Messager
from .status import TerminalStatus
from .widget import Widget
diff --git a/ttystatus/area.py b/ttystatus/area.py
new file mode 100644
index 0000000..d32a41b
--- /dev/null
+++ b/ttystatus/area.py
@@ -0,0 +1,75 @@
+# Copyright 2010-2011,2015 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+class AreaManager(object):
+
+ '''Manage the area on the terminal for displaying messages.'''
+
+ def __init__(self):
+ self._terminal = None
+
+ def set_terminal(self, terminal):
+ self._terminal = terminal
+
+ def get_max_line_length(self):
+ return self._terminal.get_width() - 1
+
+ def make_space(self, num_lines):
+ '''Make space for a message needing a given number of lines.
+
+ If the cursor is near the bottom of the terminal, scroll
+ things up so that there's space. Otherwise, effectively
+ nothing happens, except that the cursor is left at the last
+ line reserved for the message.
+
+ '''
+
+ self._terminal.write('\n' * (num_lines - 1))
+
+ def display(self, message):
+ '''Display a message, which may be on multiple lines.
+
+ The cursor is assumed to be at the last line of the message
+ area. Long lines are chopped at terminal width - 1.
+
+ '''
+
+ max_chars = self.get_max_line_length()
+ up = self._terminal.get_up_sequence()
+ down = self._terminal.get_down_sequence()
+ erase = self._terminal.get_erase_line_sequence()
+ lines = message.split('\n')
+
+ parts = [up * (len(lines) - 1)]
+ for i, line in enumerate(message.split('\n')):
+ if i > 0:
+ parts.append(down)
+ parts.append(erase)
+ parts.append(line[:max_chars])
+
+ self._terminal.write(''.join(parts))
+
+ def clear_area(self, num_lines):
+ '''Clear area reserved for message needing a given number of lines.
+
+ The cursor is assumed to be at the last line of the message
+ area and is left at the top.
+
+ '''
+
+ up = self._terminal.get_up_sequence()
+ erase = self._terminal.get_erase_line_sequence()
+ self._terminal.write((erase + up) * (num_lines - 1) + erase)
diff --git a/ttystatus/messager.py b/ttystatus/messager.py
index f4cd143..5f08690 100644
--- a/ttystatus/messager.py
+++ b/ttystatus/messager.py
@@ -14,162 +14,88 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import curses
-import fcntl
-import struct
-import termios
import time
+import ttystatus
+
class Messager(object):
- '''Write messages to the terminal.'''
+ '''Manages messages to the terminal.
- def __init__(self, output=None, period=None, open_tty=None,
- fake_width=False):
- self._enabled = True
- if output:
- self.output = output
- else:
- try:
- self.output = (open_tty or self._open_tty)()
- except IOError:
- self.output = None
+ This includes disabling messages, allowing notifications, and
+ becalming the flow of messages to avoid writing too fast. The
+ speed is a performance thing: writing too much message text can
+ slow an application down a lot (too much work for terminal
+ emulators), and doesn't actually help the user in any way.
+
+ '''
+
+ def __init__(self, period=None):
self._period = 1.0 if period is None else period
- self._last_time = 0 # When did we write last?
- self._cached_msg = '' # Last message from user, to write() method.
- self._first_output = True # is our next output the first one?
- self._fake_width = fake_width
- self.set_width(self._get_terminal_width()) # Width of terminal
- def _open_tty(self): # pragma: no cover
- f = open('/dev/tty', 'wb')
- curses.setupterm(None, f.fileno())
- return f
+ self._enabled = True
+
+ self._cached_message = None # The latest message from caller.
+ self._displayed_message = None # The latest message displayed.
+ self._previous_write_at = 0 # When the latest message was written.
+
+ self._terminal = ttystatus.PhysicalTerminal()
+ try:
+ self._terminal.open_tty()
+ except IOError:
+ self._enabled = False
- def set_width(self, actual_width):
- self.width = actual_width - 1
+ self._area = ttystatus.AreaManager()
+ self._area.set_terminal(self._terminal)
+
+ def disable(self):
+ '''Disable all output except notifications.'''
+ self._enabled = False
+
+ def enable(self):
+ '''Enable output to happen.'''
+ self._enabled = True
+
+ def time_to_write(self):
+ '''Is it time to write now?'''
+ return self._now() - self._previous_write_at >= self._period
def _now(self):
'''Return current time.'''
# This is a wrapper around time.time(), for testing.
return time.time()
- def _get_terminal_width(self): # pragma: no cover
- '''Return width of terminal in characters.
+ def get_max_line_length(self):
+ return self._area.get_max_line_length()
- If this fails, assume 80.
+ def write(self, message):
+ '''Write message to terminal.
- Borrowed and adapted from bzrlib.
+ Message may be multiple lines.
'''
- width = 80
- if self._fake_width:
- if hasattr(self, 'width'):
- width = self.width
- elif self.output is not None:
- # StringIO might not have fileno. We use StringIO for tests.
- fileno = getattr(self.output, 'fileno', None)
- if fileno is not None:
- try:
- s = struct.pack('HHHH', 0, 0, 0, 0)
- x = fcntl.ioctl(fileno(), termios.TIOCGWINSZ, s)
- width = struct.unpack('HHHH', x)[1]
- except IOError:
- pass
- return width
-
- def update_width(self): # pragma: no cover
- self.set_width(self._get_terminal_width())
-
- def _raw_write(self, string):
- '''Write raw data if output is terminal.'''
-
- if self._enabled and self.output and self.output.isatty():
- try:
- self.output.write(string)
- self.output.flush()
- except IOError: # pragma: no cover
- self._enabled = False
-
- def time_to_write(self):
- '''Is it time to write now?'''
- return self._now() - self._last_time >= self._period
-
- def write(self, string):
- '''Write raw data, always.'''
- if not string:
- return
- self.update_width()
- rows = string.split('\n')
-
- raw_parts = []
-
- if self._first_output:
- raw_parts.append('\n' * (len(rows) - 1))
- self._first_output = False
-
- if rows:
- up = curses.tparm(curses.tigetstr('cuu'), 1)
- down = curses.tparm(curses.tigetstr('cud'), 1)
- cr = curses.tigetstr('cr')
- el = curses.tigetstr('el')
-
- raw_parts.extend([
- up * (len(rows) - 1),
- cr,
- el,
- rows[0][:self.width],
- ])
- for row in rows[1:]:
- raw_parts.extend([
- down,
- cr,
- el,
- row[:self.width],
- ])
-
- raw = ''.join(raw_parts)
- self._raw_write(raw)
- self._cached_msg = string
- self._last_time = self._now()
+ if self._enabled and self.time_to_write():
+ self.clear()
+ num_lines = len(message.split('\n'))
+ self._area.make_space(num_lines)
+ self._area.display(message)
+ self._displayed_message = message
+ self._previous_write_at = self._now()
+ self._cached_message = message
def clear(self):
- '''Remove current message from terminal.'''
-
- if self._first_output:
- return
-
- rows = self._cached_msg.split('\n')
-
- raw_parts = []
-
- if rows:
- up = curses.tparm(curses.tigetstr('cuu'), 1)
- down = curses.tparm(curses.tigetstr('cud'), 1)
- cr = curses.tigetstr('cr')
- el = curses.tigetstr('el')
-
- raw_parts.extend([
- up * (len(rows) - 1),
- cr,
- el,
- ])
- for row in rows[1:]:
- raw_parts.extend([
- down,
- cr,
- el,
- ])
- raw_parts.extend([
- up * (len(rows) - 1),
- ])
-
- raw = ''.join(raw_parts)
- self._raw_write(raw)
-
- def notify(self, string, f, force=False):
+ '''Remove currently displayed message from terminal, if any.'''
+
+ if self._displayed_message is not None:
+ num_lines = len(self._displayed_message.split('\n'))
+ self._area.clear_area(num_lines)
+ self._displayed_message = None
+ self._cached_message = None
+ self._previous_write_at = 0 # Next .write() should display.
+
+ def notify(self, message, f, force=False):
'''Show a notification message string to the user.
Notifications are meant for error messages and other things
@@ -185,24 +111,17 @@ class Messager(object):
if self._enabled or force:
self.clear()
try:
- f.write('%s\n' % string)
+ f.write(message)
+ f.write('\n')
f.flush()
except IOError:
# We ignore these. No point in crashing if terminal is bad.
pass
- self._first_output = True
- self.write(self._cached_msg)
+ if self._cached_message is not None:
+ self.write(self._cached_message)
def finish(self):
'''Finalize output.'''
- if self._cached_msg:
- self.write(self._cached_msg)
- self._raw_write('\n')
-
- def disable(self):
- '''Disable all output.'''
- self._enabled = False
-
- def enable(self):
- '''Enable output to happen.'''
- self._enabled = True
+ if self._cached_message is not None:
+ self.write(self._cached_message)
+ self._terminal.write('\n')
diff --git a/ttystatus/messager_tests.py b/ttystatus/messager_tests.py
deleted file mode 100644
index 221be13..0000000
--- a/ttystatus/messager_tests.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Copyright 2010, 2011 Lars Wirzenius
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-import StringIO
-import unittest
-
-import ttystatus
-
-
-class DummyTerminal(StringIO.StringIO):
-
- def isatty(self):
- return True
-
-
-class MessagerTests(unittest.TestCase):
-
- def setUp(self):
- self.output = DummyTerminal()
- self.messager = ttystatus.Messager(output=self.output, fake_width=True)
-
- def fast_time(self):
- return self.messager._last_time + self.messager._period
-
- def test_sets_output(self):
- self.assertEqual(self.messager.output, self.output)
-
- def test_handles_no_tty(self):
- def notty():
- raise IOError()
- m = ttystatus.Messager(open_tty=notty)
- self.assertEqual(m.output, None)
-
- def test_raw_writes_nothing_if_output_is_not_a_terminal(self):
- self.messager.output = StringIO.StringIO()
- self.messager._raw_write('foo')
- self.assertEqual(self.messager.output.getvalue(), '')
-
- def test_raw_writes_something_if_output_is_not_a_terminal(self):
- self.messager._raw_write('foo')
- self.assertEqual(self.output.getvalue(), 'foo')
-
- def test_knows_it_is_time_to_write_at_start(self):
- self.assert_(self.messager.time_to_write())
-
- def test_knows_it_is_not_time_to_write_right_after_previous_one(self):
- self.messager._last_time = self.messager._now()
- self.assertFalse(self.messager.time_to_write())
-
- def test_knows_it_is_time_to_write_after_a_period(self):
- self.messager._last_time = (
- self.messager._now() - self.messager._period * 2)
- self.assert_(self.messager.time_to_write())
-
- def test_cached_write_writes_first_thing(self):
- self.messager.write('foo')
- self.assertEqual(self.output.getvalue(), 'foo')
-
- def test_write_removes_old_message(self):
- self.messager._now = self.fast_time
- self.messager.write('foo')
- self.messager.write('bar')
- self.assertEqual(self.output.getvalue(), 'foo\r \rbar')
-
- def test_clear_removes_message(self):
- self.messager._now = lambda: self.messager._period + 1
- self.messager.write('foo')
- self.messager.clear()
- self.assertEqual(self.output.getvalue(), 'foo\r \r')
-
- def test_notify_removes_message_and_puts_it_back_afterwards(self):
- f = StringIO.StringIO()
- self.messager.write('foo')
- self.messager.notify('bar', f)
- self.assertEqual(self.output.getvalue(), 'foo\r \rfoo')
- self.assertEqual(f.getvalue(), 'bar\n')
-
- def test_notify_does_not_mind_ioerror(self):
- f = open('/dev/full', 'w')
- self.messager.write('foo')
- self.messager.notify('bar', f)
- self.assertEqual(self.output.getvalue(), 'foo\r \rfoo')
- f.close()
-
- def test_finish_flushes_unwritten_message(self):
- self.messager._now = lambda: 0
- self.messager.write('foo')
- self.messager.finish()
- self.assertEqual(self.output.getvalue(), 'foo\r \rfoo\n')
-
- def test_has_width(self):
- self.assertEqual(self.messager.width, 79)
-
- def test_write_truncates_at_one_less_than_width(self):
- self.messager.set_width(4)
- self.messager.write('foobar')
- self.assertEqual(self.output.getvalue(), 'foo')
-
- def test_disables_output(self):
- self.messager.disable()
- self.messager.write('foo')
- self.assertEqual(self.output.getvalue(), '')
-
- def test_enables_output(self):
- self.messager.disable()
- self.messager.enable()
- self.messager.write('foo')
- self.assertEqual(self.output.getvalue(), 'foo')
diff --git a/ttystatus/status.py b/ttystatus/status.py
index 0f57f74..18b3057 100644
--- a/ttystatus/status.py
+++ b/ttystatus/status.py
@@ -32,16 +32,20 @@ class TerminalStatus(object):
'''
- def __init__(self, output=None, period=None, messager=None):
- self._m = messager or ttystatus.Messager(output=output, period=period)
+ def __init__(self, period=None, messager=None):
+ self._m = messager or ttystatus.Messager(period=period)
self.clear()
def add(self, widget):
'''Add a new widget to the status display.'''
+ if not self._widget_rows:
+ self._widget_rows = [[]]
self._widget_rows[-1].append(widget)
def start_new_line(self): # pragma: no cover
'''Start a new line of widgets.'''
+ if not self._widget_rows:
+ self._widget_rows = [[]]
self._widget_rows.append([])
def format(self, format_string):
@@ -69,7 +73,7 @@ class TerminalStatus(object):
def clear(self):
'''Remove all widgets.'''
- self._widget_rows = [[]]
+ self._widget_rows = []
self._values = dict()
self._m.clear()
@@ -105,7 +109,8 @@ class TerminalStatus(object):
return '\n'.join(self._render_row(row) for row in self._widget_rows)
def _render_row(self, widget_row):
- remaining = self._m.width
+ max_chars = self._m.get_max_line_length()
+ remaining = max_chars
texts = [None] * len(widget_row)
@@ -119,7 +124,7 @@ class TerminalStatus(object):
texts[i] = w.render(remaining)
remaining -= len(texts[i])
- return (''.join(texts))[:self._m.width]
+ return (''.join(texts))[:max_chars]
def _write(self):
'''Render and output current state of all widgets.'''
diff --git a/ttystatus/status_tests.py b/ttystatus/status_tests.py
index 1cadfda..ecf91f9 100644
--- a/ttystatus/status_tests.py
+++ b/ttystatus/status_tests.py
@@ -22,11 +22,13 @@ import ttystatus
class DummyMessager(object):
- width = 80
-
def __init__(self):
self.written = StringIO.StringIO()
self.enabled = True
+ self.fake_width = 80
+
+ def get_max_line_length(self):
+ return self.fake_width
def clear(self):
pass
@@ -144,14 +146,14 @@ class TerminalStatusTests(unittest.TestCase):
self.ts.add(w1)
self.ts.add(w2)
text = self.ts._render()
- self.assertEqual(len(text), self.ts._m.width)
+ self.assertEqual(len(text), self.ts._m.get_max_line_length())
def test_renders_from_beginning_if_there_is_not_enough_space(self):
w1 = ttystatus.Literal('foo')
w2 = ttystatus.Literal('bar')
self.ts.add(w1)
self.ts.add(w2)
- self.ts._m.width = 4
+ self.ts._m.fake_width = 4
text = self.ts._render()
self.assertEqual(text, 'foob')
@@ -162,6 +164,6 @@ class TerminalStatusTests(unittest.TestCase):
self.ts.add(w1)
self.ts.add(w2)
self.ts.add(w3)
- self.ts._m.width = 9
+ self.ts._m.fake_width = 9
text = self.ts._render()
self.assertEqual(text, 'foo---bar')
diff --git a/ttystatus/tty.py b/ttystatus/tty.py
new file mode 100644
index 0000000..094cee9
--- /dev/null
+++ b/ttystatus/tty.py
@@ -0,0 +1,76 @@
+# Copyright 2010-2011,2015 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import curses
+import fcntl
+import struct
+import termios
+
+
+class PhysicalTerminal(object):
+
+ '''Handle interaction with the physical terminal.'''
+
+ def __init__(self):
+ self._terminal = None
+
+ def open_tty(self):
+ self._terminal = open('/dev/tty', 'wb')
+ curses.setupterm(None, self._terminal.fileno())
+
+ def get_up_sequence(self):
+ return curses.tparm(curses.tigetstr('cuu'), 1)
+
+ def get_down_sequence(self):
+ return curses.tparm(curses.tigetstr('cud'), 1)
+
+ def get_erase_line_sequence(self):
+ cr = curses.tigetstr('cr')
+ el = curses.tigetstr('el')
+ return cr + el
+
+ def get_width(self):
+ '''Return width of terminal in characters.
+
+ If this fails, assume 80.
+
+ Borrowed and adapted from bzrlib.
+
+ '''
+
+ width = 80
+
+ try:
+ s = struct.pack('HHHH', 0, 0, 0, 0)
+ x = fcntl.ioctl(self._terminal.fileno(), termios.TIOCGWINSZ, s)
+ width = struct.unpack('HHHH', x)[1]
+ except IOError:
+ pass
+
+ return width
+
+ def write(self, raw_data):
+ '''Write raw data to terminal.
+
+ We ignore IOErrors for terminal output.
+
+ '''
+
+ try:
+ self._terminal.write(raw_data)
+ self._terminal.flush()
+ except IOError:
+ pass
diff --git a/without-tests b/without-tests
index 24b6a56..b5e49da 100644
--- a/without-tests
+++ b/without-tests
@@ -1,5 +1,9 @@
-./ttystatus/__init__.py
-./example.py
-./ttystatus/widget.py
-./setup.py
-./doc/conf.py
+example.py
+example2.py
+ttystatus/__init__.py
+ttystatus/area.py
+ttystatus/messager.py
+ttystatus/tty.py
+ttystatus/widget.py
+setup.py
+doc/conf.py