Diffstat (limited to 'blackboxtest')
-rwxr-xr-x  blackboxtest  286
1 file changed, 286 insertions(+), 0 deletions(-)
diff --git a/blackboxtest b/blackboxtest
new file mode 100755
index 0000000..c4d79c0
--- /dev/null
+++ b/blackboxtest
@@ -0,0 +1,286 @@
+#!/usr/bin/python
+#
+# Copyright (C) 2009, 2010 Lars Wirzenius <liw@liw.fi>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+'''Run some black box tests for genbackupdata.'''
+
+
+import hashlib
+import logging
+import os
+import random
+import re
+import shutil
+import stat
+import subprocess
+import sys
+import tempfile
+import traceback
+import unittest
+
+
+class GenbackupdataTestCase(unittest.TestCase):
+
+    '''Base class for genbackupdata test cases.
+
+    We use the unittest framework even though these are black box tests,
+    not unit tests. unittest makes implementation of these black box
+    tests convenient, even though that might not be true for all black
+    box tests.
+
+    This base class provides a fresh environment for each test, and
+    cleans up afterwards. It provides helpers for doing the usual
+    backup operations, and for verifying results.
+
+    '''
+
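+    # Subclasses hook into setUp and tearDown via setUpHook and
+    # tearDownHook, so that temporary directory handling stays here.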
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp()
+        self.setUpHook()
+
+    def setUpHook(self):
+        pass
+
+    def tearDown(self):
+        self.tearDownHook()
+        shutil.rmtree(self.tempdir)
+
+    def tearDownHook(self):
+        pass
+
+    def path(self, *relatives):
+        return os.path.join(self.tempdir, *relatives)
+
+    def mkdir(self, dirname):
+        abs_dirname = os.path.join(self.tempdir, dirname)
+        os.makedirs(abs_dirname)
+        return abs_dirname
+
+    def runcmd(self, argv, stderr_ignore=None):
+        '''Run an external command.
+
+        If the command fails (non-zero exit), raise an exception.
+
+        If stderr_ignore is not None, it must be a string with a
+        regexp for lines in stderr to ignore.
+
+        '''
+
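+        # Illustrative call (the command and pattern here are hypothetical):
+        #   self.runcmd(['tar', 'tf', 'backup.tar'], stderr_ignore='^tar: ')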
+        logging.debug('executing %s' % argv)
+
+        p = subprocess.Popen(argv, stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        if stderr_ignore:
+            lines = [line for line in stderr.splitlines(True)
+                     if not re.match(stderr_ignore, line)]
+            stderr = ''.join(lines)
+        sys.stderr.write(stderr)
+        if p.returncode != 0:
+            raise subprocess.CalledProcessError(p.returncode, argv)
+        return stdout
+
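+    # genbackupdata is invoked as ./genbackupdata, so this script is meant
+    # to be run from the top of the source tree.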
+    def genbackupdata(self, args, stderr_ignore=None):
+        '''Run genbackupdata, with some default arguments.'''
+        return self.runcmd(['./genbackupdata', '--quiet'] +
+                           args, stderr_ignore=stderr_ignore)
+
+    def create_file(self, dirname, relative, contents):
+        '''Create a new file with the desired contents.'''
+
+        pathname = os.path.join(dirname, relative)
+        logging.debug('creating file %s' % pathname)
+        f = open(pathname, 'w')
+        f.write(contents)
+        f.close()
+
+    def remove_file(self, root, relative):
+        '''Remove a file.'''
+
+        pathname = os.path.join(root, relative)
+        logging.debug('removing file %s' % pathname)
+        os.remove(pathname)
+
+    def create_dir(self, root, pathname):
+        '''Create a new directory, return name.'''
+        fullname = os.path.join(root, pathname)
+        logging.debug('mkdir %s' % fullname)
+        os.makedirs(fullname)
+        return fullname
+
+    def get_info(self, root, pathname):
+        '''Get the information about a given file.
+
+        Return a tuple (relativepath, stat) where relativepath is the
+        path relative to root, and stat is the result of os.lstat.
+        Return None if pathname is the root itself.
+
+        '''
+
+        if pathname == root:
+            return None
+        assert pathname.startswith(root + os.sep), (pathname, root)
+        return pathname[len(root + os.sep):], os.lstat(pathname)
+
+    def find_everything(self, root):
+        '''Find all filesystem objects inside a directory tree.
+
+        Return list of (pathname, stat) tuples. The pathname will be
+        relative to the root of the directory tree. The stat tuples
+        will be the result of os.lstat for each pathname.
+
+        '''
+
+        result = []
+        for dirname, dirnames, filenames in os.walk(root):
+            result.append(self.get_info(root, dirname))
+            for filename in filenames:
+                pathname = os.path.join(dirname, filename)
+                result.append(self.get_info(root, pathname))
+        return [x for x in result if x]
+
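+    # "Apparent size" here means the sum of file lengths as reported by
+    # os.lstat, not the disk space actually used.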
+    def apparent_size(self, root):
+        '''Return sum of lengths of regular files in directory, recursively.'''
+
+        size = 0
+        for dirname, subdirs, filenames in os.walk(self.path(root)):
+            for filename in filenames:
+                pathname = os.path.join(dirname, filename)
+                st = os.lstat(pathname)
+                if stat.S_ISREG(st.st_mode):
+                    size += st.st_size
+        return size
+
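+    # MD5 is used only to compare file contents between runs, not for
+    # anything security sensitive.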
+    def checksum(self, pathname):
+        '''Return MD5 checksum for contents of a file.'''
+        s = hashlib.new('md5')
+        f = open(pathname, 'rb')
+        while True:
+            data = f.read(64*1024)
+            if not data:
+                break
+            s.update(data)
+        f.close()
+        return s.hexdigest()
+
+    def checksums(self, root):
+        '''Return sorted list of (pathname, checksum) pairs for reg. files.'''
+        result = []
+        prefix = self.path(root) + os.sep
+        for dirname, subdirs, filenames in os.walk(self.path(root)):
+            for filename in filenames:
+                pathname = os.path.join(dirname, filename)
+                st = os.lstat(pathname)
+                if stat.S_ISREG(st.st_mode):
+                    assert pathname.startswith(prefix)
+                    relative = pathname[len(prefix):]
+                    result.append((relative, self.checksum(pathname)))
+        result.sort()
+        return result
+
+    def assert_equal_stat_fields(self, filename, stat1, stat2, fieldname):
+        field1 = getattr(stat1, fieldname)
+        field2 = getattr(stat2, fieldname)
+        self.assertEqual(field1, field2,
+                         '%s stat field %s difference: %s vs %s' %
+                         (filename, fieldname, repr(field1), repr(field2)))
+
+    def assert_same_stat(self, name, stat1, stat2):
+        '''Assert that two stat results are effectively identical.'''
+
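+        # Fake wraps a stat result so that st_mtime is compared at
+        # whole-second resolution; sub-second timestamp precision is
+        # not assumed to be preserved.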
+        class Fake(object):
+
+            def __init__(self, stat_result):
+                self.st = stat_result
+
+            def __getattr__(self, name):
+                if name == 'st_mtime':
+                    return int(getattr(self.st, name))
+                else:
+                    return getattr(self.st, name)
+
+        self.assert_equal_stat_fields(name, stat1, stat2, 'st_blocks')
+        self.assert_equal_stat_fields(name, stat1, stat2, 'st_gid')
+        self.assert_equal_stat_fields(name, stat1, stat2, 'st_mode')
+        self.assert_equal_stat_fields(name, Fake(stat1), Fake(stat2),
+                                      'st_mtime')
+        self.assert_equal_stat_fields(name, stat1, stat2, 'st_nlink')
+        self.assert_equal_stat_fields(name, stat1, stat2, 'st_size')
+        self.assert_equal_stat_fields(name, stat1, stat2, 'st_uid')
+
+    def assert_same_contents(self, relative, root1, root2):
+        '''Verify that a file has identical contents under both roots.'''
+
+        path1 = os.path.join(root1, relative)
+        path2 = os.path.join(root2, relative)
+
+        self.assertFilesEqual(path1, path2)
+
+    def assertFileExists(self, path):
+        self.assertTrue(os.path.exists(path), '%s does not exist' % path)
+
+    def assertIsRegularFile(self, path):
+        self.assertTrue(os.path.isfile(path),
+                        '%s is not a regular file' % path)
+
+    def assertFilesEqual(self, path1, path2):
+        '''Verify that file contents are equal.'''
+
+        self.assertFileExists(path1)
+        self.assertFileExists(path2)
+        self.assertIsRegularFile(path1)
+        self.assertIsRegularFile(path2)
+
+        f1 = open(path1, 'r')
+        f2 = open(path2, 'r')
+
+        data1 = f1.read()
+        data2 = f2.read()
+
+        f1.close()
+        f2.close()
+
+        self.assertEqual(data1, data2,
+                         'contents of %s and %s differ' % (path1, path2))
+
+
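+# The actual black box tests: each one gets a fresh temporary directory
+# from GenbackupdataTestCase and runs the genbackupdata script against it.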
+class GenbackupdataTests(GenbackupdataTestCase):
+
+    def test_returns_success_with_help_option(self):
+        self.genbackupdata(['--help'])
+        self.assertTrue(True)
+
+    def test_creates_requested_amount_of_data(self):
+        bytes = 12765
+        self.genbackupdata([self.path('data'), '--create=%d' % bytes])
+        self.assertEqual(self.apparent_size('data'), bytes)
+
+    def test_creates_same_data_every_time(self):
+        size = '10m'  # big enough to allow both ample text and binary data
+        self.genbackupdata([self.path('data1'), '--create', size])
+        self.genbackupdata([self.path('data2'), '--create', size])
+        sums1 = self.checksums('data1')
+        sums2 = self.checksums('data2')
+        self.assertEqual(len(sums1), len(sums2))
+        # Compare growing prefixes so a failure points at the first
+        # file whose checksum differs.
+        for n in range(1, len(sums1) + 1):
+            self.assertEqual(sums1[:n], sums2[:n])
+
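+# Running this script directly runs all the tests; debug output goes to
+# blackboxtest.log in the current directory.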
+if __name__ == '__main__':
+    logging.basicConfig(filename='blackboxtest.log',
+                        level=logging.DEBUG,
+                        format='%(levelname)s: %(message)s')
+    unittest.main()