#!/usr/bin/python # # Copyright (C) 2009, 2010 Lars Wirzenius # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """Run some black box tests of obnam.""" import logging import os import re import shutil import subprocess import sys import tempfile import traceback import obnamlib class BlackBoxTest(object): """Base class for black box tests of Obnam.""" def __init__(self): self.temproot = tempfile.mkdtemp() self.hostid = "hostid" logging.debug('temproot = %s' % self.temproot) logging.debug('hostid = %s' % self.hostid) def tempdir(self): """Create a new temporary directory, create its name. The clean() method will remove this directory. """ dirname = tempfile.mkdtemp(dir=self.temproot) logging.debug('created directory %s' % dirname) return dirname def clean(self): """Remove temporary files.""" logging.debug('removing temproot %s' % self.temproot) shutil.rmtree(self.temproot) def runcmd(self, argv, stderr_ignore=None): """Run an external command. If the command fails (non-zero exit), raise an exception. """ logging.debug('executing %s' % argv) p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if stderr_ignore: lines = [line for line in stderr.splitlines() if not re.match(stderr_ignore, line)] stderr = ''.join(lines) sys.stderr.write(stderr) if p.returncode != 0: raise subprocess.CalledProcessError(p.returncode, argv) return stdout def obnam(self, args, stderr_ignore=None): '''Run obnam, adding default options to args.''' return self.runcmd(['./obnam', '--quiet', '--log', 'blackboxtest-obnam.log', '--hostname', self.hostid] + args, stderr_ignore=stderr_ignore) def create_file(self, root, relative, contents): """Create a new file with the desired contents.""" pathname = os.path.join(root, relative) logging.debug('creating file %s' % pathname) f = file(pathname, "w") f.write(contents) f.close() def remove_file(self, root, relative): """Remove a file.""" pathname = os.path.join(root, relative) logging.debug('removing file %s' % pathname) os.remove(pathname) def create_dir(self, root, pathname): """Create a new directory, return name.""" fullname = os.path.join(root, pathname) logging.debug('mkdir %s' % fullname) os.makedirs(fullname) return fullname def get_info(self, root, pathname): """Get the information about a given file. Return a tuple (relativepath, stat) where relativepath is the path relative to root, and stat is the result of os.lstat. """ root_base = os.path.basename(root) del_prefix = root[:-len(root_base)] if pathname == root: return None assert pathname.startswith(root + os.sep), (pathname, root) return pathname[len(root + os.sep):], os.lstat(pathname) def find_everything(self, root): """Find all filesystem objects inside a directory tree. Return list of (pathname, stat) tuples. The pathname will be relative to the root of the directory tree. The stat tuples will be the result of os.lstat for each pathname. """ result = [] for dirname, dirnames, filenames in os.walk(root): result.append(self.get_info(root, dirname)) for filename in filenames: pathname = os.path.join(dirname, filename) result.append(self.get_info(root, pathname)) return [x for x in result if x] def assertWorks(self, func, *args, **kwargs): '''Make sure calling function/method does not raise exceptions.''' try: func(*args, **kwargs) except Exception, e: raise Exception('Calling %s raised exception' % repr(func)) def assertRaises(self, exc, func, *args, **kwargs): '''Make sure calling function/method does raise an exception.''' try: func(*args, **kwargs) except exc: pass except BaseException, e: raise Exception('Calling %s raised wrong exception %s' % (repr(func), repr(e))) else: raise Exception('Calling %s did not raise exception' % repr(func)) def assert_(self, name, condition, message): """Raise an exception if a condition is not met.""" if not condition: raise Exception("%s: %s" % (name, message)) def assertEqual(self, name, value1, value2, message): """Raise an exception if two values are not equal.""" if value1 != value2: raise Exception("%s: %s: %s != %s" % (name, message, repr(value1), repr(value2))) def assert_same_stat(self, name, stat1, stat2): """Are two stat results effectively identical?""" self.assertEqual(name, stat1.st_blocks, stat2.st_blocks, "blocks") self.assertEqual(name, stat1.st_gid, stat2.st_gid, "gid") self.assertEqual(name, stat1.st_mode, stat2.st_mode, "mode") self.assertEqual(name, int(stat1.st_mtime), int(stat2.st_mtime), "mtime") self.assertEqual(name, stat1.st_nlink, stat2.st_nlink, "nlink") self.assertEqual(name, stat1.st_size, stat2.st_size, "size") self.assertEqual(name, stat1.st_uid, stat2.st_uid, "uid") def assert_same_contents(self, relative, root1, root2): """Verify that file contents has been restored correctly.""" path1 = os.path.join(root1, relative) path2 = os.path.join(root2, relative) if not os.path.isfile(path1): return True f1 = file(path1, "r") f2 = file(path2, "r") data1 = f1.read() data2 = f2.read() f1.close() f2.close() self.assertEqual(relative, data1, data2, "content") def report_contents(self, dirname, contents): logging.debug('Contents of %s:' % dirname) for name in sorted(contents.keys()): logging.debug(' %s' % name) def verify(self, data, restored): """Verify that a restored directory is identical to the original. Raise an exception for the first error found. We need to play a little trick. When we backup /foo/bar/foobar, the backup will contain entries for /, /foo, and /foo/bar, and when we restore the backup, it will put foobar into $TARGET/foo/bar/foobar. Thus, to verify data, we need to append data to restored. """ origs = dict(self.find_everything(data)) basename = os.path.basename(data) restored2 = os.path.join(restored, './' + data) restoreds = dict(self.find_everything(restored2)) for name, orig_stat in origs.iteritems(): if name not in restoreds: logging.error('file %s not in restored data' % name) self.report_contents(data, origs) self.report_contents(restored, restoreds) raise Exception("%s not in restored data" % name) restored_stat = restoreds[name] self.assert_same_stat(name, orig_stat, restored_stat) self.assert_same_contents(name, data, restored2) for name, restored_stat in restoreds.iteritems(): if name not in origs: logging.error('spurious file %s in restored data' % name) self.report_contents(data, origs) self.report_contents(restored, restoreds) raise Exception("spurious %s in restored data" % name) def fsck(self, store): self.obnam(['fsck', '--store', store]) def backup(self, store, datas): """Back up a data directory to a store.""" logging.debug('backing up %s to %s' % (datas, store)) self.obnam(['--store', store, 'backup'] + datas) self.fsck(store) def restore(self, store, args=None): """Restore the newest generation from a store. Return name of directorey with restored data. """ restored = self.tempdir() logging.debug('restoring %s to %s' % (store, restored)) self.obnam(["restore", '--to', restored, '--generation', "latest", "--store", store] + (args or [])) return restored def generations(self, store): '''Return all generation ids.''' logging.debug('Looking up generations in %s' % store) stdout = self.obnam(['generations', '--store', store]) return [line.split()[0] for line in stdout.splitlines()] def forget(self, store, genids=None, keep=None, pretend=False): '''Forget specific generations.''' logging.debug('Forgetting in %s' % store) logging.debug('genids: %s' % genids) logging.debug('keep: %s' % keep) args = [] if genids is not None: args += genids if keep is not None: args += ['--keep', keep] if pretend: args += ['--pretend'] self.obnam(['forget', '--store', store] + args) class RestoresIdenticalData(BlackBoxTest): """Generate some data, backup, restore. Result should be identical.""" def test(self): data = self.tempdir() self.create_file(data, "foo", "foo") store = self.tempdir() self.backup(store, [data]) restored = self.restore(store) self.verify(data, restored) class RestoresIndividualFile(BlackBoxTest): '''Verify restore can restore individual files.''' def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) barpath = os.path.join(data, 'bar') logging.debug('bar path is %s' % barpath) restored = self.restore(store, [barpath]) restored2 = os.path.join(restored, './' + data) restoreds = dict(self.find_everything(restored2)) self.report_contents(restored, restoreds) self.assertEqual('RestoresIndividualFiles', restoreds.keys(), ['bar'], 'Must restore bar and nothing else') class TwoBackupRoots(BlackBoxTest): def test(self): data = self.tempdir() root1 = self.create_dir(data, 'root1') root2 = self.create_dir(data, 'root2') self.create_file(root1, 'file1', 'content1') self.create_file(root2, 'file2', 'content2') store = self.tempdir() self.backup(store, [root1, root2]) restored = self.restore(store) self.verify(data, restored) class TwoGenerations(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, "foo", "foo") store = self.tempdir() self.backup(store, [data]) self.create_file(data, 'bar', 'bar') self.remove_file(data, 'foo') self.backup(store, [data]) restored = self.restore(store) self.verify(data, restored) class Symlink(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, 'target', 'content1') os.symlink('target', os.path.join(data, 'symlink')) store = self.tempdir() self.backup(store, [data]) restored = self.restore(store) self.verify(data, restored) class Hardlink(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, 'target', 'content1') os.link(os.path.join(data, 'target'), os.path.join(data, 'hardlink')) store = self.tempdir() self.backup(store, [data]) restored = self.restore(store) self.verify(data, restored) class ReusesChunks(BlackBoxTest): def test(self): data = self.tempdir() n = 100 for i in range(n): self.create_file(data, 'target%d' % i, 'content1') store = self.tempdir() self.backup(store, [data]) restored = self.restore(store) self.verify(data, restored) fsf = obnamlib.VfsFactory() fs = fsf.new(store) s = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE, obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE) s.open_host(self.hostid) self.assertEqual('ReusesChunks', len(s.list_chunks()), 1, 'only one chunk when all files are identical') class UsesChunkGroups(BlackBoxTest): def test(self): data = self.tempdir() ngroups = 2 nbytes = (obnamlib.DEFAULT_CHUNK_SIZE * obnamlib.DEFAULT_CHUNK_GROUP_SIZE * ngroups) self.create_file(data, 'file', 'x' * nbytes) store = self.tempdir() self.backup(store, [data]) restored = self.restore(store) self.verify(data, restored) fsf = obnamlib.VfsFactory() fs = fsf.new(store) s = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE, obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE) s.open_host(self.hostid) self.assertEqual('UsesChunkGroups', len(s.list_chunk_groups()), ngroups, 'must use %d chunk groups' % ngroups) class VerifiesUnchangedBackup(BlackBoxTest): '''Make backup, verify it immediately, make sure it works.''' def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) self.assertWorks(self.obnam, ['verify', '--root', data, '--store', store]) class VerifyFindsProblemWithChangedData(BlackBoxTest): '''Make backup, change data, verify it, make sure it fails.''' def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) self.remove_file(data, 'foo') self.remove_file(data, 'bar') self.create_file(data, 'foo', 'changed data') self.assertRaises(subprocess.CalledProcessError, self.obnam, ['verify', '--root', data, '--store', store], r'^Error: verify failure') class ForgetWithoutArgumentsRemovesNothing(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) self.backup(store, [data]) genids = self.generations(store) self.forget(store) self.assertEqual('ForgetWithoutArguments', genids, self.generations(store), 'forget should remove nothing by default') class ForgetRemovesSpecifiedGenerations(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) self.backup(store, [data]) genids = self.generations(store) self.forget(store, genids=genids[:1]) self.assertEqual('ForgetRemovesSpecifiedGenerations', genids[1:], self.generations(store), 'forget should exactly the specified generations') class ForgetRemovesAccordingToPolicy(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) self.backup(store, [data]) genids = self.generations(store) self.forget(store, keep='1d') self.assertEqual('ForgetRemovesAccordingToPolicy', genids[1:], self.generations(store), 'forget should remove according to policy') class ForgetPretends(BlackBoxTest): def test(self): data = self.tempdir() self.create_file(data, 'foo', 'foo') self.create_file(data, 'bar', 'bar') store = self.tempdir() self.backup(store, [data]) self.backup(store, [data]) genids = self.generations(store) self.forget(store, keep='1d', pretend=True) self.assertEqual('ForgetPretends', genids, self.generations(store), 'forget should obey --pretend') class ObnamBlackBoxTests: """Main class for running black box tests of obnam.""" def find_tests(self, wanted): g = globals() return [g[x] for x in g if x != 'BlackBoxTest' and type(g[x]) == type(object) and issubclass(g[x], BlackBoxTest) and (not wanted or x in wanted)] def run(self): logging.info('blackboxtest starts') errors = False tests = self.find_tests(sys.argv[1:]) for klass in tests: logging.info('running test %s' % str(klass)) test = klass() try: test.test() except Exception, e: sys.stderr.write(traceback.format_exc()) logging.error('test %s failed:\n%s' % (str(klass), traceback.format_exc())) errors = True # test.clean() if errors: logging.critical('blackboxtest ends in glorious failure') sys.exit(1) logging.info('blackboxtest ends, no problems') if __name__ == "__main__": logging.basicConfig(filename='blackboxtest.log', level=logging.DEBUG, format='%(levelname)s: %(message)s') ObnamBlackBoxTests().run()