#!/usr/bin/python # # Copyright (C) 2009, 2010, 2011 Lars Wirzenius # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. '''Run some black box tests of obnam.''' import logging import os import random import re import shutil import stat import subprocess import sys import tempfile import traceback import unittest import obnamlib class ObnamTestCase(unittest.TestCase): '''Base class for obnam test cases. We use the unittest framework even though these are black box tests, not unit tests. unittest makes implementation of these black box tests convenient, even though that might not be true for all black box tests. This base class provides a fresh environment for each test, and cleans up afterwards. It provides helpers for doing the usual backup operations, and for verifyting results. ''' # These are the keys in test-gpghome. gpgkey = '3B1802F81B321347' gpgkey2 = 'DF3D13AA11E69900' def setUp(self): self.client_name = 'client_name' self.tempdir = tempfile.mkdtemp() self.data_dir = self.mkdir('data') if os.environ.get('OBNAM_TEST_SFTP_ROOT') == 'yes': self.data = 'sftp://localhost%s' % self.data_dir self.round_time = lambda t: float(int(t)) else: self.data = self.data_dir self.round_time = lambda t: t self.repo_dir = self.mkdir('repo') if os.environ.get('OBNAM_TEST_SFTP_REPOSITORY') == 'yes': self.repo = 'sftp://localhost%s' % self.repo_dir else: self.repo = self.repo_dir self.restored_dir = self.mkdir('restored') # restored_dir and url are always the same, since we can't do proper # restoreds over SFTP, so we always do them to the local filesystem. # This is because SFTP fails to support hardlink creation, mknod, # and perhaps other stuff. self.restored = self.restored_dir self.gpghome = os.path.join(self.tempdir, 'gpghome') shutil.copytree('test-gpghome', self.gpghome) self.setUpHook() def setUpHook(self): pass def tearDown(self): self.tearDownHook() shutil.rmtree(self.tempdir) def tearDownHook(self): pass def mkdir(self, dirname): abs_dirname = os.path.join(self.tempdir, dirname) os.makedirs(abs_dirname) return abs_dirname def runcmd(self, argv, stdin='', stderr_ignore=None): '''Run an external command. If the command fails (non-zero exit), raise an exception. If stderr_ignore is not None, it must be a string with a regexp for lines in stderr to ignore. ''' logging.debug('executing %s' % argv) env = dict(os.environ) env['GNUPGHOME'] = self.gpghome p = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) stdout, stderr = p.communicate(stdin) if stderr_ignore: lines = [line for line in stderr.splitlines() if not re.match(stderr_ignore, line)] stderr = ''.join(lines) sys.stderr.write(stderr) if p.returncode != 0: raise subprocess.CalledProcessError(p.returncode, argv) return stdout def obnam(self, args, stderr_ignore=None): '''Run obnam, adding default options to args.''' return self.runcmd(['./obnam', '--quiet', '--log', 'blackboxtest-obnam.log', '--log-level', 'debug', '--trace', 'repo', '--trace', 'vfs', '--encrypt-with', self.gpgkey, '--weak-random', '--client-name', self.client_name] + args, stderr_ignore=stderr_ignore) def backup(self, roots=None, extraopts=[]): '''Back up the data directory to the repository.''' if roots is None: roots = [self.data] logging.debug('backing up %s to %s' % (roots, self.repo)) self.obnam(['--repository', self.repo, 'backup'] + extraopts + roots) self.obnam(['fsck', '--repository', self.repo]) def restore(self, pathnames=None, stderr_ignore=None): '''Restore the newest generation from a repository. Data is restored to self.restored. ''' logging.debug('restoring %s to %s' % (self.repo, self.restored)) self.obnam(['restore', '--to', self.restored, '--generation=latest', '--repository', self.repo] + (pathnames or []), stderr_ignore=stderr_ignore) def verify(self, stderr_ignore=None): '''Run 'obnam verify' on the repository.''' self.obnam(['verify', '--root', self.data, '--repository', self.repo], stderr_ignore=stderr_ignore) def generations(self): '''Return all generation ids.''' logging.debug('Looking up generations in %s' % self.repo) stdout = self.obnam(['generations', '--repository', self.repo]) return [line.split()[0] for line in stdout.splitlines()] def forget(self, genids=None, keep=None, pretend=False): '''Forget specific generations.''' logging.debug('Forgetting in %s' % self.repo) logging.debug('genids: %s' % genids) logging.debug('keep: %s' % keep) args = [] if genids is not None: args += genids if keep is not None: args += ['--keep', keep] if pretend: args += ['--pretend'] self.obnam(['forget', '--repository', self.repo] + args) def create_file(self, dirname, relative, contents): '''Create a new file with the desired contents.''' pathname = os.path.join(dirname, relative) logging.debug('creating file %s' % pathname) f = open(pathname, 'w') f.write(contents) f.close() return pathname def remove_file(self, root, relative): '''Remove a file.''' pathname = os.path.join(root, relative) logging.debug('removing file %s' % pathname) os.remove(pathname) def create_dir(self, root, pathname): '''Create a new directory, return name.''' fullname = os.path.join(root, pathname) logging.debug('mkdir %s' % fullname) os.makedirs(fullname) return fullname def get_info(self, root, pathname): '''Get the information about a given file. Return a tuple (relativepath, stat) where relativepath is the path relative to root, and stat is the result of os.lstat. ''' root_base = os.path.basename(root) del_prefix = root[:-len(root_base)] if pathname == root: return None assert pathname.startswith(root + os.sep), (pathname, root) return pathname[len(root + os.sep):], os.lstat(pathname) def find_everything(self, root): '''Find all filesystem objects inside a directory tree. Return list of (pathname, stat) tuples. The pathname will be relative to the root of the directory tree. The stat tuples will be the result of os.lstat for each pathname. ''' result = [] for dirname, dirnames, filenames in os.walk(root): result.append(self.get_info(root, dirname)) for filename in filenames: pathname = os.path.join(dirname, filename) result.append(self.get_info(root, pathname)) return [x for x in result if x] def open_repository(self): '''Open the repository.''' fs = obnamlib.LocalFS(self.repo_dir) s = obnamlib.Repository(fs, obnamlib.DEFAULT_NODE_SIZE, obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE, None, obnamlib.IDPATH_DEPTH, obnamlib.IDPATH_BITS, obnamlib.IDPATH_SKIP) s.open_client(self.client_name) return s def assert_equal_stat_fields(self, filename, stat1, stat2, fieldname): field1 = getattr(stat1, fieldname) field2 = getattr(stat2, fieldname) msg = ('%s stat field %s difference: %s vs %s' % (filename, fieldname, repr(field1), repr(field2))) if type(field1) == float: field1 = self.round_time(field1) field2 = self.round_time(field2) self.assertAlmostEqual(field1, field2, places=5, msg=msg) else: self.assertEqual(field1, field2, msg=msg) def assert_same_stat(self, name, stat1, stat2): '''Are two stat results effectively identical?''' class Fake(object): def __init__(self, stat_result): self.st = stat_result def __getattr__(self, name): return getattr(self.st, name) self.assert_equal_stat_fields(name, stat1, stat2, 'st_blocks') self.assert_equal_stat_fields(name, stat1, stat2, 'st_gid') self.assert_equal_stat_fields(name, stat1, stat2, 'st_mode') self.assert_equal_stat_fields(name, Fake(stat1), Fake(stat2), 'st_mtime') if self.data == self.data_dir: # We can only check this when accessing the live data via the # local filesystem. SFTP (or paramiko) does not return st_nlink, # so we fake it, so this test always fails for them. self.assert_equal_stat_fields(name, stat1, stat2, 'st_nlink') self.assert_equal_stat_fields(name, stat1, stat2, 'st_size') self.assert_equal_stat_fields(name, stat1, stat2, 'st_uid') def assert_same_contents(self, relative, root1, root2): '''Verify that file contents has been restored correctly.''' path1 = os.path.join(root1, relative) path2 = os.path.join(root2, relative) self.assertFilesEqual(path1, path2) def assertFileExists(self, path): self.assert_(os.path.exists(path), '%s does not exist' % path) def assertIsRegularFile(self, path): self.assert_(os.path.isfile(path), '%s is not a regular file' % path) def assertFilesEqual(self, path1, path2): '''Verify that file contents are equal.''' self.assertFileExists(path1) self.assertFileExists(path2) self.assertIsRegularFile(path1) self.assertIsRegularFile(path2) f1 = open(path1, 'r') f2 = open(path2, 'r') data1 = f1.read() data2 = f2.read() f1.close() f2.close() self.assertEqual(data1, data2, 'contents of %s and %s differ' % (path1, path2)) def report_contents(self, dirname, contents): logging.debug('Contents of %s:' % dirname) for name in sorted(contents.keys()): logging.debug(' %s' % name) def assert_restored_correctly(self): '''Verify that a restored directory is identical to the original. Raise an exception for the first error found. We need to play a little trick. When we backup /foo/bar/foobar, the backup will contain entries for /, /foo, and /foo/bar, and when we restore the backup, it will put foobar into $TARGET/foo/bar/foobar. Thus, to verify data, we need to append data to restored. ''' origs = dict(self.find_everything(self.data_dir)) basename = os.path.basename(self.data_dir) restored2 = os.path.join(self.restored_dir, './' + self.data_dir) restoreds = dict(self.find_everything(restored2)) for name, orig_stat in origs.iteritems(): if name not in restoreds: logging.error('file %s not in restored data' % name) self.report_contents(self.data_dir, origs) self.report_contents(self.restored_dir, restoreds) raise Exception('%s not in restored data' % name) restored_stat = restoreds[name] self.assert_same_stat(name, orig_stat, restored_stat) if stat.S_ISREG(orig_stat.st_mode): self.assert_same_contents(name, self.data_dir, restored2) for name, restored_stat in restoreds.iteritems(): if name not in origs: logging.error('spurious file %s in restored data' % name) self.report_contents(self.data_dir, origs) self.report_contents(self.restored_dir, restoreds) raise cliapp.AppException('spurious %s in restored data' % name) def remove_encryption_metadata(self, parent, subdir): for x in ['key', 'userkeys']: pathname = os.path.join(parent, subdir, x) if os.path.exists(pathname): os.remove(pathname) class RestoreTests(ObnamTestCase): def setUpHook(self): self.create_file(self.data_dir, 'foo', 'foo') self.create_file(self.data_dir, 'bar', 'bar') self.backup() def test_restores_identical_data(self): self.restore() self.assert_restored_correctly() def test_restores_individual_file(self): bar_orig = os.path.join(self.data_dir, 'bar') self.restore([bar_orig]) restored_files = [os.path.join(self.restored_dir, x) for x, y in self.find_everything(self.restored_dir) if stat.S_ISREG(y.st_mode)] bar_restored = os.path.join(self.restored_dir, './' + bar_orig) bar_restored = os.path.normpath(bar_restored) self.assertEqual(restored_files, [bar_restored]) self.assertFilesEqual(bar_orig, bar_restored) def test_restores_sparse_file(self): pathname = os.path.join(self.data_dir, 'sparse') f = open(pathname, 'wb') f.seek(1000**2) f.write('x') f.close() self.backup() self.restore() restored2 = os.path.join(self.restored_dir, './' + self.data_dir) self.assert_same_contents('sparse', self.data_dir, restored2) def test_restores_pipe(self): pathname = os.path.join(self.data_dir, 'pipe') os.mknod(pathname, 0600 | stat.S_IFIFO) self.backup() self.restore() self.assert_restored_correctly() def test_restores_socket(self): pathname = os.path.join(self.data_dir, 'socket') os.mknod(pathname, 0600 | stat.S_IFSOCK) self.backup() self.restore() self.assert_restored_correctly() def _mangle_chunks(self): chunkdir = os.path.join(self.repo_dir, 'chunks') for dirname, subdirs, basenames in os.walk(chunkdir): basenames = [x for x in basenames if x != 'key' and x != 'userkeys'] for chunk in [os.path.join(dirname, x) for x in basenames]: os.remove(chunk) self.runcmd(['gpg', '-e', '-r', self.gpgkey, '-o', chunk], stdin='meh', stderr_ignore='.*') def test_fails_if_chunk_is_corrupted(self): self._mangle_chunks() self.assertRaises(subprocess.CalledProcessError, self.restore, stderr_ignore='.*') def test_fails_if_file_checksum_is_incorrect(self): # Remove the chunk checksum list, and then modify the chunks, # so that we can do a restore without triggering "bad chunk checksum" # errors. We only want to trigger the whole-file checksum error. shutil.rmtree(os.path.join(self.repo_dir, 'chunklist', 'nodes')) shutil.rmtree(os.path.join(self.repo_dir, 'chunklist', 'refcounts')) os.remove(os.path.join(self.repo_dir, 'chunklist', 'metadata')) self._mangle_chunks() self.assertRaises(subprocess.CalledProcessError, self.restore, stderr_ignore='^ERROR: There were errors.*') class BackupTests(ObnamTestCase): def test_makes_two_generations(self): self.create_file(self.data_dir, 'foo', 'foo') self.backup() self.create_file(self.data_dir, 'bar', 'bar') self.remove_file(self.data_dir, 'foo') self.backup() self.restore() self.assert_restored_correctly() def test_handles_two_roots(self): root1 = self.create_dir(self.data_dir, 'root1') root2 = self.create_dir(self.data_dir, 'root2') self.create_file(root1, 'file1', 'content1') self.create_file(root2, 'file2', 'content2') self.backup([root1, root2]) self.restore() self.assert_restored_correctly() def test_handles_symlink(self): self.create_file(self.data_dir, 'target', 'content1') os.symlink('target', os.path.join(self.data_dir, 'symlink')) self.backup() self.restore() self.assert_restored_correctly() def test_handles_dangling_symlink(self): os.symlink('target', os.path.join(self.data_dir, 'symlink')) self.backup() self.restore() self.assert_restored_correctly() def test_handles_hardlink(self): self.create_file(self.data_dir, 'target', 'content1') os.link(os.path.join(self.data_dir, 'target'), os.path.join(self.data_dir, 'hardlink')) self.backup() self.restore() self.assert_restored_correctly() def test_does_not_include_roots_from_old_gens_unless_specified_again(self): foo = self.create_dir(self.data_dir, 'foo') self.backup(roots=[foo]) bar = self.create_dir(self.data_dir, 'bar') self.backup(roots=[bar]) self.restore() parent = os.path.join(self.restored_dir, './' + self.data_dir) self.assertEqual(os.listdir(parent), ['bar']) def test_excludes_cache_directory(self): cachedir = self.mkdir(os.path.join(self.data_dir, 'cache')) self.create_file(cachedir, 'CACHEDIR.TAG', 'Signature: 8a477f597d28d172789f06886806bc55') self.create_file(cachedir, 'foo', 'foo') self.backup(extraopts=['--exclude-caches']) self.restore() x = os.path.join(self.restored_dir, './' + cachedir) self.assertFalse(os.path.exists(os.path.join(x, 'CACHEDIR.TAG'))) self.assertFalse(os.path.exists(os.path.join(x, 'foo'))) def test_makes_repository_files_have_correct_perms(self): self.create_file(self.data_dir, 'foo', 'foo') self.backup() for path, st in self.find_everything(self.repo_dir): perms = stat.S_IMODE(st.st_mode) if stat.S_ISREG(st.st_mode): self.assertEqual(perms, stat.S_IRUSR, 'file perms must be readonly for %s not %o' % (path, perms)) def test_skips_unreadable_directory_but_backs_up_rest(self): self.create_file(self.data_dir, 'aaa', 'aaa') bbb = os.path.join(self.data_dir, 'bbb') os.mkdir(bbb, 0) self.create_file(self.data_dir, 'ccc', 'ccc') self.backup() self.restore() # Remove the problematic directory so that verify works. # Don't do this if running as root, since in that case # obnam _can_ back it up. (Yes, this is convoluted.) if os.getuid() != 0: os.rmdir(bbb) self.assert_restored_correctly() def test_skips_unreadable_file_but_backs_up_rest(self): self.create_file(self.data_dir, 'aaa', 'aaa') bbb = self.create_file(self.data_dir, 'bbb', 'bbb') os.chmod(bbb, 0) self.create_file(self.data_dir, 'ccc', 'ccc') self.backup() self.restore() # Remove the problematic directory so that verify works. # Don't do this if running as root, since in that case # obnam _can_ back it up. (Yes, this is convoluted.) if os.getuid() != 0: os.remove(bbb) self.assert_restored_correctly() class VerifyTests(ObnamTestCase): def test_accepts_unchanged_backup(self): self.create_file(self.data_dir, 'foo', 'foo') self.create_file(self.data_dir, 'bar', 'bar') self.backup() self.verify() def test_notices_changed_data(self): self.create_file(self.data_dir, 'foo', 'foo') self.create_file(self.data_dir, 'bar', 'bar') self.backup() self.remove_file(self.data_dir, 'foo') self.remove_file(self.data_dir, 'bar') self.create_file(self.data_dir, 'foo', 'changed data') self.assertRaises(subprocess.CalledProcessError, self.verify, r'^Error: verify failure') class ForgetTests(ObnamTestCase): def setUpHook(self): self.create_file(self.data_dir, 'foo', 'foo') self.create_file(self.data_dir, 'bar', 'bar') self.backup() self.backup() def test_removes_nothing_when_given_no_args(self): genids = self.generations() self.forget() self.assertEqual(genids, self.generations()) def test_removes_specified_generations(self): genids = self.generations() self.forget(genids=genids[:1]) self.assertEqual(genids[1:], self.generations()) def test_removes_according_to_policy(self): genids = self.generations() self.forget(keep='1d') self.assertEqual(genids[1:], self.generations()) def test_pretends(self): genids = self.generations() self.forget(keep='1d', pretend=True) self.assertEqual(genids, self.generations()) def disk_usage(self, root): usage = 0 fs = obnamlib.LocalFS(root) for pathname, st in fs.scan_tree('.'): if stat.S_ISREG(st.st_mode): usage += st.st_blocks * 512 return usage def random_string(self, size): return ''.join(chr(random.randint(0, 255)) for i in xrange(size)) def test_removes_unwanted_data(self): self.create_file(self.data_dir, 'big', self.random_string(1024**2)) self.backup() self.forget(genids=self.generations()) self.remove_encryption_metadata(self.repo_dir, 'chunks') chunks = os.path.join(self.repo_dir, 'chunks') self.assertEqual(self.disk_usage(chunks), 0) def test_removes_unwanted_data_with_empty_generation_remaining(self): self.create_file(self.data_dir, 'big', self.random_string(1024**2)) self.backup() shutil.rmtree(self.data_dir) os.mkdir(self.data_dir) self.backup() genids = self.generations() forgettable = genids[:-1] self.forget(genids=forgettable) self.remove_encryption_metadata(self.repo_dir, 'chunks') chunks = os.path.join(self.repo_dir, 'chunks') self.assertEqual(self.disk_usage(chunks), 0) class EncryptionTests(ObnamTestCase): def setUpHook(self): self.create_file(self.data_dir, 'foo', 'foo') self.backup() def client_keys(self): output = self.obnam(['client-keys', '--repository', self.repo]) return [tuple(line.split()) for line in output.splitlines()] def list_keys(self): output = self.obnam(['list-keys', '--repository', self.repo]) keys = dict() latest_key = None for line in output.splitlines(): if line.startswith('key:'): latest_key = line.split()[1] else: keys[latest_key] = keys.get(latest_key, []) + [line.strip()] for key in keys: keys[key].sort() return keys def test_has_client_key_after_backup(self): self.assertEqual(self.client_keys(), [(self.client_name, self.gpgkey)]) def test_removes_client(self): self.obnam(['remove-client', '--repository', self.repo, self.client_name]) self.assertEqual(self.client_keys(), []) def test_only_client_key_listed_initially(self): self.assertEqual(self.list_keys().keys(), [self.gpgkey]) def test_adds_key(self): self.obnam(['add-key', '--keyid', self.gpgkey2, '--repository', self.repo]) self.assertEqual(sorted(self.list_keys().keys()), sorted([self.gpgkey, self.gpgkey2])) def test_removes_key(self): self.obnam(['add-key', '--keyid', self.gpgkey2, '--repository', self.repo]) self.obnam(['remove-key', '--keyid', self.gpgkey2, '--repository', self.repo]) self.assertEqual(self.list_keys().keys(), [self.gpgkey]) if __name__ == '__main__': logging.basicConfig(filename='blackboxtest.log', level=logging.DEBUG, format='%(levelname)s: %(message)s') unittest.main()