#!/usr/bin/python # # Copyright (C) 2009, 2010 Lars Wirzenius # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. '''Run some black box tests of obnam.''' import logging import os import random import re import shutil import stat import subprocess import sys import tempfile import traceback import unittest import obnamlib class ObnamTestCase(unittest.TestCase): '''Base class for obnam test cases. We use the unittest framework even though these are black box tests, not unit tests. unittest makes implementation of these black box tests convenient, even though that might not be true for all black box tests. This base class provides a fresh environment for each test, and cleans up afterwards. It provides helpers for doing the usual backup operations, and for verifyting results. ''' # These are the keys in test-gpghome. gpgkey = '3B1802F81B321347' gpgkey2 = 'DF3D13AA11E69900' def setUp(self): self.client_name = 'client_name' self.tempdir = tempfile.mkdtemp() self.data = self.mkdir('data') self.repo = self.mkdir('repo') self.restored = self.mkdir('restored') self.gpghome = os.path.join(self.tempdir, 'gpghome') shutil.copytree('test-gpghome', self.gpghome) self.setUpHook() def setUpHook(self): pass def tearDown(self): self.tearDownHook() shutil.rmtree(self.tempdir) def tearDownHook(self): pass def mkdir(self, dirname): abs_dirname = os.path.join(self.tempdir, dirname) os.makedirs(abs_dirname) return abs_dirname def runcmd(self, argv, stderr_ignore=None): '''Run an external command. If the command fails (non-zero exit), raise an exception. If stderr_ignore is not None, it must be a string with a regexp for lines in stderr to ignore. ''' logging.debug('executing %s' % argv) env = dict(os.environ) env['GNUPGHOME'] = self.gpghome p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) stdout, stderr = p.communicate() if stderr_ignore: lines = [line for line in stderr.splitlines() if not re.match(stderr_ignore, line)] stderr = ''.join(lines) sys.stderr.write(stderr) if p.returncode != 0: raise subprocess.CalledProcessError(p.returncode, argv) return stdout def obnam(self, args, stderr_ignore=None): '''Run obnam, adding default options to args.''' return self.runcmd(['./obnam', '--quiet', '--log', 'blackboxtest-obnam.log', '--log-level', 'debug', '--encrypt-with', self.gpgkey, '--weak-random', '--client-name', self.client_name] + args, stderr_ignore=stderr_ignore) def backup(self, roots=None): '''Back up the data directory to the repository.''' if roots is None: roots = [self.data] logging.debug('backing up %s to %s' % (roots, self.repo)) self.obnam(['--repository', self.repo, 'backup'] + roots) self.obnam(['fsck', '--repository', self.repo]) def restore(self, pathnames=None): '''Restore the newest generation from a repository. Data is restored to self.restored. ''' logging.debug('restoring %s to %s' % (self.repo, self.restored)) self.obnam(['restore', '--to', self.restored, '--generation=latest', '--repository', self.repo] + (pathnames or [])) def verify(self, stderr_ignore=None): '''Run 'obnam verify' on the repository.''' self.obnam(['verify', '--root', self.data, '--repository', self.repo], stderr_ignore=stderr_ignore) def generations(self): '''Return all generation ids.''' logging.debug('Looking up generations in %s' % self.repo) stdout = self.obnam(['generations', '--repository', self.repo]) return [line.split()[0] for line in stdout.splitlines()] def forget(self, genids=None, keep=None, pretend=False): '''Forget specific generations.''' logging.debug('Forgetting in %s' % self.repo) logging.debug('genids: %s' % genids) logging.debug('keep: %s' % keep) args = [] if genids is not None: args += genids if keep is not None: args += ['--keep', keep] if pretend: args += ['--pretend'] self.obnam(['forget', '--repository', self.repo] + args) def create_file(self, dirname, relative, contents): '''Create a new file with the desired contents.''' pathname = os.path.join(dirname, relative) logging.debug('creating file %s' % pathname) f = open(pathname, 'w') f.write(contents) f.close() def remove_file(self, root, relative): '''Remove a file.''' pathname = os.path.join(root, relative) logging.debug('removing file %s' % pathname) os.remove(pathname) def create_dir(self, root, pathname): '''Create a new directory, return name.''' fullname = os.path.join(root, pathname) logging.debug('mkdir %s' % fullname) os.makedirs(fullname) return fullname def get_info(self, root, pathname): '''Get the information about a given file. Return a tuple (relativepath, stat) where relativepath is the path relative to root, and stat is the result of os.lstat. ''' root_base = os.path.basename(root) del_prefix = root[:-len(root_base)] if pathname == root: return None assert pathname.startswith(root + os.sep), (pathname, root) return pathname[len(root + os.sep):], os.lstat(pathname) def find_everything(self, root): '''Find all filesystem objects inside a directory tree. Return list of (pathname, stat) tuples. The pathname will be relative to the root of the directory tree. The stat tuples will be the result of os.lstat for each pathname. ''' result = [] for dirname, dirnames, filenames in os.walk(root): result.append(self.get_info(root, dirname)) for filename in filenames: pathname = os.path.join(dirname, filename) result.append(self.get_info(root, pathname)) return [x for x in result if x] def open_repository(self): '''Open the repository.''' fs = obnamlib.LocalFS(self.repo) s = obnamlib.Repository(fs, obnamlib.DEFAULT_NODE_SIZE, obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE, obnamlib.DEFAULT_LRU_SIZE, None) s.open_client(self.client_name) return s def assert_equal_stat_fields(self, filename, stat1, stat2, fieldname): field1 = getattr(stat1, fieldname) field2 = getattr(stat2, fieldname) self.assertEqual(field1, field2, '%s stat field %s difference: %s vs %s' % (filename, fieldname, repr(field1), repr(field2))) def assert_same_stat(self, name, stat1, stat2): '''Are two stat results effectively identical?''' class Fake(object): def __init__(self, stat_result): self.st = stat_result def __getattr__(self, name): if name == 'st_mtime': return int(getattr(self.st, name)) else: return getattr(self.st, name) self.assert_equal_stat_fields(name, stat1, stat2, 'st_blocks') self.assert_equal_stat_fields(name, stat1, stat2, 'st_gid') self.assert_equal_stat_fields(name, stat1, stat2, 'st_mode') self.assert_equal_stat_fields(name, Fake(stat1), Fake(stat2), 'st_mtime') self.assert_equal_stat_fields(name, stat1, stat2, 'st_nlink') self.assert_equal_stat_fields(name, stat1, stat2, 'st_size') self.assert_equal_stat_fields(name, stat1, stat2, 'st_uid') def assert_same_contents(self, relative, root1, root2): '''Verify that file contents has been restored correctly.''' path1 = os.path.join(root1, relative) path2 = os.path.join(root2, relative) self.assertFilesEqual(path1, path2) def assertFileExists(self, path): self.assert_(os.path.exists(path), '%s does not exist' % path) def assertIsRegularFile(self, path): self.assert_(os.path.isfile(path), '%s is not a regular file' % path) def assertFilesEqual(self, path1, path2): '''Verify that file contents are equal.''' self.assertFileExists(path1) self.assertFileExists(path2) self.assertIsRegularFile(path1) self.assertIsRegularFile(path2) f1 = open(path1, 'r') f2 = open(path2, 'r') data1 = f1.read() data2 = f2.read() f1.close() f2.close() self.assertEqual(data1, data2, 'contents of %s and %s differ' % (path1, path2)) def report_contents(self, dirname, contents): logging.debug('Contents of %s:' % dirname) for name in sorted(contents.keys()): logging.debug(' %s' % name) def assert_restored_correctly(self): '''Verify that a restored directory is identical to the original. Raise an exception for the first error found. We need to play a little trick. When we backup /foo/bar/foobar, the backup will contain entries for /, /foo, and /foo/bar, and when we restore the backup, it will put foobar into $TARGET/foo/bar/foobar. Thus, to verify data, we need to append data to restored. ''' origs = dict(self.find_everything(self.data)) basename = os.path.basename(self.data) restored2 = os.path.join(self.restored, './' + self.data) restoreds = dict(self.find_everything(restored2)) for name, orig_stat in origs.iteritems(): if name not in restoreds: logging.error('file %s not in restored data' % name) self.report_contents(self.data, origs) self.report_contents(self.restored, restoreds) raise Exception('%s not in restored data' % name) restored_stat = restoreds[name] self.assert_same_stat(name, orig_stat, restored_stat) if stat.S_ISREG(orig_stat.st_mode): self.assert_same_contents(name, self.data, restored2) for name, restored_stat in restoreds.iteritems(): if name not in origs: logging.error('spurious file %s in restored data' % name) self.report_contents(self.data, origs) self.report_contents(self.restored, restoreds) raise Exception('spurious %s in restored data' % name) def remove_encryption_metadata(self, parent, subdir): for x in ['key', 'userkeys']: pathname = os.path.join(parent, subdir, x) if os.path.exists(pathname): os.remove(pathname) class RestoreTests(ObnamTestCase): def setUpHook(self): self.create_file(self.data, 'foo', 'foo') self.create_file(self.data, 'bar', 'bar') self.backup() def test_restores_identical_data(self): self.restore() self.assert_restored_correctly() def test_restores_individual_file(self): bar_orig = os.path.join(self.data, 'bar') self.restore([bar_orig]) restored_files = [os.path.join(self.restored, x) for x, y in self.find_everything(self.restored) if stat.S_ISREG(y.st_mode)] bar_restored = os.path.join(self.restored, './' + bar_orig) bar_restored = os.path.normpath(bar_restored) self.assertEqual(restored_files, [bar_restored]) self.assertFilesEqual(bar_orig, bar_restored) class BackupTests(ObnamTestCase): def test_makes_two_generations(self): self.create_file(self.data, 'foo', 'foo') self.backup() self.create_file(self.data, 'bar', 'bar') self.remove_file(self.data, 'foo') self.backup() self.restore() self.assert_restored_correctly() def test_handles_two_roots(self): root1 = self.create_dir(self.data, 'root1') root2 = self.create_dir(self.data, 'root2') self.create_file(root1, 'file1', 'content1') self.create_file(root2, 'file2', 'content2') self.backup([root1, root2]) self.restore() self.assert_restored_correctly() def test_handles_symlink(self): self.create_file(self.data, 'target', 'content1') os.symlink('target', os.path.join(self.data, 'symlink')) self.backup() self.restore() self.assert_restored_correctly() def test_handles_hardlink(self): self.create_file(self.data, 'target', 'content1') os.link(os.path.join(self.data, 'target'), os.path.join(self.data, 'hardlink')) self.backup() self.restore() self.assert_restored_correctly() def test_does_not_include_roots_from_old_gens_unless_specified_again(self): foo = self.create_dir(self.data, 'foo') self.backup(roots=[foo]) bar = self.create_dir(self.data, 'bar') self.backup(roots=[bar]) self.restore() parent = os.path.join(self.restored, './' + self.data) self.assertEqual(os.listdir(parent), ['bar']) class VerifyTests(ObnamTestCase): def test_accepts_unchanged_backup(self): self.create_file(self.data, 'foo', 'foo') self.create_file(self.data, 'bar', 'bar') self.backup() self.verify() def test_notices_changed_data(self): self.create_file(self.data, 'foo', 'foo') self.create_file(self.data, 'bar', 'bar') self.backup() self.remove_file(self.data, 'foo') self.remove_file(self.data, 'bar') self.create_file(self.data, 'foo', 'changed data') self.assertRaises(subprocess.CalledProcessError, self.verify, r'^Error: verify failure') class ForgetTests(ObnamTestCase): def setUpHook(self): self.create_file(self.data, 'foo', 'foo') self.create_file(self.data, 'bar', 'bar') self.backup() self.backup() def test_removes_nothing_when_given_no_args(self): genids = self.generations() self.forget() self.assertEqual(genids, self.generations()) def test_removes_specified_generations(self): genids = self.generations() self.forget(genids=genids[:1]) self.assertEqual(genids[1:], self.generations()) def test_removes_according_to_policy(self): genids = self.generations() self.forget(keep='1d') self.assertEqual(genids[1:], self.generations()) def test_pretends(self): genids = self.generations() self.forget(keep='1d', pretend=True) self.assertEqual(genids, self.generations()) def disk_usage(self, root): usage = 0 fs = obnamlib.LocalFS(root) for dirname, subdirs, filenames in fs.depth_first('.'): for filename in filenames: st = fs.lstat(os.path.join(dirname, filename)) usage += st.st_blocks * 512 return usage def random_string(self, size): return ''.join(chr(random.randint(0, 255)) for i in xrange(size)) def test_removes_unwanted_data(self): self.create_file(self.data, 'big', self.random_string(1024**2)) self.backup() self.forget(genids=self.generations()) self.remove_encryption_metadata(self.repo, 'chunks') self.assertEqual(self.disk_usage(os.path.join(self.repo, 'chunks')), 0) def test_removes_unwanted_data_with_empty_generation_remaining(self): self.create_file(self.data, 'big', self.random_string(1024**2)) self.backup() shutil.rmtree(self.data) os.mkdir(self.data) self.backup() genids = self.generations() forgettable = genids[:-1] self.forget(genids=forgettable) self.remove_encryption_metadata(self.repo, 'chunks') chunks = os.path.join(self.repo, 'chunks') self.assertEqual(self.disk_usage(chunks), 0) class EncryptionTests(ObnamTestCase): def setUpHook(self): self.create_file(self.data, 'foo', 'foo') self.backup() def client_keys(self): output = self.obnam(['client-keys', '--repository', self.repo]) return [tuple(line.split()) for line in output.splitlines()] def list_keys(self): output = self.obnam(['list-keys', '--repository', self.repo]) keys = dict() latest_key = None for line in output.splitlines(): if line.startswith('key:'): latest_key = line.split()[1] else: keys[latest_key] = keys.get(latest_key, []) + [line.strip()] for key in keys: keys[key].sort() return keys def test_has_client_key_after_backup(self): self.assertEqual(self.client_keys(), [(self.client_name, self.gpgkey)]) def test_removes_client(self): self.obnam(['remove-client', '--repository', self.repo, self.client_name]) self.assertEqual(self.client_keys(), []) def test_only_client_key_listed_initially(self): self.assertEqual(self.list_keys().keys(), [self.gpgkey]) def test_adds_key(self): self.obnam(['add-key', '--keyid', self.gpgkey2, '--repository', self.repo]) self.assertEqual(sorted(self.list_keys().keys()), sorted([self.gpgkey, self.gpgkey2])) def test_removes_key(self): self.obnam(['add-key', '--keyid', self.gpgkey2, '--repository', self.repo]) self.obnam(['remove-key', '--keyid', self.gpgkey2, '--repository', self.repo]) self.assertEqual(self.list_keys().keys(), [self.gpgkey]) if __name__ == '__main__': logging.basicConfig(filename='blackboxtest.log', level=logging.DEBUG, format='%(levelname)s: %(message)s') unittest.main()