summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ttystatus/__init__.py2
-rw-r--r--ttystatus/area.py75
-rw-r--r--ttystatus/messager.py215
-rw-r--r--ttystatus/messager_tests.py121
-rw-r--r--ttystatus/status.py15
-rw-r--r--ttystatus/status_tests.py12
-rw-r--r--ttystatus/tty.py76
-rw-r--r--without-tests14
8 files changed, 246 insertions, 284 deletions
diff --git a/ttystatus/__init__.py b/ttystatus/__init__.py
index 2eab310..e88bdf5 100644
--- a/ttystatus/__init__.py
+++ b/ttystatus/__init__.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from .tty import PhysicalTerminal
+from .area import AreaManager
from .messager import Messager
from .status import TerminalStatus
from .widget import Widget
diff --git a/ttystatus/area.py b/ttystatus/area.py
new file mode 100644
index 0000000..d32a41b
--- /dev/null
+++ b/ttystatus/area.py
@@ -0,0 +1,75 @@
+# Copyright 2010-2011,2015 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+class AreaManager(object):
+
+ '''Manage the area on the terminal for displaying messages.'''
+
+ def __init__(self):
+ self._terminal = None
+
+ def set_terminal(self, terminal):
+ self._terminal = terminal
+
+ def get_max_line_length(self):
+ return self._terminal.get_width() - 1
+
+ def make_space(self, num_lines):
+ '''Make space for a message needing a given number of lines.
+
+ If the cursor is near the bottom of the terminal, scroll
+ things up so that there's space. Otherwise, effectively
+ nothing happens, except that the cursor is left at the last
+ line reserved for the message.
+
+ '''
+
+ self._terminal.write('\n' * (num_lines - 1))
+
+ def display(self, message):
+ '''Display a message, which may be on multiple lines.
+
+ The cursor is assumed to be at the last line of the message
+ area. Long lines are chopped at terminal width - 1.
+
+ '''
+
+ max_chars = self.get_max_line_length()
+ up = self._terminal.get_up_sequence()
+ down = self._terminal.get_down_sequence()
+ erase = self._terminal.get_erase_line_sequence()
+ lines = message.split('\n')
+
+ parts = [up * (len(lines) - 1)]
+ for i, line in enumerate(message.split('\n')):
+ if i > 0:
+ parts.append(down)
+ parts.append(erase)
+ parts.append(line[:max_chars])
+
+ self._terminal.write(''.join(parts))
+
+ def clear_area(self, num_lines):
+ '''Clear area reserved for message needing a given number of lines.
+
+ The cursor is assumed to be at the last line of the message
+ area and is left at the top.
+
+ '''
+
+ up = self._terminal.get_up_sequence()
+ erase = self._terminal.get_erase_line_sequence()
+ self._terminal.write((erase + up) * (num_lines - 1) + erase)
diff --git a/ttystatus/messager.py b/ttystatus/messager.py
index f4cd143..5f08690 100644
--- a/ttystatus/messager.py
+++ b/ttystatus/messager.py
@@ -14,162 +14,88 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import curses
-import fcntl
-import struct
-import termios
import time
+import ttystatus
+
class Messager(object):
- '''Write messages to the terminal.'''
+ '''Manages messages to the terminal.
- def __init__(self, output=None, period=None, open_tty=None,
- fake_width=False):
- self._enabled = True
- if output:
- self.output = output
- else:
- try:
- self.output = (open_tty or self._open_tty)()
- except IOError:
- self.output = None
+ This includes disabling messages, allowing notifications, and
+ becalming the flow of messages to avoid writing too fast. The
+ speed is a performance thing: writing too much message text can
+ slow an application down a lot (too much work for terminal
+ emulators), and doesn't actually help the user in any way.
+
+ '''
+
+ def __init__(self, period=None):
self._period = 1.0 if period is None else period
- self._last_time = 0 # When did we write last?
- self._cached_msg = '' # Last message from user, to write() method.
- self._first_output = True # is our next output the first one?
- self._fake_width = fake_width
- self.set_width(self._get_terminal_width()) # Width of terminal
- def _open_tty(self): # pragma: no cover
- f = open('/dev/tty', 'wb')
- curses.setupterm(None, f.fileno())
- return f
+ self._enabled = True
+
+ self._cached_message = None # The latest message from caller.
+ self._displayed_message = None # The latest message displayed.
+ self._previous_write_at = 0 # When the latest message was written.
+
+ self._terminal = ttystatus.PhysicalTerminal()
+ try:
+ self._terminal.open_tty()
+ except IOError:
+ self._enabled = False
- def set_width(self, actual_width):
- self.width = actual_width - 1
+ self._area = ttystatus.AreaManager()
+ self._area.set_terminal(self._terminal)
+
+ def disable(self):
+ '''Disable all output except notifications.'''
+ self._enabled = False
+
+ def enable(self):
+ '''Enable output to happen.'''
+ self._enabled = True
+
+ def time_to_write(self):
+ '''Is it time to write now?'''
+ return self._now() - self._previous_write_at >= self._period
def _now(self):
'''Return current time.'''
# This is a wrapper around time.time(), for testing.
return time.time()
- def _get_terminal_width(self): # pragma: no cover
- '''Return width of terminal in characters.
+ def get_max_line_length(self):
+ return self._area.get_max_line_length()
- If this fails, assume 80.
+ def write(self, message):
+ '''Write message to terminal.
- Borrowed and adapted from bzrlib.
+ Message may be multiple lines.
'''
- width = 80
- if self._fake_width:
- if hasattr(self, 'width'):
- width = self.width
- elif self.output is not None:
- # StringIO might not have fileno. We use StringIO for tests.
- fileno = getattr(self.output, 'fileno', None)
- if fileno is not None:
- try:
- s = struct.pack('HHHH', 0, 0, 0, 0)
- x = fcntl.ioctl(fileno(), termios.TIOCGWINSZ, s)
- width = struct.unpack('HHHH', x)[1]
- except IOError:
- pass
- return width
-
- def update_width(self): # pragma: no cover
- self.set_width(self._get_terminal_width())
-
- def _raw_write(self, string):
- '''Write raw data if output is terminal.'''
-
- if self._enabled and self.output and self.output.isatty():
- try:
- self.output.write(string)
- self.output.flush()
- except IOError: # pragma: no cover
- self._enabled = False
-
- def time_to_write(self):
- '''Is it time to write now?'''
- return self._now() - self._last_time >= self._period
-
- def write(self, string):
- '''Write raw data, always.'''
- if not string:
- return
- self.update_width()
- rows = string.split('\n')
-
- raw_parts = []
-
- if self._first_output:
- raw_parts.append('\n' * (len(rows) - 1))
- self._first_output = False
-
- if rows:
- up = curses.tparm(curses.tigetstr('cuu'), 1)
- down = curses.tparm(curses.tigetstr('cud'), 1)
- cr = curses.tigetstr('cr')
- el = curses.tigetstr('el')
-
- raw_parts.extend([
- up * (len(rows) - 1),
- cr,
- el,
- rows[0][:self.width],
- ])
- for row in rows[1:]:
- raw_parts.extend([
- down,
- cr,
- el,
- row[:self.width],
- ])
-
- raw = ''.join(raw_parts)
- self._raw_write(raw)
- self._cached_msg = string
- self._last_time = self._now()
+ if self._enabled and self.time_to_write():
+ self.clear()
+ num_lines = len(message.split('\n'))
+ self._area.make_space(num_lines)
+ self._area.display(message)
+ self._displayed_message = message
+ self._previous_write_at = self._now()
+ self._cached_message = message
def clear(self):
- '''Remove current message from terminal.'''
-
- if self._first_output:
- return
-
- rows = self._cached_msg.split('\n')
-
- raw_parts = []
-
- if rows:
- up = curses.tparm(curses.tigetstr('cuu'), 1)
- down = curses.tparm(curses.tigetstr('cud'), 1)
- cr = curses.tigetstr('cr')
- el = curses.tigetstr('el')
-
- raw_parts.extend([
- up * (len(rows) - 1),
- cr,
- el,
- ])
- for row in rows[1:]:
- raw_parts.extend([
- down,
- cr,
- el,
- ])
- raw_parts.extend([
- up * (len(rows) - 1),
- ])
-
- raw = ''.join(raw_parts)
- self._raw_write(raw)
-
- def notify(self, string, f, force=False):
+ '''Remove currently displayed message from terminal, if any.'''
+
+ if self._displayed_message is not None:
+ num_lines = len(self._displayed_message.split('\n'))
+ self._area.clear_area(num_lines)
+ self._displayed_message = None
+ self._cached_message = None
+ self._previous_write_at = 0 # Next .write() should display.
+
+ def notify(self, message, f, force=False):
'''Show a notification message string to the user.
Notifications are meant for error messages and other things
@@ -185,24 +111,17 @@ class Messager(object):
if self._enabled or force:
self.clear()
try:
- f.write('%s\n' % string)
+ f.write(message)
+ f.write('\n')
f.flush()
except IOError:
# We ignore these. No point in crashing if terminal is bad.
pass
- self._first_output = True
- self.write(self._cached_msg)
+ if self._cached_message is not None:
+ self.write(self._cached_message)
def finish(self):
'''Finalize output.'''
- if self._cached_msg:
- self.write(self._cached_msg)
- self._raw_write('\n')
-
- def disable(self):
- '''Disable all output.'''
- self._enabled = False
-
- def enable(self):
- '''Enable output to happen.'''
- self._enabled = True
+ if self._cached_message is not None:
+ self.write(self._cached_message)
+ self._terminal.write('\n')
diff --git a/ttystatus/messager_tests.py b/ttystatus/messager_tests.py
deleted file mode 100644
index 221be13..0000000
--- a/ttystatus/messager_tests.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Copyright 2010, 2011 Lars Wirzenius
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
-import StringIO
-import unittest
-
-import ttystatus
-
-
-class DummyTerminal(StringIO.StringIO):
-
- def isatty(self):
- return True
-
-
-class MessagerTests(unittest.TestCase):
-
- def setUp(self):
- self.output = DummyTerminal()
- self.messager = ttystatus.Messager(output=self.output, fake_width=True)
-
- def fast_time(self):
- return self.messager._last_time + self.messager._period
-
- def test_sets_output(self):
- self.assertEqual(self.messager.output, self.output)
-
- def test_handles_no_tty(self):
- def notty():
- raise IOError()
- m = ttystatus.Messager(open_tty=notty)
- self.assertEqual(m.output, None)
-
- def test_raw_writes_nothing_if_output_is_not_a_terminal(self):
- self.messager.output = StringIO.StringIO()
- self.messager._raw_write('foo')
- self.assertEqual(self.messager.output.getvalue(), '')
-
- def test_raw_writes_something_if_output_is_not_a_terminal(self):
- self.messager._raw_write('foo')
- self.assertEqual(self.output.getvalue(), 'foo')
-
- def test_knows_it_is_time_to_write_at_start(self):
- self.assert_(self.messager.time_to_write())
-
- def test_knows_it_is_not_time_to_write_right_after_previous_one(self):
- self.messager._last_time = self.messager._now()
- self.assertFalse(self.messager.time_to_write())
-
- def test_knows_it_is_time_to_write_after_a_period(self):
- self.messager._last_time = (
- self.messager._now() - self.messager._period * 2)
- self.assert_(self.messager.time_to_write())
-
- def test_cached_write_writes_first_thing(self):
- self.messager.write('foo')
- self.assertEqual(self.output.getvalue(), 'foo')
-
- def test_write_removes_old_message(self):
- self.messager._now = self.fast_time
- self.messager.write('foo')
- self.messager.write('bar')
- self.assertEqual(self.output.getvalue(), 'foo\r \rbar')
-
- def test_clear_removes_message(self):
- self.messager._now = lambda: self.messager._period + 1
- self.messager.write('foo')
- self.messager.clear()
- self.assertEqual(self.output.getvalue(), 'foo\r \r')
-
- def test_notify_removes_message_and_puts_it_back_afterwards(self):
- f = StringIO.StringIO()
- self.messager.write('foo')
- self.messager.notify('bar', f)
- self.assertEqual(self.output.getvalue(), 'foo\r \rfoo')
- self.assertEqual(f.getvalue(), 'bar\n')
-
- def test_notify_does_not_mind_ioerror(self):
- f = open('/dev/full', 'w')
- self.messager.write('foo')
- self.messager.notify('bar', f)
- self.assertEqual(self.output.getvalue(), 'foo\r \rfoo')
- f.close()
-
- def test_finish_flushes_unwritten_message(self):
- self.messager._now = lambda: 0
- self.messager.write('foo')
- self.messager.finish()
- self.assertEqual(self.output.getvalue(), 'foo\r \rfoo\n')
-
- def test_has_width(self):
- self.assertEqual(self.messager.width, 79)
-
- def test_write_truncates_at_one_less_than_width(self):
- self.messager.set_width(4)
- self.messager.write('foobar')
- self.assertEqual(self.output.getvalue(), 'foo')
-
- def test_disables_output(self):
- self.messager.disable()
- self.messager.write('foo')
- self.assertEqual(self.output.getvalue(), '')
-
- def test_enables_output(self):
- self.messager.disable()
- self.messager.enable()
- self.messager.write('foo')
- self.assertEqual(self.output.getvalue(), 'foo')
diff --git a/ttystatus/status.py b/ttystatus/status.py
index 0f57f74..18b3057 100644
--- a/ttystatus/status.py
+++ b/ttystatus/status.py
@@ -32,16 +32,20 @@ class TerminalStatus(object):
'''
- def __init__(self, output=None, period=None, messager=None):
- self._m = messager or ttystatus.Messager(output=output, period=period)
+ def __init__(self, period=None, messager=None):
+ self._m = messager or ttystatus.Messager(period=period)
self.clear()
def add(self, widget):
'''Add a new widget to the status display.'''
+ if not self._widget_rows:
+ self._widget_rows = [[]]
self._widget_rows[-1].append(widget)
def start_new_line(self): # pragma: no cover
'''Start a new line of widgets.'''
+ if not self._widget_rows:
+ self._widget_rows = [[]]
self._widget_rows.append([])
def format(self, format_string):
@@ -69,7 +73,7 @@ class TerminalStatus(object):
def clear(self):
'''Remove all widgets.'''
- self._widget_rows = [[]]
+ self._widget_rows = []
self._values = dict()
self._m.clear()
@@ -105,7 +109,8 @@ class TerminalStatus(object):
return '\n'.join(self._render_row(row) for row in self._widget_rows)
def _render_row(self, widget_row):
- remaining = self._m.width
+ max_chars = self._m.get_max_line_length()
+ remaining = max_chars
texts = [None] * len(widget_row)
@@ -119,7 +124,7 @@ class TerminalStatus(object):
texts[i] = w.render(remaining)
remaining -= len(texts[i])
- return (''.join(texts))[:self._m.width]
+ return (''.join(texts))[:max_chars]
def _write(self):
'''Render and output current state of all widgets.'''
diff --git a/ttystatus/status_tests.py b/ttystatus/status_tests.py
index 1cadfda..ecf91f9 100644
--- a/ttystatus/status_tests.py
+++ b/ttystatus/status_tests.py
@@ -22,11 +22,13 @@ import ttystatus
class DummyMessager(object):
- width = 80
-
def __init__(self):
self.written = StringIO.StringIO()
self.enabled = True
+ self.fake_width = 80
+
+ def get_max_line_length(self):
+ return self.fake_width
def clear(self):
pass
@@ -144,14 +146,14 @@ class TerminalStatusTests(unittest.TestCase):
self.ts.add(w1)
self.ts.add(w2)
text = self.ts._render()
- self.assertEqual(len(text), self.ts._m.width)
+ self.assertEqual(len(text), self.ts._m.get_max_line_length())
def test_renders_from_beginning_if_there_is_not_enough_space(self):
w1 = ttystatus.Literal('foo')
w2 = ttystatus.Literal('bar')
self.ts.add(w1)
self.ts.add(w2)
- self.ts._m.width = 4
+ self.ts._m.fake_width = 4
text = self.ts._render()
self.assertEqual(text, 'foob')
@@ -162,6 +164,6 @@ class TerminalStatusTests(unittest.TestCase):
self.ts.add(w1)
self.ts.add(w2)
self.ts.add(w3)
- self.ts._m.width = 9
+ self.ts._m.fake_width = 9
text = self.ts._render()
self.assertEqual(text, 'foo---bar')
diff --git a/ttystatus/tty.py b/ttystatus/tty.py
new file mode 100644
index 0000000..094cee9
--- /dev/null
+++ b/ttystatus/tty.py
@@ -0,0 +1,76 @@
+# Copyright 2010-2011,2015 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import curses
+import fcntl
+import struct
+import termios
+
+
+class PhysicalTerminal(object):
+
+ '''Handle interaction with the physical terminal.'''
+
+ def __init__(self):
+ self._terminal = None
+
+ def open_tty(self):
+ self._terminal = open('/dev/tty', 'wb')
+ curses.setupterm(None, self._terminal.fileno())
+
+ def get_up_sequence(self):
+ return curses.tparm(curses.tigetstr('cuu'), 1)
+
+ def get_down_sequence(self):
+ return curses.tparm(curses.tigetstr('cud'), 1)
+
+ def get_erase_line_sequence(self):
+ cr = curses.tigetstr('cr')
+ el = curses.tigetstr('el')
+ return cr + el
+
+ def get_width(self):
+ '''Return width of terminal in characters.
+
+ If this fails, assume 80.
+
+ Borrowed and adapted from bzrlib.
+
+ '''
+
+ width = 80
+
+ try:
+ s = struct.pack('HHHH', 0, 0, 0, 0)
+ x = fcntl.ioctl(self._terminal.fileno(), termios.TIOCGWINSZ, s)
+ width = struct.unpack('HHHH', x)[1]
+ except IOError:
+ pass
+
+ return width
+
+ def write(self, raw_data):
+ '''Write raw data to terminal.
+
+ We ignore IOErrors for terminal output.
+
+ '''
+
+ try:
+ self._terminal.write(raw_data)
+ self._terminal.flush()
+ except IOError:
+ pass
diff --git a/without-tests b/without-tests
index 24b6a56..b5e49da 100644
--- a/without-tests
+++ b/without-tests
@@ -1,5 +1,9 @@
-./ttystatus/__init__.py
-./example.py
-./ttystatus/widget.py
-./setup.py
-./doc/conf.py
+example.py
+example2.py
+ttystatus/__init__.py
+ttystatus/area.py
+ttystatus/messager.py
+ttystatus/tty.py
+ttystatus/widget.py
+setup.py
+doc/conf.py