diff options
author | Lars Wirzenius <liw@gytha> | 2008-06-10 22:20:05 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@gytha> | 2008-06-10 22:20:05 +0300 |
commit | 4b378dde1770d97d1aab9f460685093d733a717c (patch) | |
tree | fd9e8378a1ecd0aae54aeb30f92059d62ab31bf7 /obnam | |
parent | fa8f864c610707128e78b13b84ca5e8b353894fc (diff) | |
download | obnam-4b378dde1770d97d1aab9f460685093d733a717c.tar.gz |
Renamed obnam/ to obnamlib/ so that we can later rename cli.py to obnam.
Diffstat (limited to 'obnam')
49 files changed, 0 insertions, 9281 deletions
diff --git a/obnam/__init__.py b/obnam/__init__.py deleted file mode 100644 index 51e5ce15..00000000 --- a/obnam/__init__.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""The init file for the obnam module.""" - - -NAME = "obnam" -VERSION = "0.9.2" - - -from exception import ObnamException - -import backend -import cache -import cfgfile -import cmp -import config -import context -import filelist -import format -import gpg -import io -import log -import map -import obj -import progress -import rsync -import utils -import varint -import walk - -from app import Application -from oper import Operation, OperationFactory -from store import Store - -from oper_backup import Backup -from oper_forget import Forget -from oper_generations import ListGenerations -from oper_restore import Restore -from oper_show_generations import ShowGenerations - diff --git a/obnam/app.py b/obnam/app.py deleted file mode 100644 index c085a70b..00000000 --- a/obnam/app.py +++ /dev/null @@ -1,511 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Main program for Obnam.""" - - -import logging -import os -import re -import stat -import time - -import obnam - - - -# Maximum number of files per file group we create. -MAX_PER_FILEGROUP = 16 - - -class Application: - - """Main program logic for Obnam, a backup application.""" - - def __init__(self, context): - self._context = context - self._exclusion_strings = [] - self._exclusion_regexps = [] - self._filelist = None - self._prev_gen = None - self._store = obnam.Store(self._context) - self._total = 0 - - # When we traverse the file system tree while making a backup, - # we process children before the parent. This is necessary for - # functional updates of trees. For every directory, we need - # to keep track of its children. This dict is used for that. - # It is indexed by the absolute path to the directory, and - # contains a list of the subdirectories in that directory. - # When we're done with a directory (i.e., we generate its - # DirObject), we remove the directory from this dict. This - # means that we need only data for one path from the root of - # the directory tree to the current directory, not for the - # entire directory tree. - self._subdirs = {} - - def get_context(self): - """Get the context for the backup application.""" - return self._context - - def get_store(self): - """Get the Store for the backup application.""" - return self._store - - def load_host(self): - """Load the host block into memory.""" - self.get_store().fetch_host_block() - return self.get_store().get_host_block() - - def get_exclusion_regexps(self): - """Return list of regexp to exclude things from backup.""" - - config = self.get_context().config - strings = config.getvalues("backup", "exclude") - strings = [s.strip() for s in strings if s.strip()] - if self._exclusion_strings != strings: - self._exclusion_strings = strings - self._exclusion_regexps = [] - for string in strings: - logging.debug("Compiling exclusion pattern '%s'" % string) - self._exclusion_regexps.append(re.compile(string)) - - return self._exclusion_regexps - - def prune(self, dirname, dirnames, filenames): - """Remove excluded items from dirnames and filenames. - - Because this is called by obnam.walk.depth_first, the lists - are modified in place. - - """ - - self._prune_one_list(dirname, dirnames) - self._prune_one_list(dirname, filenames) - - def _prune_one_list(self, dirname, basenames): - """Prune one list of basenames based on exlusion list. - - Because this is called from self.prune, the list is modified - in place. - - """ - - dirname = obnam.io.unsolve(self._context, dirname) - - i = 0 - while i < len(basenames): - path = os.path.join(dirname, basenames[i]) - for regexp in self.get_exclusion_regexps(): - if regexp.search(path): - logging.debug("Excluding %s" % path) - logging.debug(" based on %s" % regexp.pattern) - del basenames[i] - break - else: - i += 1 - - def file_is_unchanged(self, stat1, stat2): - """Is a file unchanged from the previous generation? - - Given the stat results from the previous generation and the - current file, return True if the file is identical from the - previous generation (i.e., no new data to back up). - - """ - - fields = ("mode", "dev", "nlink", "uid", "gid", "size", "mtime") - for field in fields: - field = "st_" + field - if getattr(stat1, field) != getattr(stat2, field): - return False - return True - - def filegroup_is_unchanged(self, dirname, fg, filenames, stat=os.lstat): - """Is a filegroup unchanged from the previous generation? - - Given a filegroup and a list of files in the given directory, - return True if all files in the filegroup are unchanged from - the previous generation. - - The optional stat argument can be used by unit tests to - override the use of os.lstat. - - """ - - for old_name in fg.get_names(): - if old_name not in filenames: - return False # file has been deleted - - old_stat = fg.get_stat(old_name) - new_stat = stat(os.path.join(dirname, old_name)) - if not self.file_is_unchanged(old_stat, new_stat): - return False # file has changed - - return True # everything seems to be as before - - def dir_is_unchanged(self, old, new): - """Has a directory changed since the previous generation? - - Return True if a directory, or its files or subdirectories, - has changed since the previous generation. - - """ - - return (old.get_name() == new.get_name() and - self.file_is_unchanged(old.get_stat(), new.get_stat()) and - sorted(old.get_dirrefs()) == sorted(new.get_dirrefs()) and - sorted(old.get_filegrouprefs()) == - sorted(new.get_filegrouprefs())) - - def set_prevgen_filelist(self, filelist): - """Set the Filelist object from the previous generation. - - This is used when looking up files in previous generations. We - only look at one generation's Filelist, since they're big. Note - that Filelist objects are the _old_ way of storing file meta - data, and we will no use better ways that let us look further - back in history. - - """ - - logging.debug("Setting previous generation FILELIST.") - self._filelist = filelist - - def get_previous_generation(self): - """Get the previous generation for a backup run.""" - return self._prev_gen - - def set_previous_generation(self, gen): - """Set the previous generation for a backup run.""" - self._prev_gen = gen - - def find_file_by_name(self, filename): - """Find a backed up file given its filename. - - Return FILE component, or None if no file with the given name - could be found. - - """ - - if self._filelist: - fc = self._filelist.find(filename) - if fc != None: - return fc - - return None - - def compute_signature(self, filename): - """Compute rsync signature for a filename. - - Return the identifier. Put the signature object in the queue to - be uploaded. - - """ - - logging.debug("Computing rsync signature for %s" % filename) - sigdata = obnam.rsync.compute_signature(self._context, filename) - id = obnam.obj.object_id_new() - sig = obnam.obj.SignatureObject(id=id, sigdata=sigdata) - self.get_store().queue_object(sig) - return sig - - def find_unchanged_filegroups(self, dirname, filegroups, filenames, - stat=os.lstat): - """Return list of filegroups that are unchanged. - - The filenames and stat arguments have the same meaning as - for the filegroup_is_unchanged method. - - """ - - unchanged = [] - - for filegroup in filegroups: - if self.filegroup_is_unchanged(dirname, filegroup, filenames, - stat=stat): - unchanged.append(filegroup) - - logging.debug("There are %d unchanged filegroups in %s" % - (len(unchanged), dirname)) - return unchanged - - def get_file_in_previous_generation(self, pathname): - """Return non-directory file in previous generation, or None.""" - if self._filelist: - logging.debug("Have FILELIST, searching it for %s" % pathname) - file = self.find_file_by_name(pathname) - if file: - logging.debug("Found in prevgen FILELIST: %s" % pathname) - return file - else: - logging.debug("Not found in FILELIST.") - else: - logging.debug("No FILELIST for previous generation.") - gen = self.get_previous_generation() - if gen: - logging.debug("Looking up file in previous gen: %s" % pathname) - return self.get_store().lookup_file(gen, pathname) - else: - logging.debug("No previous gen in which to find %s" % pathname) - return None - - def _reuse_existing(self, old_file): - logging.debug("Re-using existing file contents: %s" % - old_file.first_string_by_kind(obnam.cmp.FILENAME)) - return (old_file.first_string_by_kind(obnam.cmp.CONTREF), - old_file.first_string_by_kind(obnam.cmp.SIGREF), - old_file.first_string_by_kind(obnam.cmp.DELTAREF)) - - def _get_old_sig(self, old_file): - old_sigref = old_file.first_string_by_kind(obnam.cmp.SIGREF) - if not old_sigref: - return None - old_sig = self.get_store().get_object(old_sigref) - if not old_sig: - return None - return old_sig.first_string_by_kind(obnam.cmp.SIGDATA) - - def _compute_delta(self, old_file, filename): - old_sig_data = self._get_old_sig(old_file) - if old_sig_data: - logging.debug("Computing delta for %s" % filename) - old_contref = old_file.first_string_by_kind(obnam.cmp.CONTREF) - old_deltaref = old_file.first_string_by_kind(obnam.cmp.DELTAREF) - deltapart_ids = obnam.rsync.compute_delta(self.get_context(), - old_sig_data, filename) - delta_id = obnam.obj.object_id_new() - delta = obnam.obj.DeltaObject(id=delta_id, - deltapart_refs=deltapart_ids, - cont_ref=old_contref, - delta_ref=old_deltaref) - self.get_store().queue_object(delta) - - sig = self.compute_signature(filename) - - return None, sig.get_id(), delta.get_id() - else: - logging.debug("Signature for previous version not found for %s" % - filename) - return self._backup_new(filename) - - def _backup_new(self, filename): - logging.debug("Storing new file %s" % filename) - contref = obnam.io.create_file_contents_object(self._context, - filename) - sig = self.compute_signature(filename) - sigref = sig.get_id() - deltaref = None - return contref, sigref, deltaref - - def add_to_filegroup(self, fg, filename): - """Add a file to a filegroup.""" - logging.debug("Backing up %s" % filename) - self._context.progress.update_current_action(filename) - st = os.lstat(filename) - if stat.S_ISREG(st.st_mode): - unsolved = obnam.io.unsolve(self.get_context(), filename) - old_file = self.get_file_in_previous_generation(unsolved) - if old_file: - old_st = old_file.first_by_kind(obnam.cmp.STAT) - old_st = obnam.cmp.parse_stat_component(old_st) - if self.file_is_unchanged(old_st, st): - contref, sigref, deltaref = self._reuse_existing(old_file) - else: - contref, sigref, deltaref = self._compute_delta(old_file, - filename) - else: - contref, sigref, deltaref = self._backup_new(filename) - else: - contref = None - sigref = None - deltaref = None - fg.add_file(os.path.basename(filename), st, contref, sigref, deltaref) - - def make_filegroups(self, filenames): - """Make list of new FILEGROUP objects. - - Return list of object identifiers to the FILEGROUP objects. - - """ - - list = [] - for filename in filenames: - if (not list or - len(list[-1].get_files()) >= MAX_PER_FILEGROUP): - id = obnam.obj.object_id_new() - list.append(obnam.obj.FileGroupObject(id=id)) - self.add_to_filegroup(list[-1], filename) - - self.get_store().queue_objects(list) - return list - - def _make_absolute(self, basename, relatives): - return [os.path.join(basename, name) for name in relatives] - - def get_dir_in_previous_generation(self, dirname): - """Return directory in previous generation, or None.""" - gen = self.get_previous_generation() - if gen: - logging.debug("Looking up in previous generation: %s" % dirname) - return self.get_store().lookup_dir(gen, dirname) - else: - logging.debug("No previous generation to search for %s" % dirname) - return None - - def select_files_to_back_up(self, dirname, filenames, stat=os.lstat): - """Select files to backup in a directory, compared to previous gen. - - Look up the directory in the previous generation, and see which - files need backing up compared to that generation. - - Return list of unchanged filegroups, plus list of filenames - that need backing up. - - """ - - unsolved = obnam.io.unsolve(self.get_context(), dirname) - logging.debug("Selecting files to backup in %s (unsolved)" % unsolved) - logging.debug("There are %d filenames currently" % len(filenames)) - - filenames = filenames[:] - old_dir = self.get_dir_in_previous_generation(unsolved) - if old_dir: - logging.debug("Found directory in previous generation") - old_groups = [self.get_store().get_object(id) - for id in old_dir.get_filegrouprefs()] - filegroups = self.find_unchanged_filegroups(dirname, old_groups, - filenames, - stat=stat) - for fg in filegroups: - for name in fg.get_names(): - filenames.remove(name) - - return filegroups, filenames - else: - logging.debug("Did not find directory in previous generation") - return [], filenames - - def backup_one_dir(self, dirname, subdirs, filenames, is_root=False): - """Back up non-recursively one directory. - - Return obnam.obj.DirObject that refers to the directory. - - subdirs is the list of subdirectories (as DirObject) for this - directory. - - """ - - logging.debug("Backing up non-recursively: %s" % dirname) - filegroups, filenames = self.select_files_to_back_up(dirname, - filenames) - logging.debug("Selected %d existing file groups, %d filenames" % - (len(filegroups), len(filenames))) - filenames = self._make_absolute(dirname, filenames) - - filegroups += self.make_filegroups(filenames) - filegrouprefs = [fg.get_id() for fg in filegroups] - - dirrefs = [subdir.get_id() for subdir in subdirs] - - basename = os.path.basename(dirname) - if not basename and dirname.endswith(os.sep): - basename = os.path.basename(dirname[:-len(os.sep)]) - assert basename - logging.debug("Creating DirObject, basename: %s" % basename) - if is_root: - name = obnam.io.unsolve(self.get_context(), dirname) - else: - name = basename - dir = obnam.obj.DirObject(id=obnam.obj.object_id_new(), - name=name, - stat=os.lstat(dirname), - dirrefs=dirrefs, - filegrouprefs=filegrouprefs) - - unsolved = obnam.io.unsolve(self.get_context(), dirname) - old_dir = self.get_dir_in_previous_generation(unsolved) - if old_dir and self.dir_is_unchanged(old_dir, dir): - logging.debug("Dir is unchanged: %s" % dirname) - return old_dir - else: - logging.debug("Dir has changed: %s" % dirname) - self.get_store().queue_object(dir) - return dir - - def backup_one_root(self, root): - """Backup one root for the next generation.""" - - logging.debug("Backing up root %s" % root) - - resolved = obnam.io.resolve(self._context, root) - logging.debug("Root resolves to %s" % resolved) - - if not os.path.isdir(resolved): - raise obnam.ObnamException("Not a directory: %s" % root) - # FIXME: This needs to be able to handle non-directories, too! - - subdirs_for_dir = {} - root_object = None - - for tuple in obnam.walk.depth_first(resolved, prune=self.prune): - dirname, dirnames, filenames = tuple - filenames.sort() - logging.debug("Walked to directory %s" % dirname) - logging.debug(" with dirnames: %s" % dirnames) - logging.debug(" and filenames: %s" % filenames) - self.get_context().progress.update_current_action(dirname) - - subdirs = subdirs_for_dir.get(dirname, []) - - is_root = (dirname == resolved) - - dir = self.backup_one_dir(dirname, subdirs, filenames, - is_root=is_root) - - if not is_root: - parent = os.path.dirname(dirname) - if parent not in subdirs_for_dir: - subdirs_for_dir[parent] = [] - subdirs_for_dir[parent].append(dir) - else: - root_object = dir - - if dirname in subdirs_for_dir: - del subdirs_for_dir[dirname] - - self._total += 1 + len(filenames) - self.get_context().progress.update_total_files(self._total) - - return root_object - - def backup(self, roots): - """Backup all the roots.""" - - start = int(time.time()) - root_objs = [] - self._total = 0 - for root in roots: - root_objs.append(self.backup_one_root(root)) - end = int(time.time()) - - dirrefs = [o.get_id() for o in root_objs] - gen = obnam.obj.GenerationObject(id=obnam.obj.object_id_new(), - dirrefs=dirrefs, start=start, - end=end) - self.get_store().queue_object(gen) - return gen diff --git a/obnam/appTests.py b/obnam/appTests.py deleted file mode 100644 index 3973706d..00000000 --- a/obnam/appTests.py +++ /dev/null @@ -1,754 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for app.py.""" - - -import os -import re -import shutil -import socket -import tempfile -import unittest - -import obnam - - -class ApplicationTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - - def testReturnsEmptyExclusionListInitially(self): - self.failUnlessEqual(self.app.get_exclusion_regexps(), []) - - def setup_excludes(self): - config = self.app.get_context().config - config.remove_option("backup", "exclude") - config.append("backup", "exclude", "pink") - config.append("backup", "exclude", "pretty") - - def testReturnsRightNumberOfExclusionPatterns(self): - self.setup_excludes() - self.failUnlessEqual(len(self.app.get_exclusion_regexps()), 2) - - def testReturnsRegexpObjects(self): - self.setup_excludes() - for item in self.app.get_exclusion_regexps(): - self.failUnlessEqual(type(item), type(re.compile("."))) - - def testPrunesMatchingFilenames(self): - self.setup_excludes() - dirname = "/dir" - dirnames = ["subdir1", "subdir2"] - filenames = ["filename", "pink", "file-is-pretty-indeed"] - self.app.prune(dirname, dirnames, filenames) - self.failUnlessEqual(filenames, ["filename"]) - - def testPrunesMatchingFilenames(self): - self.setup_excludes() - dirname = "/dir" - dirnames = ["subdir", "pink, dir-is-pretty-indeed"] - filenames = ["filename1", "filename2"] - self.app.prune(dirname, dirnames, filenames) - self.failUnlessEqual(dirnames, ["subdir"]) - - def testSetsPreviousGenerationToNoneInitially(self): - self.failUnlessEqual(self.app.get_previous_generation(), None) - - def testSetsPreviousGenerationCorrectly(self): - self.app.set_previous_generation("pink") - self.failUnlessEqual(self.app.get_previous_generation(), "pink") - - -class ApplicationLoadHostBlockTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - self.app = obnam.Application(context) - - def tearDown(self): - for x in ["cache", "store"]: - dirname = self.app._context.config.get("backup", x) - if os.path.isdir(dirname): - shutil.rmtree(dirname) - - def testCreatesNewHostBlockWhenNoneExists(self): - host = self.app.load_host() - self.failUnlessEqual(host.get_id(), socket.gethostname()) - self.failUnlessEqual(host.get_generation_ids(), []) - self.failUnlessEqual(host.get_map_block_ids(), []) - self.failUnlessEqual(host.get_contmap_block_ids(), []) - - def testLoadsActualHostBlockWhenOneExists(self): - context = obnam.context.Context() - cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - host_id = context.config.get("backup", "host-id") - temp = obnam.obj.HostBlockObject(host_id=host_id, - gen_ids=["pink", "pretty"]) - obnam.io.upload_host_block(context, temp.encode()) - - host = self.app.load_host() - self.failUnlessEqual(host.get_generation_ids(), ["pink", "pretty"]) - - -class ApplicationMakeFileGroupsTests(unittest.TestCase): - - def make_tempfiles(self, n): - list = [] - for i in range(n): - fd, name = tempfile.mkstemp(dir=self.tempdir) - os.close(fd) - if (i % 2) == 0: - os.remove(name) - os.mkfifo(name) - list.append(name) - return list - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - - self.tempdir = tempfile.mkdtemp() - self.tempfiles = self.make_tempfiles(obnam.app.MAX_PER_FILEGROUP + 1) - - def tearDown(self): - shutil.rmtree(self.tempdir) - - def testReturnsNoFileGroupsForEmptyListOfFiles(self): - self.failUnlessEqual(self.app.make_filegroups([]), []) - - def testReturnsOneFileGroupForOneFile(self): - filenames = self.tempfiles[:1] - self.failUnlessEqual(len(self.app.make_filegroups(filenames)), 1) - - def testReturnsOneFileGroupForMaxFilesPerGroup(self): - filenames = self.tempfiles[:obnam.app.MAX_PER_FILEGROUP] - self.failUnlessEqual(len(self.app.make_filegroups(filenames)), 1) - - def testReturnsTwoFileGroupsForMaxFilesPerGroupPlusOne(self): - filenames = self.tempfiles[:obnam.app.MAX_PER_FILEGROUP + 1] - self.failUnlessEqual(len(self.app.make_filegroups(filenames)), 2) - - def testUsesJustBasenames(self): - list = self.app.make_filegroups(self.tempfiles[:1]) - fg = list[0] - self.failIf("/" in fg.get_names()[0]) - - -class ApplicationUnchangedFileRecognitionTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - - def testSameFileWhenStatIsIdentical(self): - st = obnam.utils.make_stat_result() - self.failUnless(self.app.file_is_unchanged(st, st)) - - def testSameFileWhenIrrelevantFieldsChange(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_ino=42, - st_atime=42, - st_blocks=42, - st_blksize=42, - st_rdev=42) - self.failUnless(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenDevChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_dev=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenModeChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_mode=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenNlinkChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_nlink=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenUidChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_uid=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenGidChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_gid=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenSizeChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_size=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - def testChangedFileWhenMtimeChanges(self): - st1 = obnam.utils.make_stat_result() - st2 = obnam.utils.make_stat_result(st_mtime=42) - self.failIf(self.app.file_is_unchanged(st1, st2)) - - -class ApplicationUnchangedFileGroupTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.dir = "dirname" - self.stats = { - "dirname/pink": obnam.utils.make_stat_result(st_mtime=42), - "dirname/pretty": obnam.utils.make_stat_result(st_mtime=105), - } - - def mock_stat(self, filename): - self.failUnless(filename.startswith(self.dir)) - return self.stats[filename] - - def mock_filegroup(self, filenames): - fg = obnam.obj.FileGroupObject(id=obnam.obj.object_id_new()) - for filename in filenames: - st = self.mock_stat(os.path.join(self.dir, filename)) - fg.add_file(filename, st, None, None, None) - return fg - - def testSameFileGroupWhenAllFilesAreIdentical(self): - filenames = ["pink", "pretty"] - fg = self.mock_filegroup(filenames) - self.failUnless(self.app.filegroup_is_unchanged(self.dir, fg, - filenames, - stat=self.mock_stat)) - - def testChangedFileGroupWhenFileHasChanged(self): - filenames = ["pink", "pretty"] - fg = self.mock_filegroup(filenames) - self.stats["dirname/pink"] = obnam.utils.make_stat_result(st_mtime=1) - self.failIf(self.app.filegroup_is_unchanged(self.dir, fg, filenames, - stat=self.mock_stat)) - - def testChangedFileGroupWhenFileHasBeenRemoved(self): - filenames = ["pink", "pretty"] - fg = self.mock_filegroup(filenames) - self.failIf(self.app.filegroup_is_unchanged(self.dir, fg, - filenames[:1], - stat=self.mock_stat)) - - -class ApplicationUnchangedDirTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - - def make_dir(self, name, dirrefs, filegrouprefs, stat=None): - if stat is None: - stat = obnam.utils.make_stat_result() - return obnam.obj.DirObject(id=obnam.obj.object_id_new(), - name=name, - stat=stat, - dirrefs=dirrefs, - filegrouprefs=filegrouprefs) - - def testSameDirWhenNothingHasChanged(self): - dir = self.make_dir("name", [], ["pink", "pretty"]) - self.failUnless(self.app.dir_is_unchanged(dir, dir)) - - def testChangedDirWhenFileGroupHasBeenRemoved(self): - dir1 = self.make_dir("name", [], ["pink", "pretty"]) - dir2 = self.make_dir("name", [], ["pink"]) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenFileGroupHasBeenAdded(self): - dir1 = self.make_dir("name", [], ["pink"]) - dir2 = self.make_dir("name", [], ["pink", "pretty"]) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenDirHasBeenRemoved(self): - dir1 = self.make_dir("name", ["pink", "pretty"], []) - dir2 = self.make_dir("name", ["pink"], []) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenDirHasBeenAdded(self): - dir1 = self.make_dir("name", ["pink"], []) - dir2 = self.make_dir("name", ["pink", "pretty"], []) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenNameHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], []) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testSameDirWhenIrrelevantStatFieldsHaveChanged(self): - stat = obnam.utils.make_stat_result(st_ino=42, - st_atime=42, - st_blocks=42, - st_blksize=42, - st_rdev=42) - - dir1 = self.make_dir("name", [], []) - dir2 = self.make_dir("name", [], [], stat=stat) - self.failUnless(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenDevHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_dev=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenModeHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_mode=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenNlinkHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_nlink=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenUidHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_uid=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenGidHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_gid=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenSizeHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_size=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - def testChangedDirWhenMtimeHasChanged(self): - dir1 = self.make_dir("name1", [], []) - dir2 = self.make_dir("name2", [], [], - stat=obnam.utils.make_stat_result(st_mtime=105)) - self.failIf(self.app.dir_is_unchanged(dir1, dir2)) - - -class ApplicationFindUnchangedFilegroupsTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.dirname = "dirname" - self.stats = { - "dirname/pink": obnam.utils.make_stat_result(st_mtime=42), - "dirname/pretty": obnam.utils.make_stat_result(st_mtime=105), - } - self.names = ["pink", "pretty"] - self.pink = self.mock_filegroup(["pink"]) - self.pretty = self.mock_filegroup(["pretty"]) - self.groups = [self.pink, self.pretty] - - def mock_filegroup(self, filenames): - fg = obnam.obj.FileGroupObject(id=obnam.obj.object_id_new()) - for filename in filenames: - st = self.mock_stat(os.path.join(self.dirname, filename)) - fg.add_file(filename, st, None, None, None) - return fg - - def mock_stat(self, filename): - return self.stats[filename] - - def find(self, filegroups, filenames): - return self.app.find_unchanged_filegroups(self.dirname, filegroups, - filenames, - stat=self.mock_stat) - - def testReturnsEmptyListForEmptyListOfGroups(self): - self.failUnlessEqual(self.find([], self.names), []) - - def testReturnsEmptyListForEmptyListOfFilenames(self): - self.failUnlessEqual(self.find(self.groups, []), []) - - def testReturnsPinkGroupWhenPrettyIsChanged(self): - self.stats["dirname/pretty"] = obnam.utils.make_stat_result() - self.failUnlessEqual(self.find(self.groups, self.names), [self.pink]) - - def testReturnsPrettyGroupWhenPinkIsChanged(self): - self.stats["dirname/pink"] = obnam.utils.make_stat_result() - self.failUnlessEqual(self.find(self.groups, self.names), [self.pretty]) - - def testReturnsPinkAndPrettyWhenBothAreUnchanged(self): - self.failUnlessEqual(set(self.find(self.groups, self.names)), - set(self.groups)) - - def testReturnsEmptyListWhenEverythingIsChanged(self): - self.stats["dirname/pink"] = obnam.utils.make_stat_result() - self.stats["dirname/pretty"] = obnam.utils.make_stat_result() - self.failUnlessEqual(self.find(self.groups, self.names), []) - - -class ApplicationGetDirInPreviousGenerationTests(unittest.TestCase): - - class MockStore: - - def __init__(self): - self.dict = { - "pink": obnam.obj.DirObject(id="id", name="pink"), - } - - def lookup_dir(self, gen, pathname): - return self.dict.get(pathname, None) - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.app._store = self.MockStore() - self.app.set_previous_generation("prevgen") - - def testReturnsNoneIfDirectoryDidNotExist(self): - self.failUnlessEqual(self.app.get_dir_in_previous_generation("xx"), - None) - - def testReturnsDirObjectIfDirectoryDidExist(self): - dir = self.app.get_dir_in_previous_generation("pink") - self.failUnlessEqual(dir.get_name(), "pink") - - -class ApplicationGetFileInPreviousGenerationTests(unittest.TestCase): - - class MockStore: - - def __init__(self): - self.dict = { - "pink": obnam.cmp.Component(obnam.cmp.FILE, []) - } - - def lookup_file(self, gen, pathname): - return self.dict.get(pathname, None) - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.app._store = self.MockStore() - self.app.set_previous_generation("prevgen") - - def testReturnsNoneIfPreviousGenerationIsUnset(self): - self.app.set_previous_generation(None) - self.failUnlessEqual(self.app.get_file_in_previous_generation("xx"), - None) - - def testReturnsNoneIfFileDidNotExist(self): - self.failUnlessEqual(self.app.get_file_in_previous_generation("xx"), - None) - - def testReturnsFileComponentIfFileDidExist(self): - cmp = self.app.get_file_in_previous_generation("pink") - self.failUnlessEqual(cmp.get_kind(), obnam.cmp.FILE) - - -class ApplicationSelectFilesToBackUpTests(unittest.TestCase): - - class MockStore: - - def __init__(self, objs): - self._objs = objs - - def get_object(self, id): - for obj in self._objs: - if obj.get_id() == id: - return obj - return None - - def setUp(self): - self.dirname = "dirname" - self.stats = { - "dirname/pink": obnam.utils.make_stat_result(st_mtime=42), - "dirname/pretty": obnam.utils.make_stat_result(st_mtime=105), - } - self.names = ["pink", "pretty"] - self.pink = self.mock_filegroup(["pink"]) - self.pretty = self.mock_filegroup(["pretty"]) - self.groups = [self.pink, self.pretty] - - self.dir = obnam.obj.DirObject(id="id", name=self.dirname, - filegrouprefs=[x.get_id() - for x in self.groups]) - - store = self.MockStore(self.groups + [self.dir]) - - context = obnam.context.Context() - self.app = obnam.Application(context) - self.app._store = store - self.app.get_dir_in_previous_generation = self.mock_get_dir_in_prevgen - - def mock_get_dir_in_prevgen(self, dirname): - if dirname == self.dirname: - return self.dir - else: - return None - - def mock_filegroup(self, filenames): - fg = obnam.obj.FileGroupObject(id=obnam.obj.object_id_new()) - for filename in filenames: - st = self.mock_stat(os.path.join(self.dirname, filename)) - fg.add_file(filename, st, None, None, None) - return fg - - def mock_stat(self, filename): - return self.stats[filename] - - def select(self): - return self.app.select_files_to_back_up(self.dirname, self.names, - stat=self.mock_stat) - - def testReturnsNoOldGroupsIfDirectoryDidNotExist(self): - self.dir = None - self.failUnlessEqual(self.select(), ([], self.names)) - - def testReturnsNoOldGroupsIfEverythingIsChanged(self): - self.stats["dirname/pink"] = obnam.utils.make_stat_result() - self.stats["dirname/pretty"] = obnam.utils.make_stat_result() - self.failUnlessEqual(self.select(), ([], self.names)) - - def testReturnsOneGroupAndOneFileWhenJustOneIsChanged(self): - self.stats["dirname/pink"] = obnam.utils.make_stat_result() - self.failUnlessEqual(self.select(), ([self.pretty], ["pink"])) - - def testReturnsBothGroupsWhenNothingIsChanged(self): - self.failUnlessEqual(self.select(), (self.groups, [])) - - -class ApplicationFindFileByNameTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - - def testFindsFileInfoInFilelistFromPreviousGeneration(self): - stat = obnam.utils.make_stat_result() - fc = obnam.filelist.create_file_component_from_stat("pink", stat, - "contref", - "sigref", - "deltaref") - filelist = obnam.filelist.Filelist() - filelist.add_file_component("pink", fc) - self.app.set_prevgen_filelist(filelist) - file = self.app.find_file_by_name("pink") - self.failUnlessEqual( - obnam.cmp.parse_stat_component( - file.first_by_kind(obnam.cmp.STAT)), - stat) - self.failUnlessEqual(file.first_string_by_kind(obnam.cmp.CONTREF), - "contref") - self.failUnlessEqual(file.first_string_by_kind(obnam.cmp.SIGREF), - "sigref") - self.failUnlessEqual(file.first_string_by_kind(obnam.cmp.DELTAREF), - "deltaref") - - def testFindsNoFileInfoInFilelistForNonexistingFile(self): - filelist = obnam.filelist.Filelist() - self.app.set_prevgen_filelist(filelist) - self.failUnlessEqual(self.app.find_file_by_name("pink"), None) - - -class ApplicationBackupsOneDirectoryTests(unittest.TestCase): - - def abs(self, relative_name): - return os.path.join(self.dirname, relative_name) - - def make_file(self, name): - file(self.abs(name), "w").close() - - def make_dirobject(self, relative_name): - return obnam.obj.DirObject(id=obnam.obj.object_id_new(), - name=self.abs(relative_name)) - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.dirname = tempfile.mkdtemp() - - def tearDown(self): - shutil.rmtree(self.dirname) - - def testWithCorrectName(self): - dir = self.app.backup_one_dir(self.dirname, [], [], is_root=True) - self.failUnlessEqual(dir.get_name(), self.dirname) - - def testWithCorrectStat(self): - dir = self.app.backup_one_dir(self.dirname, [], []) - self.failUnlessEqual(dir.get_stat(), os.stat(self.dirname)) - - def testWithCorrectNumberOfDirrefsWhenThereAreNoneGiven(self): - dir = self.app.backup_one_dir(self.dirname, [], []) - self.failUnlessEqual(dir.get_dirrefs(), []) - - def testWithCorrectNumberOfFilegrouprefsWhenThereAreNoneGiven(self): - dir = self.app.backup_one_dir(self.dirname, [], []) - self.failUnlessEqual(dir.get_filegrouprefs(), []) - - def _filegroups(self, file_count): - max = obnam.app.MAX_PER_FILEGROUP - return (file_count + max - 1) / max - - def testWithCorrectNumberOfFilegrouprefsWhenSomeAreGiven(self): - self.make_file("pink") - self.make_file("pretty") - files = os.listdir(self.dirname) - files = [name for name in files if os.path.isfile(self.abs(name))] - dir = self.app.backup_one_dir(self.dirname, [], files) - self.failUnlessEqual(len(dir.get_filegrouprefs()), - self._filegroups(len(files))) - - def testWithCorrectNumberOfDirrefsWhenSomeAreGiven(self): - os.mkdir(self.abs("pink")) - os.mkdir(self.abs("pretty")) - subdirs = [self.make_dirobject(_) for _ in ["pink", "pretty"]] - dir = self.app.backup_one_dir(self.dirname, subdirs, []) - self.failUnlessEqual(len(dir.get_dirrefs()), 2) - - -class ApplicationBackupOneRootTests(unittest.TestCase): - - _tree = ( - "file0", - "pink/", - "pink/file1", - "pink/dir1/", - "pink/dir1/dir2/", - "pink/dir1/dir2/file2", - ) - - def abs(self, relative_name): - return os.path.join(self.dirname, relative_name) - - def mktree(self, tree): - for name in tree: - if name.endswith("/"): - name = self.abs(name[:-1]) - self.dirs.append(name) - os.mkdir(name) - else: - name = self.abs(name) - self.files.append(name) - file(name, "w").close() - - def mock_backup_one_dir(self, dirname, subdirs, filenames, is_root=False): - self.dirs_walked.append(dirname) - assert dirname not in self.subdirs_walked - self.subdirs_walked[dirname] = [os.path.join(dirname, x.get_name()) - for x in subdirs] - return self.real_backup_one_dir(dirname, subdirs, filenames, - is_root=is_root) - - def find_subdirs(self): - dict = {} - for dirname, dirnames, filenames in os.walk(self.dirname): - dict[dirname] = [os.path.join(dirname, _) for _ in dirnames] - return dict - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.real_backup_one_dir = self.app.backup_one_dir - self.app.backup_one_dir = self.mock_backup_one_dir - self.dirs_walked = [] - self.subdirs_walked = {} - self.dirname = tempfile.mkdtemp() - self.dirs = [self.dirname] - self.files = [] - self.mktree(self._tree) - - def tearDown(self): - shutil.rmtree(self.dirname) - - def testRaisesErrorForNonDirectory(self): - self.failUnlessRaises(obnam.ObnamException, - self.app.backup_one_root, - self.abs("file0")) - - def testReturnsDirObject(self): - ret = self.app.backup_one_root(self.dirname) - self.failUnless(isinstance(ret, obnam.obj.DirObject)) - - def testWalksToTheRightDirectories(self): - self.app.backup_one_root(self.dirname) - self.failUnlessEqual(self.dirs_walked, list(reversed(self.dirs))) - - def testFindsTheRightSubdirs(self): - self.app.backup_one_root(self.dirname) - self.failUnlessEqual(self.subdirs_walked, self.find_subdirs()) - - -class ApplicationBackupTests(unittest.TestCase): - - _tree = ( - "file0", - "pink/", - "pink/file1", - "pink/dir1/", - "pink/dir1/dir2/", - "pink/dir1/dir2/file2", - "pretty/", - ) - - def abs(self, relative_name): - return os.path.join(self.dirname, relative_name) - - def mktree(self, tree): - for name in tree: - if name.endswith("/"): - name = self.abs(name[:-1]) - os.mkdir(name) - else: - name = self.abs(name) - file(name, "w").close() - - def mock_backup_one_root(self, root): - self.roots_backed_up.append(root) - return self.real_backup_one_root(root) - - def setUp(self): - self.dirname = tempfile.mkdtemp() - self.mktree(self._tree) - self.roots_backed_up = [] - context = obnam.context.Context() - self.app = obnam.Application(context) - self.real_backup_one_root = self.app.backup_one_root - self.app.backup_one_root = self.mock_backup_one_root - - def testCallsBackupOneRootForEachRoot(self): - dirs = [self.abs(x) for x in ["pink", "pretty"]] - self.app.backup(dirs) - self.failUnlessEqual(self.roots_backed_up, dirs) - - def testReturnsGenerationObject(self): - ret = self.app.backup([self.abs("pink"), self.abs("pretty")]) - self.failUnless(isinstance(ret, obnam.obj.GenerationObject)) - - def testReturnsGenerationWithTheRightRootObjects(self): - gen = self.app.backup([self.abs("pink"), self.abs("pretty")]) - self.failUnlessEqual(len(gen.get_dirrefs()), 2) - - def testReturnsGenerationWithTimeStamps(self): - gen = self.app.backup([self.abs("pink"), self.abs("pretty")]) - self.failIfEqual(gen.get_start_time(), None) - self.failIfEqual(gen.get_end_time(), None) diff --git a/obnam/backend.py b/obnam/backend.py deleted file mode 100644 index 73c812ad..00000000 --- a/obnam/backend.py +++ /dev/null @@ -1,388 +0,0 @@ -# Copyright (C) 2006, 2007 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Backup program backend for communicating with the backup server.""" - - -import logging -import os -import pwd -import stat -import urlparse - -import paramiko - -import uuid -import obnam.cache -import obnam.cmp -import obnam.map -import obnam.obj - - -# Block filenames are created using the following scheme: -# -# For each backup run, we create one directory, named by a UUID. Inside -# this directory we create sub-directories, named by sequential integers, -# up to a certain number of levels. The actual block files are created at -# the lowest level, and we create the next lowest level directory when -# we've reached some maximum of files in the directory. -# -# The rationale is that having too many files in one directory makes all -# operations involving that directory slow, in many filesystems, because -# of linear searches. By putting, say, only 256 files per directory, we -# can keep things reasonably fast. However, if we create a a lot of blocks, -# we'll end up creating a lot of directories, too. Thus, several levels of -# directories are needed. -# -# With 256 files per directory, and three levels of directories, and one -# megabyte per block file, we can create 16 terabytes of backup data without -# exceeding contraints. After that, we get more than 256 entries per -# directory, making things slow, but it'll still work. - -MAX_BLOCKS_PER_DIR = 256 -LEVELS = 3 - - -def parse_store_url(url): - """Parse a store url - - The url must either be a plain pathname, or it starts with sftp:// - and specifies a remote store. Return a tuple username, host, port, - path, where elements can be None if they are meant to be the default - or are not relevant. - - Note that we follow the bzr (and lftp?) syntax: sftp://foo/bar is an - absolute path, /foo, and sftp://foo/~/bar is "bar" relative to the - user's home directory. - - """ - - user = host = port = path = None - (scheme, netloc, path, query, fragment) = urlparse.urlsplit(url) - - if scheme == "sftp": - if "@" in netloc: - (user, netloc) = netloc.split("@", 1) - if ":" in netloc: - (host, port) = netloc.split(":", 1) - port = int(port) - else: - host = netloc - if path.startswith("/~/"): - path = path[3:] - else: - path = url - - return user, host, port, path - - -class DummyProgressReporter: - - def nop(self, *args): - pass - - update_current_action = nop - update_uploaded = nop - update_downloaded = nop - - -class Backend: - - def __init__(self, config, cache): - self.config = config - self.url = config.get("backup", "store") - - self.user, self.host, self.port, self.path = parse_store_url(self.url) - if self.user is None: - self.user = get_default_user() - if self.port is None: - self.port = 22 # 22 is the default port for ssh - - self.blockdir = None - self.dircounts = [0] * LEVELS - self.sftp_transport = None - self.sftp_client = None - self.bytes_read = 0 - self.bytes_written = 0 - self.set_progress_reporter(DummyProgressReporter()) - self.cache = cache - self.blockdir = str(uuid.uuid4()) - - def set_progress_reporter(self, progress): - """Set progress reporter to be used""" - self.progress = progress - - def get_bytes_read(self): - """Return number of bytes read from the store during this run""" - return self.bytes_read - - def get_bytes_written(self): - """Return number of bytes written to the store during this run""" - return self.bytes_written - - def increment_dircounts(self): - """Increment the counter for lowest dir level, and more if need be""" - level = len(self.dircounts) - 1 - while level >= 0: - self.dircounts[level] += 1 - if self.dircounts[level] <= MAX_BLOCKS_PER_DIR: - break - self.dircounts[level] = 0 - level -= 1 - - def generate_block_id(self): - """Generate a new identifier for the block, when stored remotely""" - self.increment_dircounts() - id = self.blockdir - for i in self.dircounts: - id = os.path.join(id, "%d" % i) - return id - - def block_remote_pathname(self, block_id): - """Return pathname on server for a given block id""" - return os.path.join(self.path, block_id) - - def use_gpg(self): - """Should we use gpg to encrypt/decrypt blocks?""" - no_gpg = self.config.getboolean("backup", "no-gpg") - if no_gpg: - return False - encrypt_to = self.config.get("backup", "gpg-encrypt-to").strip() - return encrypt_to - - def upload_block(self, block_id, block, to_cache): - """Upload block to server, and possibly to cache as well.""" - logging.debug("Uploading block %s" % block_id) - if self.use_gpg(): - logging.debug("Encrypting block %s before upload" % block_id) - block = obnam.gpg.encrypt(self.config, block) - logging.debug("Uploading block %s (%d bytes)" % (block_id, len(block))) - self.progress.update_current_action("Uploading block") - self.really_upload_block(block_id, block) - if to_cache and self.config.get("backup", "cache"): - logging.debug("Putting uploaded block to cache, as well") - self.cache.put_block(block_id, block) - - def download_block(self, block_id): - """Download a block from the remote server - - Return the unparsed block (a string), or raise an exception for errors. - - """ - - logging.debug("Downloading block %s" % block_id) - self.progress.update_current_action("Downloading block") - block = self.really_download_block(block_id) - - if self.use_gpg(): - logging.debug("Decrypting downloaded block %s before using it" % - block_id) - block = obnam.gpg.decrypt(self.config, block) - - return block - - def remove(self, block_id): - """Remove a block from the remote server""" - pathname = self.block_remote_pathname(block_id) - try: - self.remove_pathname(pathname) - except IOError: - # We ignore any errors in removing a file. - pass - - -class SftpBackend(Backend): - - io_size = 64 * 1024 - - def load_key(self, filename): - """Load an SSH private key from a file.""" - try: - return paramiko.DSSKey.from_private_key_file(filename) - except paramiko.SSHException: - return paramiko.RSAKey.from_private_key_file(filename) - - def connect_sftp(self): - """Connect to the server, unless already connected""" - if self.sftp_transport is None: - ssh_key_file = self.config.get("backup", "ssh-key") - logging.debug("Getting private key from %s" % ssh_key_file) - pkey = self.load_key(ssh_key_file) - - logging.debug("Connecting to sftp server: host=%s, port=%d" % - (self.host, self.port)) - self.sftp_transport = paramiko.Transport((self.host, self.port)) - - logging.debug("Authenticating as user %s" % self.user) - self.sftp_transport.connect(username=self.user, pkey=pkey) - - logging.debug("Opening sftp client") - self.sftp_client = self.sftp_transport.open_sftp_client() - - def close(self): - """Close the connection, if any.""" - if self.sftp_transport: - self.sftp_transport.close() - - def sftp_makedirs(self, dirname, mode=0777): - """Create dirname, if it doesn't exist, and all its parents, too""" - stack = [] - while dirname: - stack.append(dirname) - dirname2 = os.path.dirname(dirname) - if dirname2 == dirname: - dirname = None - else: - dirname = dirname2 - - while stack: - dirname, stack = stack[-1], stack[:-1] - try: - self.sftp_client.lstat(dirname).st_mode - except IOError: - exists = False - else: - exists = True - if not exists: - logging.debug("Creating remote directory %s" % dirname) - self.sftp_client.mkdir(dirname, mode=mode) - - def really_upload_block(self, block_id, block): - self.connect_sftp() - pathname = self.block_remote_pathname(block_id) - self.sftp_makedirs(os.path.dirname(pathname)) - f = self.sftp_client.file(pathname, "w") - self.sftp_client.chmod(pathname, 0600) - for offset in range(0, len(block), self.io_size): - block_part = block[offset:offset+self.io_size] - f.write(block_part) - self.bytes_written += len(block_part) - self.progress.update_uploaded(self.bytes_written) - f.close() - - def really_download_block(self, block_id): - try: - self.connect_sftp() - f = self.sftp_client.file(self.block_remote_pathname(block_id), - "r") - block_parts = [] - while True: - block_part = f.read(self.io_size) - if not block_part: - break - block_parts.append(block_part) - self.bytes_read += len(block_part) - self.progress.update_downloaded(self.bytes_read) - block = "".join(block_parts) - f.close() - if self.config.get("backup", "cache"): - self.cache.put_block(block_id, block) - except IOError, e: - logging.warning("I/O error: %s" % str(e)) - raise e - return block - - def sftp_listdir_abs(self, dirname): - """Like SFTPClient's listdir_attr, but absolute pathnames""" - items = self.sftp_client.listdir_attr(dirname) - for item in items: - item.filename = os.path.join(dirname, item.filename) - return items - - def sftp_recursive_listdir(self, dirname="."): - """Similar to SFTPClient's listdir_attr, but recursively""" - list = [] - logging.debug("sftp: listing files in %s" % dirname) - unprocessed = self.sftp_listdir_abs(dirname) - while unprocessed: - item, unprocessed = unprocessed[0], unprocessed[1:] - if stat.S_ISDIR(item.st_mode): - logging.debug("sftp: listing files in %s" % item.filename) - unprocessed += self.sftp_listdir_abs(item.filename) - elif stat.S_ISREG(item.st_mode): - list.append(item.filename) - return list - - def list(self): - """Return list of all files on the remote server""" - return self.sftp_recursive_listdir(self.path) - - def remove_pathname(self, pathname): - self.sftp_client.remove(pathname) - - -class FileBackend(Backend): - - def close(self): - pass - - def really_upload_block(self, block_id, block): - dir_full = os.path.join(self.path, os.path.dirname(block_id)) - if not os.path.isdir(dir_full): - os.makedirs(dir_full, 0700) - fd = os.open(self.block_remote_pathname(block_id), - os.O_WRONLY | os.O_TRUNC | os.O_CREAT, - 0600) - f = os.fdopen(fd, "w") - f.write(block) - self.bytes_written += len(block) - self.progress.update_uploaded(self.bytes_written) - f.close() - - def really_download_block(self, block_id): - try: - f = file(self.block_remote_pathname(block_id), "r") - block = f.read() - self.bytes_read += len(block) - self.progress.update_uploaded(self.bytes_read) - f.close() - except IOError, e: - raise e - return block - - def list(self): - """Return list of all files on the remote server""" - list = [] - for dirpath, _, filenames in os.walk(self.path): - if dirpath.startswith(self.path): - dirpath = dirpath[len(self.path):] - if dirpath.startswith(os.sep): - dirpath = dirpath[len(os.sep):] - list += [os.path.join(dirpath, x) for x in filenames] - return list - - def remove_pathname(self, pathname): - """Remove a block from the remote server""" - if os.path.exists(pathname): - os.remove(pathname) - - -def get_default_user(): - """Return the username of the current user""" - if "LOGNAME" in os.environ: - return os.environ["LOGNAME"] - else: - return pwd.getpwuid(os.getuid())[0] - - -def init(config, cache): - """Initialize the subsystem and return an opaque backend object""" - _, host, _, _ = parse_store_url(config.get("backup", "store")) - if host is None: - return FileBackend(config, cache) - else: - return SftpBackend(config, cache) diff --git a/obnam/backendTests.py b/obnam/backendTests.py deleted file mode 100644 index 632615b5..00000000 --- a/obnam/backendTests.py +++ /dev/null @@ -1,281 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.backend""" - - -import os -import pwd -import shutil -import stat -import tempfile -import unittest - -import obnam - - -class GetDefaultUserTest(unittest.TestCase): - - def setUp(self): - self.orig = os.environ.get("LOGNAME", None) - - def tearDown(self): - if self.orig is not None: - os.environ["LOGNAME"] = self.orig - else: - del os.environ["LOGNAME"] - - def testLogname(self): - os.environ["LOGNAME"] = "pink" - self.failUnlessEqual(obnam.backend.get_default_user(), "pink") - - def testLognameWhenItIsPink(self): - # Just in case the user's name is "pink"... - os.environ["LOGNAME"] = "pretty" - self.failUnlessEqual(obnam.backend.get_default_user(), "pretty") - - def testNoLogname(self): - del os.environ["LOGNAME"] - user = obnam.backend.get_default_user() - uid = pwd.getpwnam(user)[2] - self.failUnlessEqual(uid, os.getuid()) - - -class ParseStoreUrlTests(unittest.TestCase): - - def test(self): - cases = ( - ("", None, None, None, ""), - ("foo", None, None, None, "foo"), - ("/", None, None, None, "/"), - ("sftp://host", None, "host", None, ""), - ("sftp://host/", None, "host", None, "/"), - ("sftp://host/foo", None, "host", None, "/foo"), - ("sftp://user@host/foo", "user", "host", None, "/foo"), - ("sftp://host:22/foo", None, "host", 22, "/foo"), - ("sftp://user@host:22/foo", "user", "host", 22, "/foo"), - ("sftp://host/~/foo", None, "host", None, "foo"), - ) - for case in cases: - user, host, port, path = obnam.backend.parse_store_url(case[0]) - self.failUnlessEqual(user, case[1]) - self.failUnlessEqual(host, case[2]) - self.failUnlessEqual(port, case[3]) - self.failUnlessEqual(path, case[4]) - - -class UseGpgTests(unittest.TestCase): - - def setUp(self): - self.config = obnam.config.default_config() - self.config.set("backup", "gpg-encrypt-to", "") - self.cache = obnam.cache.Cache(self.config) - self.be = obnam.backend.Backend(self.config, self.cache) - - def testDoNotUseByDefault(self): - self.failIf(self.be.use_gpg()) - - def testUseIfRequested(self): - self.config.set("backup", "gpg-encrypt-to", "pink") - self.failUnless(self.be.use_gpg()) - - def testDoNotUseEvenIfRequestedIfNoGpgIsSet(self): - self.config.set("backup", "gpg-encrypt-to", "pink") - self.config.set("backup", "no-gpg", "true") - self.failIf(self.be.use_gpg()) - - -class DircountTests(unittest.TestCase): - - def setUp(self): - self.config = obnam.config.default_config() - self.cache = obnam.cache.Cache(self.config) - self.be = obnam.backend.Backend(self.config, self.cache) - - def testInit(self): - self.failUnlessEqual(len(self.be.dircounts), obnam.backend.LEVELS) - for i in range(obnam.backend.LEVELS): - self.failUnlessEqual(self.be.dircounts[i], 0) - - def testIncrementOnce(self): - self.be.increment_dircounts() - self.failUnlessEqual(self.be.dircounts, [0, 0, 1]) - - def testIncrementMany(self): - for i in range(obnam.backend.MAX_BLOCKS_PER_DIR): - self.be.increment_dircounts() - self.failUnlessEqual(self.be.dircounts, - [0, 0, obnam.backend.MAX_BLOCKS_PER_DIR]) - - self.be.increment_dircounts() - self.failUnlessEqual(self.be.dircounts, [0, 1, 0]) - - self.be.increment_dircounts() - self.failUnlessEqual(self.be.dircounts, [0, 1, 1]) - - def testIncrementTop(self): - self.be.dircounts = [0] + \ - [obnam.backend.MAX_BLOCKS_PER_DIR] * (obnam.backend.LEVELS -1) - self.be.increment_dircounts() - self.failUnlessEqual(self.be.dircounts, [1, 0, 0]) - - -class LocalBackendBase(unittest.TestCase): - - def setUp(self): - self.cachedir = "tmp.cachedir" - self.rootdir = "tmp.rootdir" - - os.mkdir(self.cachedir) - os.mkdir(self.rootdir) - - config_list = ( - ("backup", "cache", self.cachedir), - ("backup", "store", self.rootdir) - ) - - self.config = obnam.config.default_config() - for section, item, value in config_list: - self.config.set(section, item, value) - - self.cache = obnam.cache.Cache(self.config) - - def tearDown(self): - shutil.rmtree(self.cachedir) - shutil.rmtree(self.rootdir) - del self.cachedir - del self.rootdir - del self.config - - -class InitTests(LocalBackendBase): - - def testInit(self): - be = obnam.backend.init(self.config, self.cache) - self.failUnlessEqual(be.url, self.rootdir) - - -class IdTests(LocalBackendBase): - - def testGenerateBlockId(self): - be = obnam.backend.init(self.config, self.cache) - self.failIfEqual(be.blockdir, None) - id = be.generate_block_id() - self.failUnless(id.startswith(be.blockdir)) - id2 = be.generate_block_id() - self.failIfEqual(id, id2) - - -class UploadTests(LocalBackendBase): - - def testUpload(self): - self.config.set("backup", "gpg-home", "") - self.config.set("backup", "gpg-encrypt-to", "") - self.config.set("backup", "gpg-sign-with", "") - be = obnam.backend.init(self.config, self.cache) - id = be.generate_block_id() - block = "pink is pretty" - ret = be.upload_block(id, block, False) - self.failUnlessEqual(ret, None) - self.failUnlessEqual(be.get_bytes_read(), 0) - self.failUnlessEqual(be.get_bytes_written(), len(block)) - - pathname = os.path.join(self.rootdir, id) - self.failUnless(os.path.isfile(pathname)) - - st = os.lstat(pathname) - self.failUnlessEqual(stat.S_IMODE(st.st_mode), 0600) - - f = file(pathname, "r") - data = f.read() - f.close() - self.failUnlessEqual(block, data) - - def testUploadToCache(self): - cachedir = self.config.get("backup", "cache") - self.failUnlessEqual(os.listdir(cachedir), []) - - self.config.set("backup", "gpg-home", "") - self.config.set("backup", "gpg-encrypt-to", "") - self.config.set("backup", "gpg-sign-with", "") - self.config.set("backup", "cache", cachedir) - - be = obnam.backend.init(self.config, self.cache) - id = be.generate_block_id() - block = "pink is pretty" - ret = be.upload_block(id, block, True) - self.failIfEqual(os.listdir(cachedir), []) - - -class DownloadTests(LocalBackendBase): - - def testOK(self): - self.config.set("backup", "gpg-home", "") - self.config.set("backup", "gpg-encrypt-to", "") - self.config.set("backup", "gpg-sign-with", "") - - be = obnam.backend.init(self.config, self.cache) - id = be.generate_block_id() - block = "pink is still pretty" - be.upload_block(id, block, False) - - success = be.download_block(id) - self.failUnlessEqual(type(success), type("")) - self.failUnlessEqual(be.get_bytes_read(), len(block)) - self.failUnlessEqual(be.get_bytes_written(), len(block)) - - def testError(self): - be = obnam.backend.init(self.config, self.cache) - id = be.generate_block_id() - self.failUnlessRaises(IOError, be.download_block, id) - - -class FileListTests(LocalBackendBase): - - def testFileList(self): - self.config.set("backup", "gpg-home", "") - self.config.set("backup", "gpg-encrypt-to", "") - self.config.set("backup", "gpg-sign-with", "") - - be = obnam.backend.init(self.config, self.cache) - self.failUnlessEqual(be.list(), []) - - id = "pink" - block = "pretty" - be.upload_block(id, block, False) - list = be.list() - self.failUnlessEqual(list, [id]) - - filename = os.path.join(self.rootdir, id) - f = file(filename, "r") - block2 = f.read() - f.close() - self.failUnlessEqual(block, block2) - - -class RemoveTests(LocalBackendBase): - - def test(self): - be = obnam.backend.init(self.config, self.cache) - id = be.generate_block_id() - block = "pink is still pretty" - be.upload_block(id, block, False) - - self.failUnlessEqual(be.list(), [id]) - - be.remove(id) - self.failUnlessEqual(be.list(), []) diff --git a/obnam/cache.py b/obnam/cache.py deleted file mode 100644 index f7b86f1c..00000000 --- a/obnam/cache.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Block cache for backup program""" - - -import os - - -class Cache: - - def __init__(self, config): - self.cachedir = config.get("backup", "cache") - - def cache_pathname(self, block_id): - """Return pathname in local block cache for a given block id""" - return os.path.join(self.cachedir, block_id) - - - def create_new_cache_file(self, block_id): - """Create a new file in the local block cache, open for writing""" - pathname = self.cache_pathname(block_id) - dirname = os.path.dirname(pathname) - if not os.path.isdir(dirname): - os.makedirs(dirname, 0700) - return file(pathname + ".new", "w", 0600) - - def close_new_cache_file(self, block_id, f): - """Close a file opened by open_cache_file""" - f.close() - pathname = self.cache_pathname(block_id) - os.rename(pathname + ".new", pathname) - - def put_block(self, block_id, block): - """Put a block into the cache""" - f = self.create_new_cache_file(block_id) - f.write(block) - self.close_new_cache_file(block_id, f) - - def get_block(self, block_id): - """Return the contents of a block in the block cache, or None""" - try: - f = file(self.cache_pathname(block_id), "r") - block = f.read() - f.close() - except IOError, e: - return None - return block diff --git a/obnam/cacheTests.py b/obnam/cacheTests.py deleted file mode 100644 index fd42ead1..00000000 --- a/obnam/cacheTests.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.cache""" - - -import os -import shutil -import unittest - -import obnam - - -class CacheBase(unittest.TestCase): - - def setUp(self): - self.cachedir = "tmp.cachedir" - - config_list = ( - ("backup", "cache", self.cachedir), - ) - - self.config = obnam.cfgfile.ConfigFile() - for section, item, value in config_list: - if not self.config.has_section(section): - self.config.add_section(section) - self.config.set(section, item, value) - - def tearDown(self): - if os.path.exists(self.cachedir): - shutil.rmtree(self.cachedir) - del self.cachedir - del self.config - - -class InitTests(CacheBase): - - def testInit(self): - cache = obnam.cache.Cache(self.config) - self.failIf(os.path.isdir(self.cachedir)) - - -class PutTests(CacheBase): - - def testPut(self): - cache = obnam.cache.Cache(self.config) - id = "pink" - block = "pretty" - cache.put_block(id, block) - - pathname = os.path.join(self.cachedir, id) - self.failUnless(os.path.isfile(pathname)) - f = file(pathname, "r") - self.failUnlessEqual(f.read(), block) - f.close() - - -class GetTests(CacheBase): - - def testGet(self): - cache = obnam.cache.Cache(self.config) - id = "pink" - block = "pretty" - self.failUnlessEqual(cache.get_block(id), None) - - cache.put_block(id, block) - self.failUnlessEqual(cache.get_block(id), block) diff --git a/obnam/cfgfile.py b/obnam/cfgfile.py deleted file mode 100644 index 52994388..00000000 --- a/obnam/cfgfile.py +++ /dev/null @@ -1,326 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Configuration file I/O - -This module is similar to Python's standard ConfigParser module, but -can handle options with a list of values. This is important for Obnam, -since some of its options need to be able to be specified multiple -times. For example, exclude patterns for files. - -There seems to be no good way of extending the ConfigParser class, -so this is written from scratch. - -The way it works: - - foo = bar - # foo now has one value, "bar" - foo += foobar - # note the +=; foo now has two values, "bar" and "foobar" - foo = pink - # foo now has one value again, "pink" - -This also works across configuration files. - -This module does not support the interpolation or defaults features -of ConfigParser. It should otherwise be compatible. - -""" - - -import re - -import obnam - - -class Error(obnam.ObnamException): - - pass - - -class DuplicationError(Error): - - def __init__(self, section): - self._msg = "section %s already exists" % section - - -class NoSectionError(Error): - - def __init__(self, section): - self._msg = "configuration file does not have section %s" % section - - -class NoOptionError(Error): - - def __init__(self, section, option): - self._msg = ("configuration file does not have option %s " - "in section %s" % (option, section)) - - -class ParsingError(Error): - - def __init__(self, filename, lineno): - if filename is None: - self._msg = "Syntax error on line %d of unnamed file" % lineno - else: - self._msg = "Syntax error in %s, line %d" % (filename, lineno) - - -class ConfigFile: - - def __init__(self): - self._dict = {} - - def optionxform(self, option): - """Transform name of option into canonical form""" - return option.lower() - - def has_section(self, section): - """Does this configuration file have a particular section?""" - return section in self._dict - - def add_section(self, section): - """Add a new, empty section called section""" - if self.has_section(section): - raise DuplicationError(section) - self._dict[section] = {} - - def parse_string(self, str): - """Parse a string as a configuration file""" - - def sections(self): - """Return all sections we know about""" - return sorted(self._dict.keys()) - - def options(self, section): - """Return list of option names used in a given section""" - if not self.has_section(section): - raise NoSectionError(section) - return sorted(self._dict[section].keys()) - - def has_option(self, section, option): - """Does a section have a particular option?""" - if not self.has_section(section): - raise NoSectionError(section) - option = self.optionxform(option) - return option in self._dict[section] - - def set(self, section, option, value): - """Set the value of an option in a section - - Note that this replaces all existing values. - - """ - if not self.has_section(section): - raise NoSectionError(section) - option = self.optionxform(option) - self._dict[section][option] = [value] - - def get(self, section, option): - """Return the value of an option in a section - - Note that this can return a string or a list of strings, depending - on whether the option has a single value, or several. If the option - has not been set, NoOptionError is raised. - - """ - if not self.has_section(section): - raise NoSectionError(section) - option = self.optionxform(option) - if not self.has_option(section, option): - raise NoOptionError(section, option) - value = self._dict[section][option] - if len(value) == 1: - return value[0] - else: - return value - - def getint(self, section, option): - """Return value of an option in a section as an integer - - If the value is not a single string encoding an integer, then - ValueError is raised. - - """ - return int(self.get(section, option), 0) - - def getfloat(self, section, option): - """Return value of an option in a section as a floating point value - - If the value is not a single string encoding a floating point, then - ValueError is raised. - - """ - return float(self.get(section, option)) - - def getboolean(self, section, option): - """Convert value of option in section into a boolean value - - The value must be a single string that is "yes", "true", "on", or - "1" for True (ignoring upper/lowercase), or "no", "false", "off", or - "0" for False. Any other value will cause ValueError to be raised. - - """ - value = self.get(section, option) - value = value.lower().strip() - if value in ["yes", "on", "true", "1"]: - return True - if value in ["no", "off", "false", "0"]: - return False - raise ValueError - - def getvalues(self, section, option): - """Return list of values for an option in a section - - Note that the return value is always a list of strings. It might - be empty. - - """ - values = self.get(section, option) - if values == "": - return [] - if type(values) != type([]): - values = [values] - return values - - def append(self, section, option, value): - """Append a new value for an option""" - if not self.has_section(section): - raise NoSectionError(section) - option = self.optionxform(option) - if self.has_option(section, option): - self._dict[section][option].append(value) - else: - self._dict[section][option] = [value] - - def items(self, section): - """Return list of (option, value) pairs for a section - - Note that the value is a single string, or a list of strings, - similar to the get method. - """ - - list = [] - for option in self.options(section): - list.append((option, self.get(section, option))) - return list - - def remove_option(self, section, option): - """Remove an option (all values) from a section""" - if not self.has_section(section): - raise NoSectionError(section) - option = self.optionxform(option) - if self.has_option(section, option): - del self._dict[section][option] - return True - else: - return False - - def remove_section(self, section): - """Remove a section""" - if self.has_section(section): - del self._dict[section] - return True - else: - return False - - def write(self, f): - """Write configuration file to open file""" - for section in self.sections(): - f.write("[%s]\n" % section) - for option in self.options(section): - values = self.get(section, option) - if type(values) != type([]): - f.write("%s = %s\n" % (option, values)) - else: - if values: - f.write("%s = %s\n" % (option, values[0])) - for value in values[1:]: - f.write("%s += %s\n" % (option, value)) - - # Regular expression patterns for parsing configuration files. - comment_pattern = re.compile(r"\s*(#.*)?$") - section_pattern = re.compile(r"\[(?P<section>.*)\]$") - option_line1_pattern = re.compile(r"(?P<option>\S*)\s*(?P<op>\+?=)" + - r"(?P<value>.*)$") - option_line2_pattern = re.compile(r"\s+(?P<value>.*)$") - - def handle_section(self, section, option, match): - section = match.group("section") - if not self.has_section(section): - # It's OK for the section to exist already. We might be reading - # several configuration files into the same CfgFile object. - self.add_section(section) - return section, option - - def handle_option_line1(self, section, option, match): - option = match.group("option") - op = match.group("op") - value = match.group("value") - value = value.strip() - if op == "+=": - self.append(section, option, value) - else: - self.set(section, option, value) - return section, option - - def handle_option_line2(self, section, option, match): - value = match.group("value") - - values = self.get(section, option) - if type(values) != type([]): - values = [values] - if values: - values[-1] = values[-1] + " " + value.strip() - - self.remove_option(section, option) - for value in values: - self.append(section, option, value) - - return section, option - - def handle_comment(self, section, option, match): - return section, option - - def readfp(self, f, filename=None): - """Read configuration file from open file""" - filename = filename or getattr(f, "filename", None) - - lineno = 0 - section = None - option = None - - matchers = ((self.comment_pattern, self.handle_comment), - (self.section_pattern, self.handle_section), - (self.option_line1_pattern, self.handle_option_line1), - (self.option_line2_pattern, self.handle_option_line2), - ) - - while True: - line = f.readline() - if not line: - break - lineno += 1 - - m = None - for pattern, func in matchers: - m = pattern.match(line) - if m: - section, option = func(section, option, m) - break - if not m: - raise ParsingError(filename, lineno) diff --git a/obnam/cfgfileTests.py b/obnam/cfgfileTests.py deleted file mode 100644 index af572943..00000000 --- a/obnam/cfgfileTests.py +++ /dev/null @@ -1,380 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.cfgfile.""" - - -import StringIO -import unittest - -import obnam - - -class ParsingErrorTests(unittest.TestCase): - - def testSaysUnnamedFileIfNameIsNone(self): - e = obnam.cfgfile.ParsingError(None, 42) - self.failUnless("unnamed file" in str(e)) - - def testIncludesFilenameInMessage(self): - e = obnam.cfgfile.ParsingError("pink", 42) - self.failUnless("pink" in str(e)) - - def testIncludesLineNumberInMessage(self): - e = obnam.cfgfile.ParsingError("pink", 42) - self.failUnless("42" in str(e)) - - -class SectionTests(unittest.TestCase): - - def setUp(self): - self.cf = obnam.cfgfile.ConfigFile() - - def tearDown(self): - self.cf = None - - def testEmptySections(self): - self.failUnlessEqual(self.cf.sections(), []) - - def testAddSectionNew(self): - self.cf.add_section("foo") - self.failUnlessEqual(self.cf.sections(), ["foo"]) - - def testAddSectionExisting(self): - self.cf.add_section("foo") - self.failUnlessRaises(obnam.cfgfile.DuplicationError, - self.cf.add_section, - "foo") - - def testHasSectionForExisting(self): - self.cf.add_section("foo") - self.failUnless(self.cf.has_section("foo")) - - def testHasSectionForNotExisting(self): - self.failIf(self.cf.has_section("foo")) - - def testSectionsEmpty(self): - self.failUnlessEqual(self.cf.sections(), []) - - def testSectionsOne(self): - self.cf.add_section("foo") - self.failUnlessEqual(self.cf.sections(), ["foo"]) - - def testSectionsMany(self): - list = ["%d" % x for x in range(100)] - for section in list: - self.cf.add_section(section) - self.failUnlessEqual(self.cf.sections(), sorted(list)) - - def testRemoveSectionNonExistentSection(self): - self.failUnlessEqual(self.cf.remove_section("foo"), False) - - def testRemoveSectionExistingSection(self): - self.cf.add_section("foo") - self.failUnlessEqual(self.cf.remove_section("foo"), True) - - -class OptionsTests(unittest.TestCase): - - def setUp(self): - self.cf = obnam.cfgfile.ConfigFile() - - def tearDown(self): - self.cf = None - - def testOptionsNonExistentSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.options, - "foo") - - def testOptionsEmptySection(self): - self.cf.add_section("foo") - self.failUnlessEqual(self.cf.options("foo"), []) - - def testOptionsNonEmptySection(self): - self.cf.add_section("foo") - options = ["%d" % x for x in range(100)] - for option in options: - self.cf.set("foo", option, option) - self.failUnlessEqual(self.cf.options("foo"), sorted(options)) - - def testHasOptionNonExistingSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.has_option, - "foo", "bar") - - def testHasOptionNonExistingOption(self): - self.cf.add_section("foo") - self.failIf(self.cf.has_option("foo", "bar")) - - def testHasOptionExistingOption(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "foobar") - self.failUnless(self.cf.has_option("foo", "bar")) - - def testGetNonExistingSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.get, - "foo", "bar") - - def testGetNonExistingOption(self): - self.cf.add_section("foo") - self.failUnlessRaises(obnam.cfgfile.NoOptionError, - self.cf.get, - "foo", "bar") - - def testSetNonExistingSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.set, - "foo", "bar", "foobar") - - def testSetAndGet(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "foobar") - self.failUnlessEqual(self.cf.get("foo", "bar"), "foobar") - - def testGetValuesSingle(self): - self.cf.add_section("foo") - self.cf.append("foo", "bar", "foobar") - self.failUnlessEqual(self.cf.getvalues("foo", "bar"), ["foobar"]) - - def testGetValuesMultiple(self): - self.cf.add_section("foo") - self.cf.append("foo", "bar", "foobar") - self.cf.append("foo", "bar", "baz") - self.failUnlessEqual(self.cf.getvalues("foo", "bar"), - ["foobar", "baz"]) - - def testGetValuesForEmptyValueReturnsEmptyList(self): - self.cf.add_section("foo") - self.cf.append("foo", "bar", "") - self.failUnlessEqual(self.cf.getvalues("foo", "bar"), []) - - def testAppendNonExistingSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.append, - "foo", "bar", "foobar") - - def testAppendFirstValue(self): - self.cf.add_section("foo") - self.cf.append("foo", "bar", "foobar") - self.failUnlessEqual(self.cf.get("foo", "bar"), "foobar") - - def testAppendSecondValue(self): - self.cf.add_section("foo") - self.cf.append("foo", "bar", "foobar") - self.cf.append("foo", "bar", "baz") - self.failUnlessEqual(self.cf.get("foo", "bar"), ["foobar", "baz"]) - - def testOptionXform(self): - self.cf.add_section("foo") - self.cf.set("foo", "BAR", "foobar") - self.failUnless(self.cf.has_option("foo", "bar")) - self.failUnlessEqual(self.cf.options("foo"), ["bar"]) - self.failUnlessEqual(self.cf.get("foo", "bar"), "foobar") - - def testGetIntNonInteger(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "foobar") - self.failUnlessRaises(ValueError, - self.cf.getint, - "foo", "bar") - - def testGetIntDecimalInteger(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "12765") - self.failUnlessEqual(self.cf.getint("foo", "bar"), 12765) - - def testGetIntHexadecimalInteger(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "0x12765") - self.failUnlessEqual(self.cf.getint("foo", "bar"), 0x12765) - - def testGetIntOctalInteger(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "033") - self.failUnlessEqual(self.cf.getint("foo", "bar"), 3*8 + 3) - - def testGetFloatNonFloat(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "foobar") - self.failUnlessRaises(ValueError, - self.cf.getfloat, - "foo", "bar") - - def testGetFloat(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "12.765") - self.failUnlessEqual(self.cf.getfloat("foo", "bar"), 12.765) - - def testGetBooleanBad(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "foobar") - self.failUnlessRaises(ValueError, - self.cf.getboolean, - "foo", "bar") - - def testGetBooleanTrue(self): - self.cf.add_section("foo") - for x in ["yes", "true", "on", "1"]: - self.cf.set("foo", "bar", x) - self.failUnlessEqual(self.cf.getboolean("foo", "bar"), True) - - def testGetBooleanFalse(self): - self.cf.add_section("foo") - for x in ["no", "false", "off", "0"]: - self.cf.set("foo", "bar", x) - self.failUnlessEqual(self.cf.getboolean("foo", "bar"), False) - - def testItemsNonExistentSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.items, - "foo") - - def testItemsEmpty(self): - self.cf.add_section("foo") - self.failUnlessEqual(self.cf.items("foo"), []) - - def testItemsNonEmpty(self): - self.cf.add_section("foo") - options = ["%d" % x for x in range(4)] - for option in options: - self.cf.append("foo", option, option) - self.cf.append("foo", option, option) - self.failUnlessEqual(self.cf.items("foo"), - [("0", ["0", "0"]), - ("1", ["1", "1"]), - ("2", ["2", "2"]), - ("3", ["3", "3"])]) - - def testRemoveOptionNonExistentSection(self): - self.failUnlessRaises(obnam.cfgfile.NoSectionError, - self.cf.remove_option, - "foo", "bar") - - def testRemoveOptionNonExistentOption(self): - self.cf.add_section("foo") - self.failUnlessEqual(self.cf.remove_option("foo", "bar"), False) - - def testRemoveOptionExistingOption(self): - self.cf.add_section("foo") - self.cf.set("foo", "bar", "foobar") - self.failUnlessEqual(self.cf.remove_option("foo", "bar"), True) - self.failUnlessEqual(self.cf.items("foo"), []) - - -class WriteTests(unittest.TestCase): - - def testSingleValue(self): - cf = obnam.cfgfile.ConfigFile() - cf.add_section("foo") - cf.set("foo", "bar", "foobar") - f = StringIO.StringIO() - cf.write(f) - self.failUnlessEqual(f.getvalue(), """\ -[foo] -bar = foobar -""") - - def testMultiValue(self): - cf = obnam.cfgfile.ConfigFile() - cf.add_section("foo") - cf.append("foo", "bar", "foobar") - cf.append("foo", "bar", "baz") - f = StringIO.StringIO() - cf.write(f) - self.failUnlessEqual(f.getvalue(), """\ -[foo] -bar = foobar -bar += baz -""") - - -class ReadTests(unittest.TestCase): - - def parse(self, file_contents): - cf = obnam.cfgfile.ConfigFile() - f = StringIO.StringIO(file_contents) - cf.readfp(f) - return cf - - def testEmpty(self): - cf = self.parse("") - self.failUnlessEqual(cf.sections(), []) - - def testEmptySection(self): - cf = self.parse("[foo]\n") - self.failUnlessEqual(cf.sections(), ["foo"]) - - def testTwoEmptySection(self): - cf = self.parse("[foo]\n[bar]\n") - self.failUnlessEqual(cf.sections(), ["bar", "foo"]) - - def testParsingError(self): - self.failUnlessRaises(obnam.cfgfile.ParsingError, - self.parse, "xxxx") - - def testComment(self): - cf = self.parse("# blah\n[foo]\n\n\n") - self.failUnlessEqual(cf.sections(), ["foo"]) - - def testSingleLineSingleValue(self): - cf = self.parse("[foo]\nbar = foobar\n") - self.failUnlessEqual(cf.get("foo", "bar"), "foobar") - - def testSingleLineTwoValues(self): - cf = self.parse("[foo]\nbar = foobar\nbar += baz\n") - self.failUnlessEqual(cf.get("foo", "bar"), ["foobar", "baz"]) - - def testContinuationLine(self): - cf = self.parse("[foo]\nbar = foo\n bar\n") - self.failUnlessEqual(cf.get("foo", "bar"), "foo bar") - - def testSingleLineTwiceButOnlyOneResultValue(self): - cf = self.parse("[foo]\nbar = foobar\nbar = baz\n") - self.failUnlessEqual(cf.get("foo", "bar"), "baz") - - def testReadTwo(self): - f1 = StringIO.StringIO("""\ -[backup] -store = pink -""") - f2 = StringIO.StringIO("""\ -[backup] -cache = pretty -""") - cf = obnam.cfgfile.ConfigFile() - cf.readfp(f1) - cf.readfp(f2) - self.failUnlessEqual(cf.get("backup", "store"), "pink") - self.failUnlessEqual(cf.get("backup", "cache"), "pretty") - - -class ReadWriteTest(unittest.TestCase): - - def test(self): - cf = obnam.cfgfile.ConfigFile() - cf.add_section("foo") - cf.append("foo", "bar", "foobar") - cf.append("foo", "bar", "baz") - f = StringIO.StringIO() - cf.write(f) - f.seek(0, 0) - cf2 = obnam.cfgfile.ConfigFile() - cf2.readfp(f) - self.failUnlessEqual(cf2.sections(), ["foo"]) - self.failUnlessEqual(cf2.items("foo"), [("bar", ["foobar", "baz"])]) diff --git a/obnam/cmp.py b/obnam/cmp.py deleted file mode 100644 index d83092eb..00000000 --- a/obnam/cmp.py +++ /dev/null @@ -1,317 +0,0 @@ -# Copyright (C) 2006, 2007, 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Obnam data components""" - - -import obnam - - -# Constants for component kinds - -_component_kinds = {} - -COMPOSITE_FLAG = 0x01 -REF_FLAG = 0x02 - -def _define_kind(code, is_composite, is_ref, name): - code = code << 2 - if is_composite: - code = code | COMPOSITE_FLAG - if is_ref: - code = code | REF_FLAG - assert code not in _component_kinds - assert is_composite in [True, False] - assert is_ref in [True, False] - assert (is_composite, is_ref) != (True, True) - assert name not in _component_kinds.values() - _component_kinds[code] = (is_composite, name) - return code - -def _define_composite(code, name): - return _define_kind(code, True, False, name) - -def _define_ref(code, name): - return _define_kind(code, False, True, name) - -def _define_plain(code, name): - return _define_kind(code, False, False, name) - -OBJID = _define_plain( 1, "OBJID") -OBJKIND = _define_plain( 2, "OBJKIND") -BLKID = _define_plain( 3, "BLKID") -FILECHUNK = _define_plain( 4, "FILECHUNK") -OBJECT = _define_composite( 5, "OBJECT") -OBJMAP = _define_composite( 6, "OBJMAP") -# 7-19 have been obsoleted and should not exist anywhere in the universe. -CONTREF = _define_ref( 20, "CONTREF") -NAMEIPAIR = _define_composite( 21, "NAMEIPAIR") -# 22 has been obsoleted and should not exist anywhere in the universe. -FILENAME = _define_plain( 23, "FILENAME") -SIGDATA = _define_plain( 24, "SIGDATA") -SIGREF = _define_ref( 25, "SIGREF") -GENREF = _define_ref( 26, "GENREF") -OBJREF = _define_ref( 28, "OBJREF") -BLOCKREF = _define_ref( 29, "BLOCKREF") -MAPREF = _define_ref( 30, "MAPREF") -FILEPARTREF = _define_ref( 31, "FILEPARTREF") -FORMATVERSION = _define_plain( 32, "FORMATVERSION") -FILE = _define_composite( 33, "FILE") -FILELISTREF = _define_ref( 34, "FILELISTREF") -CONTMAPREF = _define_ref( 35, "CONTMAPREF") -DELTAREF = _define_ref( 36, "DELTAREF") -DELTADATA = _define_plain( 37, "DELTADATA") -STAT = _define_plain( 38, "STAT") -GENSTART = _define_plain( 39, "GENSTART") -GENEND = _define_plain( 40, "GENEND") -DELTAPARTREF = _define_ref( 41, "DELTAPARTREF") -DIRREF = _define_ref( 42, "DIRREF") -FILEGROUPREF = _define_ref( 43, "FILEGROUPREF") - - -def kind_name(kind): - """Return a textual name for a numeric component kind""" - if kind in _component_kinds: - return _component_kinds[kind][1] - else: - return "UNKNOWN" - - -def kind_is_composite(kind): - """Is a kind supposed to be composite?""" - if kind in _component_kinds: - return _component_kinds[kind][0] - else: - return False - - -def kind_is_reference(kind): - """Is a kind a reference to an object?""" - if kind & REF_FLAG: - return True - else: - return False - - -class Component: - - def __init__(self, kind, value): - self.kind = kind - assert type(value) in [type(""), type([])], \ - "Value type is %s instead of string or list" % type(value) - if type(value) == type(""): - self.str = value - self.subcomponents = [] - else: - self.str = None - for x in value: - assert isinstance(x, Component) - self.subcomponents = value[:] - - def get_kind(self): - """Return kind kind of a component""" - return self.kind - - def get_string_value(self): - """Return string value of leaf component""" - assert self.str is not None - return self.str - - def get_varint_value(self): - """Return integer value of leaf component""" - assert self.str is not None - return obnam.varint.decode(self.str, 0)[0] - - def get_subcomponents(self): - """Return list of subcomponents of composite component""" - assert self.str is None - return self.subcomponents - - def is_composite(self): - """Is a component a leaf component or a composite one?""" - return self.str is None - - def find_by_kind(self, wanted_kind): - """Find subcomponents of a desired kind""" - return [c for c in self.subcomponents if c.get_kind() == wanted_kind] - - def first_by_kind(self, wanted_kind): - """Find first subcomponent of a desired kind""" - for c in self.subcomponents: - if c.get_kind() == wanted_kind: - return c - return None - - def find_strings_by_kind(self, wanted_kind): - """Find subcomponents by kind, return their string values""" - return [c.get_string_value() - for c in find_by_kind(self.subcomponents, wanted_kind)] - - def first_string_by_kind(self, wanted_kind): - """Find first subcomponent by kind, return its string value""" - c = self.first_by_kind(wanted_kind) - if c: - return c.get_string_value() - else: - return None - - def first_varint_by_kind(self, wanted_kind): - """Find first subcomponent by kind, return its integer value""" - c = self.first_by_kind(wanted_kind) - if c: - return c.get_varint_value() - else: - return None - - def encode(self): - """Encode a component as a string""" - if self.is_composite(): - snippets = [] - for sub in self.get_subcomponents(): - snippets.append(sub.encode()) - encoded = "".join(snippets) - else: - encoded = self.str - return "%s%s%s" % (obnam.varint.encode(len(encoded)), - obnam.varint.encode(self.kind), - encoded) - - -class Parser: - - def __init__(self, encoded, pos=0, end=None): - self.encoded = encoded - self.pos = pos - if end is None or end > len(encoded): - self.end = len(encoded) - else: - self.end = end - - def decode(self): - """Parse one component, and its value if type is composite""" - if self.pos >= self.end: - return None - - size, self.pos = obnam.varint.decode(self.encoded, self.pos) - kind, self.pos = obnam.varint.decode(self.encoded, self.pos) - - if kind_is_composite(kind): - parser = Parser(self.encoded, self.pos, self.pos + size) - value = parser.decode_all() - else: - value = self.encoded[self.pos:self.pos + size] - - self.pos += size - - return Component(kind, value) - - def decode_all(self): - """Decode all remaining components and values""" - list = [] - while True: - c = self.decode() - if c is None: - break - list.append(c) - return list - - -def find_by_kind(components, wanted_kind): - """Find components of a desired kind in a list of components""" - return [c for c in components if c.get_kind() == wanted_kind] - - -def first_by_kind(components, wanted_kind): - """Find first component of a desired kind in a list of components""" - for c in components: - if c.get_kind() == wanted_kind: - return c - return None - - -def find_strings_by_kind(components, wanted_kind): - """Find components by kind, return their string values""" - return [c.get_string_value() - for c in find_by_kind(components, wanted_kind)] - - -def first_string_by_kind(components, wanted_kind): - """Find first component by kind, return its string value""" - c = first_by_kind(components, wanted_kind) - if c: - return c.get_string_value() - else: - return None - - -def first_varint_by_kind(components, wanted_kind): - """Find first component by kind, return its integer value""" - c = first_by_kind(components, wanted_kind) - if c: - return c.get_varint_value() - else: - return None - - -def create_stat_component(st): - """Create a STAT component, given a stat result""" - return Component(obnam.cmp.STAT, - obnam.varint.encode(st.st_mode) + - obnam.varint.encode(st.st_ino) + - obnam.varint.encode(st.st_dev) + - obnam.varint.encode(st.st_nlink) + - obnam.varint.encode(st.st_uid) + - obnam.varint.encode(st.st_gid) + - obnam.varint.encode(st.st_size) + - obnam.varint.encode(st.st_atime) + - obnam.varint.encode(st.st_mtime) + - obnam.varint.encode(st.st_ctime) + - obnam.varint.encode(st.st_blocks) + - obnam.varint.encode(st.st_blksize) + - obnam.varint.encode(st.st_rdev)) - - -def parse_stat_component(stat_component): - """Return an object like a stat result from a decoded stat_component""" - value = stat_component.get_string_value() - pos = 0 - st_mode, pos = obnam.varint.decode(value, pos) - st_ino, pos = obnam.varint.decode(value, pos) - st_dev, pos = obnam.varint.decode(value, pos) - st_nlink, pos = obnam.varint.decode(value, pos) - st_uid, pos = obnam.varint.decode(value, pos) - st_gid, pos = obnam.varint.decode(value, pos) - st_size, pos = obnam.varint.decode(value, pos) - st_atime, pos = obnam.varint.decode(value, pos) - st_mtime, pos = obnam.varint.decode(value, pos) - st_ctime, pos = obnam.varint.decode(value, pos) - st_blocks, pos = obnam.varint.decode(value, pos) - st_blksize, pos = obnam.varint.decode(value, pos) - st_rdev, pos = obnam.varint.decode(value, pos) - return obnam.utils.make_stat_result(st_mode=st_mode, - st_ino=st_ino, - st_dev=st_dev, - st_nlink=st_nlink, - st_uid=st_uid, - st_gid=st_gid, - st_size=st_size, - st_atime=st_atime, - st_mtime=st_mtime, - st_ctime=st_ctime, - st_blocks=st_blocks, - st_blksize=st_blksize, - st_rdev=st_rdev) diff --git a/obnam/cmpTests.py b/obnam/cmpTests.py deleted file mode 100644 index 245b1349..00000000 --- a/obnam/cmpTests.py +++ /dev/null @@ -1,377 +0,0 @@ -# Copyright (C) 2006, 2007, 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.cmp.""" - - -import os -import unittest - - -import obnam - - -class ComponentKindNameTests(unittest.TestCase): - - def test(self): - t = obnam.cmp.kind_name - c = obnam.cmp - self.failUnlessEqual(t(-12765), "UNKNOWN") - - names = ( - "OBJID", - "OBJKIND", - "BLKID", - "FILECHUNK", - "OBJECT", - "OBJMAP", - "CONTREF", - "NAMEIPAIR", - "FILENAME", - "SIGDATA", - "SIGREF", - "GENREF", - "OBJREF", - "BLOCKREF", - "MAPREF", - "FILEPARTREF", - "FORMATVERSION", - "FILE", - "FILELISTREF", - "CONTMAPREF", - "DELTAREF", - "DELTADATA", - "STAT", - "GENSTART", - "GENEND", - "DELTAPARTREF", - "DIRREF", - "FILEGROUPREF", - ) - - for name in names: - self.failUnlessEqual(t(getattr(c, name)), name) - - -class RefComponentTests(unittest.TestCase): - - def test(self): - kinds = obnam.cmp._component_kinds - for kind in kinds: - self.failUnlessEqual(kinds[kind][1].endswith("REF"), - obnam.cmp.kind_is_reference(kind)) - - -class CreateComponentTests(unittest.TestCase): - - def testCreateLeaf(self): - c = obnam.cmp.Component(1, "pink") - self.failIfEqual(c, None) - self.failUnlessEqual(c.get_kind(), 1) - self.failUnlessEqual(c.get_string_value(), "pink") - self.failUnlessEqual(c.is_composite(), False) - - def testCreateComposite(self): - leaf1 = obnam.cmp.Component(1, "pink") - leaf2 = obnam.cmp.Component(2, "pretty") - c = obnam.cmp.Component(3, [leaf1, leaf2]) - self.failUnlessEqual(c.get_kind(), 3) - self.failUnlessEqual(c.is_composite(), True) - self.failUnlessEqual(c.get_subcomponents(), [leaf1, leaf2]) - - -class ComponentParserTest(unittest.TestCase): - - def testDecodeEmptyString(self): - parser = obnam.cmp.Parser("") - self.failUnlessEqual(parser.decode(), None) - - def testDecodePlainComponent(self): - c = obnam.cmp.Component(obnam.cmp.OBJID, "pink") - encoded = c.encode() - parser = obnam.cmp.Parser(encoded) - c2 = parser.decode() - self.failUnlessEqual(parser.pos, len(encoded)) - self.failUnlessEqual(encoded, c2.encode()) - - def testDecodeCompositeComponent(self): - subs = [obnam.cmp.Component(obnam.cmp.OBJID, str(i)) - for i in range(100)] - c = obnam.cmp.Component(obnam.cmp.OBJECT, subs) - encoded = c.encode() - parser = obnam.cmp.Parser(encoded) - c2 = parser.decode() - self.failUnlessEqual(parser.pos, len(encoded)) - self.failUnlessEqual(encoded, c2.encode()) - - def testDecodeAllEmptyString(self): - parser = obnam.cmp.Parser("") - self.failUnlessEqual(parser.decode_all(), []) - - def testDecodeAllPlainComponents(self): - list = [obnam.cmp.Component(obnam.cmp.OBJID, str(i)) - for i in range(100)] - encoded = "".join(c.encode() for c in list) - - parser = obnam.cmp.Parser(encoded) - list2 = parser.decode_all() - self.failUnlessEqual(parser.pos, len(encoded)) - - encoded2 = "".join(c.encode() for c in list2) - self.failUnlessEqual(encoded, encoded2) - - -class ComponentDecodeAllTests(unittest.TestCase): - - def remove_component(self, list, kind, value): - self.failUnlessEqual(list[0].get_kind(), kind) - self.failUnlessEqual(list[0].get_string_value(), value) - del list[0] - - def testDecodeAll(self): - c1 = obnam.cmp.Component(1, "pink") - c2 = obnam.cmp.Component(2, "pretty") - e1 = c1.encode() - e2 = c2.encode() - e = e1 + e2 - list = obnam.cmp.Parser(e).decode_all() - self.remove_component(list, 1, "pink") - self.remove_component(list, 2, "pretty") - self.failUnlessEqual(list, []) - - -class ComponentFindTests(unittest.TestCase): - - def setUp(self): - list = [(1, "pink"), (2, "pretty"), (3, "black"), (3, "box")] - list += [(4, obnam.varint.encode(4))] - list = [obnam.cmp.Component(a, b) for a, b in list] - self.c = obnam.cmp.Component(42, list) - - def match(self, result, kind, value): - self.failUnless(len(result) > 0) - c = result[0] - self.failUnlessEqual(c.get_kind(), kind) - self.failUnlessEqual(c.get_string_value(), value) - del result[0] - - def testFindAllOnes(self): - result = self.c.find_by_kind(1) - self.match(result, 1, "pink") - self.failUnlessEqual(result, []) - - def testFindAllTwos(self): - result = self.c.find_by_kind(2) - self.match(result, 2, "pretty") - self.failUnlessEqual(result, []) - - def testFindAllThrees(self): - result = self.c.find_by_kind(3) - self.match(result, 3, "black") - self.match(result, 3, "box") - self.failUnlessEqual(result, []) - - def testFindAllNones(self): - result = self.c.find_by_kind(0) - self.failUnlessEqual(result, []) - - def testFindFirstOne(self): - result = [self.c.first_by_kind(1)] - self.match(result, 1, "pink") - self.failUnlessEqual(result, []) - - def testFindFirstTwo(self): - result = [self.c.first_by_kind(2)] - self.match(result, 2, "pretty") - self.failUnlessEqual(result, []) - - def testFindFirstThree(self): - result = [self.c.first_by_kind(3)] - self.match(result, 3, "black") - self.failUnlessEqual(result, []) - - def testFindFirstNone(self): - result = self.c.first_by_kind(0) - self.failUnlessEqual(result, None) - - def testFindAllStringOnes(self): - result = self.c.find_strings_by_kind(1) - self.failUnlessEqual(result, ["pink"]) - - def testFindAllStringTwos(self): - result = self.c.find_strings_by_kind(2) - self.failUnlessEqual(result, ["pretty"]) - - def testFindAllStringThrees(self): - result = self.c.find_strings_by_kind(3) - self.failUnlessEqual(result, ["black", "box"]) - - def testFindAllStringNones(self): - result = self.c.find_strings_by_kind(0) - self.failUnlessEqual(result, []) - - def testFindFirstStringOne(self): - result = self.c.first_string_by_kind(1) - self.failUnlessEqual(result, "pink") - - def testFindFirstStringTwo(self): - result = self.c.first_string_by_kind(2) - self.failUnlessEqual(result, "pretty") - - def testFindFirstStringThree(self): - result = self.c.first_string_by_kind(3) - self.failUnlessEqual(result, "black") - - def testFindFirstStringNone(self): - result = self.c.first_string_by_kind(0) - self.failUnlessEqual(result, None) - - def testFindFirstVarintByKind(self): - result = self.c.first_varint_by_kind(4) - self.failUnlessEqual(result, 4) - - def testFindFirstVarintByKindWhenMissing(self): - result = self.c.first_varint_by_kind(0) - self.failUnlessEqual(result, None) - - -class FindTests(unittest.TestCase): - - def setUp(self): - self.list = [(1, "pink"), (2, "pretty"), (3, "black"), (3, "box")] - self.list = [obnam.cmp.Component(a, b) for a, b in self.list] - - def tearDown(self): - del self.list - - def match(self, result, kind, value): - self.failUnless(len(result) > 0) - c = result[0] - self.failUnlessEqual(c.get_kind(), kind) - self.failUnlessEqual(c.get_string_value(), value) - del result[0] - - def testFindAllOnes(self): - result = obnam.cmp.find_by_kind(self.list, 1) - self.match(result, 1, "pink") - self.failUnlessEqual(result, []) - - def testFindAllTwos(self): - result = obnam.cmp.find_by_kind(self.list, 2) - self.match(result, 2, "pretty") - self.failUnlessEqual(result, []) - - def testFindAllThrees(self): - result = obnam.cmp.find_by_kind(self.list, 3) - self.match(result, 3, "black") - self.match(result, 3, "box") - self.failUnlessEqual(result, []) - - def testFindAllNones(self): - result = obnam.cmp.find_by_kind(self.list, 0) - self.failUnlessEqual(result, []) - - def testFindFirstOne(self): - result = [obnam.cmp.first_by_kind(self.list, 1)] - self.match(result, 1, "pink") - self.failUnlessEqual(result, []) - - def testFindFirstTwo(self): - result = [obnam.cmp.first_by_kind(self.list, 2)] - self.match(result, 2, "pretty") - self.failUnlessEqual(result, []) - - def testFindFirstThree(self): - result = [obnam.cmp.first_by_kind(self.list, 3)] - self.match(result, 3, "black") - self.failUnlessEqual(result, []) - - def testFindFirstNone(self): - result = obnam.cmp.first_by_kind(self.list, 0) - self.failUnlessEqual(result, None) - - def testFindAllStringOnes(self): - result = obnam.cmp.find_strings_by_kind(self.list, 1) - self.failUnlessEqual(result, ["pink"]) - - def testFindAllStringTwos(self): - result = obnam.cmp.find_strings_by_kind(self.list, 2) - self.failUnlessEqual(result, ["pretty"]) - - def testFindAllStringThrees(self): - result = obnam.cmp.find_strings_by_kind(self.list, 3) - self.failUnlessEqual(result, ["black", "box"]) - - def testFindAllStringNones(self): - result = obnam.cmp.find_strings_by_kind(self.list, 0) - self.failUnlessEqual(result, []) - - def testFindFirstStringOne(self): - result = obnam.cmp.first_string_by_kind(self.list, 1) - self.failUnlessEqual(result, "pink") - - def testFindFirstStringTwo(self): - result = obnam.cmp.first_string_by_kind(self.list, 2) - self.failUnlessEqual(result, "pretty") - - def testFindFirstStringThree(self): - result = obnam.cmp.first_string_by_kind(self.list, 3) - self.failUnlessEqual(result, "black") - - def testFindFirstStringNone(self): - result = obnam.cmp.first_string_by_kind(self.list, 0) - self.failUnlessEqual(result, None) - - -class GetVarintVAlueTest(unittest.TestCase): - - def test(self): - c = obnam.cmp.Component(1, obnam.varint.encode(12765)) - self.failUnlessEqual(c.get_varint_value(), 12765) - - -class FindVarintTests(unittest.TestCase): - - def test(self): - values = range(0, 1024, 17) - - list = [] - for i in values: - encoded = obnam.varint.encode(i) - c = obnam.cmp.Component(i, encoded) - list.append(c) - - for i in values: - self.failUnlessEqual(obnam.cmp.first_varint_by_kind(list, i), i) - self.failUnlessEqual(obnam.cmp.first_varint_by_kind(list, -1), None) - - -class StatTests(unittest.TestCase): - - def testEncodeDecode(self): - st1 = os.stat("Makefile") - stat = obnam.cmp.create_stat_component(st1) - st2 = obnam.cmp.parse_stat_component(stat) - - names1 = [x for x in dir(st1) if x.startswith("st_")] - names2 = [x for x in dir(st2) if x.startswith("st_")] - names1.sort() - names2.sort() - self.failUnlessEqual(names1, names2) - for name in names1: - self.failUnlessEqual(st1.__getattribute__(name), - st2.__getattribute__(name)) diff --git a/obnam/config.py b/obnam/config.py deleted file mode 100644 index 4bfd3ab5..00000000 --- a/obnam/config.py +++ /dev/null @@ -1,302 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Obnam configuration and option handling""" - - -import optparse -import os -import pwd -import socket -import sys - -import obnam.defaultconfig - - -def default_config(): - """Return a obnam.cfgfile.ConfigFile with the default builtin config""" - config = obnam.cfgfile.ConfigFile() - for section, item, value in obnam.defaultconfig.items: - if not config.has_section(section): - config.add_section(section) - config.set(section, item, value) - - if config.get("backup", "host-id") == "": - config.set("backup", "host-id", socket.gethostname()) - - return config - - -def build_parser(): - """Create command line parser""" - parser = optparse.OptionParser(version="%s %s" % - (obnam.NAME, obnam.VERSION)) - - parser.add_option("--host-id", - metavar="ID", - help="use ID to identify this host") - - parser.add_option("--block-size", - type="int", - metavar="SIZE", - help="make blocks that are about SIZE kilobytes") - - parser.add_option("--cache", - metavar="DIR", - help="store cached blocks in DIR") - - parser.add_option("--store", - metavar="DIR", - help="use DIR for local block storage (not caching)") - - parser.add_option("--target", "-C", - metavar="DIR", - help="resolve filenames relative to DIR") - - parser.add_option("--object-cache-size", - metavar="COUNT", - help="set object cache maximum size to COUNT objects" + - " (default depends on block size") - - parser.add_option("--log-file", - metavar="FILE", - help="append log messages to FILE") - - parser.add_option("--log-level", - metavar="LEVEL", - help="set log level to LEVEL, one of debug, info, " + - "warning, error, critical (default is warning)") - - parser.add_option("--ssh-key", - metavar="FILE", - help="read ssh private key from FILE (and public key " + - "from FILE.pub)") - - parser.add_option("--odirect-read", - metavar="PROGRAM", - help="use PROGRAM to read contents of plain files " + - "(default is helper that avoids buffer cache)") - - parser.add_option("--odirect-pipe", - metavar="PROGRAM", - help="use PROGRAM as the odirect_pipe program " + - "(default is helper that avoids buffer cache)") - - parser.add_option("--gpg-home", - metavar="DIR", - help="use DIR as the location for GnuPG keyrings and " + - "other data files") - - parser.add_option("--gpg-encrypt-to", - metavar="KEYID", - action="append", - help="add KEYID to list of keys to use for encryption") - - parser.add_option("--gpg-sign-with", - metavar="KEYID", - help="sign backups with KEYID") - - parser.add_option("--no-gpg", action="store_true", - help="don't use gpg at all") - - parser.add_option("--exclude", - metavar="REGEXP", - action="append", - help="exclude pathnames matching REGEXP") - - parser.add_option("--progress", - dest="report_progress", - action="store_true", default=False, - help="report progress when backups are made") - - parser.add_option("--generation-times", - action="store_true", default=False, - help="show generation start/end times " + - "with the 'generations' command") - - parser.add_option("--no-configs", - action="store_true", default=False, - help="don't read any configuration files not " + - "explicitly named with --config") - - parser.add_option("--config", - dest="configs", - action="append", - metavar="FILE", - help="also read FILE when reading configuration files") - - return parser - - -# For unit testing purposes. - -_config_file_log = [] -def remember_config_file(pathname): _config_file_log.append(pathname) -def forget_config_file_log(): del _config_file_log[:] -def get_config_file_log(): return _config_file_log[:] - - -def read_config_file(config, filename): - """Read a config file, if it exists""" - if os.path.exists(filename): - f = file(filename) - config.readfp(f, filename) - f.close() - remember_config_file(filename) - - -def parse_options(config, argv): - """Parse command line arguments and set config values accordingly - - This also reads all the default configuration files at the opportune - moment. - - """ - - parser = build_parser() - (options, args) = parser.parse_args(argv) - - paths = [] - if not options.no_configs: - paths += get_default_paths() - if options.configs: - paths += options.configs - - for filename in paths: - read_config_file(config, filename) - - if options.host_id is not None: - config.set("backup", "host-id", options.host_id) - if options.block_size is not None: - config.set("backup", "block-size", "%d" % options.block_size) - if options.cache is not None: - config.set("backup", "cache", options.cache) - if options.store is not None: - config.set("backup", "store", options.store) - if options.target is not None: - config.set("backup", "target-dir", options.target) - if options.object_cache_size is not None: - config.set("backup", "object-cache-size", options.object_cache_size) - if options.log_file is not None: - config.set("backup", "log-file", options.log_file) - if options.log_level is not None: - config.set("backup", "log-level", options.log_level) - if options.ssh_key is not None: - config.set("backup", "ssh-key", options.ssh_key) - if options.odirect_read is not None: - config.set("backup", "odirect-read", options.odirect_read) - if options.odirect_pipe is not None: - config.set("backup", "odirect-pipe", options.odirect_pipe) - if options.gpg_home is not None: - config.set("backup", "gpg-home", options.gpg_home) - if options.gpg_encrypt_to is not None: - config.remove_option("backup", "gpg-encrypt-to") - for keyid in options.gpg_encrypt_to: - config.append("backup", "gpg-encrypt-to", keyid) - if options.gpg_sign_with is not None: - config.set("backup", "gpg-sign-with", options.gpg_sign_with) - if options.no_gpg is True: - config.set("backup", "no-gpg", "true") - if options.exclude is not None: - config.remove_option("backup", "exclude") - for pattern in options.exclude: - config.append("backup", "exclude", pattern) - if options.report_progress: - config.set("backup", "report-progress", "true") - else: - config.set("backup", "report-progress", "false") - if options.generation_times: - config.set("backup", "generation-times", "true") - else: - config.set("backup", "generation-times", "false") - - return args - - -def print_option_names(f=sys.stdout): - """Write to stdout a list of option names""" - # Note that this is ugly, since it uses undocumented underscored - # attributes, but it's the only way I could find to make it work. - parser = build_parser() - for option in parser.option_list: - for name in option._short_opts + option._long_opts: - f.write("%s\n" % name) - - -def write_defaultconfig(config, output=sys.stdout): - """Write to stdout a new defaultconfig.py, using values from config""" - - items = [] - for section in config.sections(): - for key in config.options(section): - items.append(' ("%s", "%s", "%s"),' % - (section, key, config.get(section, key))) - - output.write("import socket\nitems = (\n%s\n)\n""" % "\n".join(items)) - - -# Allow unit tests to override default path list. - -_default_paths = None -if "default_paths" in dir(obnam.defaultconfig): - _default_paths = obnam.defaultconfig.default_paths - -def set_default_paths(default_paths): - global _default_paths - _default_paths = default_paths - - -def get_default_paths(): - """Return list of paths to look for config files""" - - if _default_paths is not None: - return _default_paths - - list = [] - - list.append("/usr/share/obnam/obnam.conf") - - if get_uid() == 0: - list.append("/etc/obnam/obnam.conf") - else: - list.append(os.path.join(get_home(), ".obnam", "obnam.conf")) - - return list - - -# We use a little wrapper layer around the os.* stuff to allow unit tests -# to override things. - -_uid = None -_home = None - -def get_uid(): - if _uid is None: - return os.getuid() - else: - return _uid - -def get_home(): - if _home is None: - return pwd.getpwuid(get_uid()).pw_dir - else: - return _home - -def set_uid_and_home(uid, home): - global _uid, _home - _uid = uid - _home = home diff --git a/obnam/configTests.py b/obnam/configTests.py deleted file mode 100644 index 567b902a..00000000 --- a/obnam/configTests.py +++ /dev/null @@ -1,301 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.config.""" - - -import os -import shutil -import StringIO -import unittest - - -import obnam - - -class CommandLineParsingTests(unittest.TestCase): - - def setUp(self): - obnam.config.set_default_paths([]) - - def tearDown(self): - obnam.config.set_default_paths(None) - - def config_as_string(self, config): - f = StringIO.StringIO() - config.write(f) - return f.getvalue() - - def testDefaultConfig(self): - config = obnam.config.default_config() - self.failUnless(config.has_section("backup")) - needed = ["block-size", "cache", "store", "target-dir", - "host-id", "object-cache-size", "log-level", "ssh-key", - "odirect-read", "log-file", "gpg-home", "gpg-encrypt-to", - "gpg-sign-with", "no-gpg", "exclude", "odirect-pipe", - "report-progress", "generation-times"] - needed.sort() - actual = config.options("backup") - actual.sort() - self.failUnlessEqual(actual, needed) - - def testEmpty(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, []) - self.failUnlessEqual(self.config_as_string(config), - self.config_as_string(obnam.config.default_config())) - - def testHostId(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--host-id=pink"]) - self.failUnlessEqual(config.get("backup", "host-id"), "pink") - - def testBlockSize(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--block-size=12765"]) - self.failUnlessEqual(config.getint("backup", "block-size"), 12765) - obnam.config.parse_options(config, ["--block-size=42"]) - self.failUnlessEqual(config.getint("backup", "block-size"), 42) - - def testCacheDir(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--cache=/tmp/foo"]) - self.failUnlessEqual(config.get("backup", "cache"), "/tmp/foo") - - def testLocalStore(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--store=/tmp/foo"]) - self.failUnlessEqual(config.get("backup", "store"), "/tmp/foo") - - def testTargetDir(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--target=/tmp/foo"]) - self.failUnlessEqual(config.get("backup", "target-dir"), "/tmp/foo") - - def testObjectCacheSize(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--object-cache-size=42"]) - self.failUnlessEqual(config.get("backup", "object-cache-size"), "42") - - def testOdirectRead(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--odirect-read=x"]) - self.failUnlessEqual(config.get("backup", "odirect-read"), "x") - - def testOdirectPipe(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--odirect-pipe=x"]) - self.failUnlessEqual(config.get("backup", "odirect-pipe"), "x") - - def testLogFile(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--log-file=x"]) - self.failUnlessEqual(config.get("backup", "log-file"), "x") - - def testLogLevel(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--log-level=info"]) - self.failUnlessEqual(config.get("backup", "log-level"), "info") - - def testSshKey(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--ssh-key=foo"]) - self.failUnlessEqual(config.get("backup", "ssh-key"), "foo") - - def testGpgHome(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--gpg-home=foo"]) - self.failUnlessEqual(config.get("backup", "gpg-home"), "foo") - - def testGpgEncryptTo(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--gpg-encrypt-to=foo"]) - self.failUnlessEqual(config.get("backup", "gpg-encrypt-to"), "foo") - - def testGpgSignWith(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--gpg-sign-with=foo"]) - self.failUnlessEqual(config.get("backup", "gpg-sign-with"), "foo") - - def testNoGpgIsUnset(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, []) - self.failUnlessEqual(config.get("backup", "no-gpg"), "false") - - def testNoGpgIsUnsetButDefaultIsTrue(self): - config = obnam.config.default_config() - config.set("backup", "no-gpg", "true") - obnam.config.parse_options(config, []) - self.failUnlessEqual(config.get("backup", "no-gpg"), "true") - - def testNoGpgIsSet(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--no-gpg"]) - self.failUnlessEqual(config.get("backup", "no-gpg"), "true") - - def testGenerationTimes(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--generation-times"]) - self.failUnlessEqual(config.get("backup", "generation-times"), "true") - - def testExclude(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--exclude=foo"]) - self.failUnlessEqual(config.get("backup", "exclude"), "foo") - - def testReportProgress(self): - config = obnam.config.default_config() - self.failIf(config.getboolean("backup", "report-progress")) - obnam.config.parse_options(config, ["--progress"]) - self.failUnless(config.getboolean("backup", "report-progress")) - - def testNoConfigs(self): - parser = obnam.config.build_parser() - options, args = parser.parse_args([]) - self.failUnlessEqual(options.no_configs, False) - options, args = parser.parse_args(["--no-configs"]) - self.failUnlessEqual(options.no_configs, True) - - def testConfig(self): - parser = obnam.config.build_parser() - options, args = parser.parse_args([]) - self.failUnlessEqual(options.configs, None) - options, args = parser.parse_args(["--config=pink"]) - self.failUnlessEqual(options.configs, ["pink"]) - - -class ConfigReadingOptionsTests(unittest.TestCase): - - names = ["tmp.1.conf", "tmp.2.conf", "tmp.3.conf"] - - def setUp(self): - obnam.config.forget_config_file_log() - for name in self.names: - f = file(name, "w") - f.write("[backup]\nblock-size = 1024\n") - f.close() - obnam.config.set_default_paths(self.names) - - def tearDown(self): - obnam.config.set_default_paths(None) - for name in self.names: - if os.path.exists(name): - os.remove(name) - - def testNoDefaults(self): - obnam.config.set_default_paths([]) - config = obnam.config.default_config() - obnam.config.parse_options(config, []) - self.failUnlessEqual(obnam.config.get_config_file_log(), []) - - def testDefaults(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, []) - self.failUnlessEqual(obnam.config.get_config_file_log(), self.names) - - def testNoConfigsOption(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--no-configs"]) - self.failUnlessEqual(obnam.config.get_config_file_log(), []) - - def testNoConfigsOptionPlusConfigOption(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--no-configs"] + - ["--config=%s" % x for x in self.names]) - self.failUnlessEqual(obnam.config.get_config_file_log(), self.names) - - def testDefaultsPlusConfigOption(self): - config = obnam.config.default_config() - obnam.config.parse_options(config, ["--config=/dev/null"]) - self.failUnlessEqual(obnam.config.get_config_file_log(), - self.names + ["/dev/null"]) - - -class ConfigFileReadingTests(unittest.TestCase): - - def setUp(self): - self.filename = "unittest.conf" - f = file(self.filename, "w") - f.write("""\ -[backup] -store = pink -cache = pretty -""") - f.close() - - def tearDown(self): - os.remove(self.filename) - - def testReadConfigFile(self): - config = obnam.config.default_config() - obnam.config.read_config_file(config, self.filename) - self.failUnlessEqual(config.get("backup", "store"), "pink") - self.failUnlessEqual(config.get("backup", "cache"), "pretty") - - def testDefaultConfigsForRoot(self): - config = obnam.config.default_config() - obnam.config.set_uid_and_home(0, "/root") - configs = obnam.config.get_default_paths() - self.failUnlessEqual(configs, - ["/usr/share/obnam/obnam.conf", - "/etc/obnam/obnam.conf"]) - - def testDefaultConfigsForUser(self): - config = obnam.config.default_config() - obnam.config.set_uid_and_home(12765, "/home/pretty") - configs = obnam.config.get_default_paths() - self.failUnlessEqual(configs, - ["/usr/share/obnam/obnam.conf", - "/home/pretty/.obnam/obnam.conf"]) - - -class PrintOptionsTests(unittest.TestCase): - - def test(self): - f = StringIO.StringIO() - obnam.config.print_option_names(f=f) - self.failIfEqual(f.getvalue(), "") - - -class WriteDefaultConfigTests(unittest.TestCase): - - def test(self): - config = obnam.config.default_config() - f = StringIO.StringIO() - obnam.config.write_defaultconfig(config, output=f) - s = f.getvalue() - self.failUnless(s.startswith("import socket")) - self.failUnless("\nitems =" in s) - - -class GetUidAndHomeTests(unittest.TestCase): - - def testGetUid(self): - obnam.config.set_uid_and_home(None, None) - self.failIfEqual(obnam.config.get_uid(), None) - - def testGetHome(self): - obnam.config.set_uid_and_home(None, None) - self.failIfEqual(obnam.config.get_home(), None) - - def testGetUidFaked(self): - obnam.config.set_uid_and_home(42, "pretty") - self.failUnlessEqual(obnam.config.get_uid(), 42) - - def testGetHomeFaked(self): - obnam.config.set_uid_and_home(42, "pink") - self.failUnlessEqual(obnam.config.get_home(), "pink") diff --git a/obnam/context.py b/obnam/context.py deleted file mode 100644 index b82ee4fe..00000000 --- a/obnam/context.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Processing context for Obnam""" - - -import obnam.config -import obnam.map -import obnam.obj - - -class Context: - - def __init__(self): - self.config = obnam.config.default_config() - self.cache = None - self.be = None - self.map = obnam.map.create() - self.contmap = obnam.map.create() - self.oq = obnam.obj.ObjectQueue() - self.content_oq = obnam.obj.ObjectQueue() - self.progress = obnam.progress.ProgressReporter(self.config) - self.object_cache = None diff --git a/obnam/contextTests.py b/obnam/contextTests.py deleted file mode 100644 index bd85535a..00000000 --- a/obnam/contextTests.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.context.""" - - -import unittest - - -import obnam - - -class ContextCreateTests(unittest.TestCase): - - def test(self): - context = obnam.context.Context() - attrs = [x for x in dir(context) if not x.startswith("_")] - attrs.sort() - self.failUnlessEqual(attrs, - ["be", "cache", "config", "content_oq", "contmap", "map", "object_cache", - "oq", "progress"]) - self.failUnlessEqual(context.be, None) - self.failUnlessEqual(context.cache, None) - self.failIfEqual(context.config, None) - self.failIfEqual(context.map, None) - self.failIfEqual(context.oq, None) - self.failIfEqual(context.content_oq, None) - self.failUnlessEqual(context.object_cache, None) diff --git a/obnam/defaultconfig.py b/obnam/defaultconfig.py deleted file mode 100644 index 63a5c401..00000000 --- a/obnam/defaultconfig.py +++ /dev/null @@ -1,26 +0,0 @@ -# This module has the default config values. It will be overwritten by -# "make install". The values here are suitable for development purposes, -# but not for real use. - -items = ( - ("backup", "host-id", ""), - ("backup", "block-size", "%d" % (1024 * 1024)), - ("backup", "cache", "tmp.cache"), - ("backup", "store", "tmp.store"), - ("backup", "ssh-key", "ssh-key"), - ("backup", "target-dir", "."), - ("backup", "object-cache-size", "0"), - ("backup", "log-file", ""), - ("backup", "log-level", "warning"), - ("backup", "odirect-read", "./odirect_read"), - ("backup", "odirect-pipe", "./odirect_pipe"), - ("backup", "gpg-home", "sample-gpg-home"), - ("backup", "gpg-encrypt-to", "490C9ED1"), - ("backup", "gpg-sign-with", "490C9ED1"), - ("backup", "no-gpg", "false"), - ("backup", "exclude", ""), - ("backup", "report-progress", "false"), - ("backup", "generation-times", "false"), -) - -default_paths = [] diff --git a/obnam/exception.py b/obnam/exception.py deleted file mode 100644 index 130378dd..00000000 --- a/obnam/exception.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Obnam exception""" - - -class ObnamException(Exception): - - def __str__(self): - return self._msg - - diff --git a/obnam/exceptionTests.py b/obnam/exceptionTests.py deleted file mode 100644 index 5b911442..00000000 --- a/obnam/exceptionTests.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.exception""" - - -import unittest - -import obnam - - -class SampleException(obnam.ObnamException): - - def __init__(self, msg): - self._msg = msg - - -class ExceptionTests(unittest.TestCase): - - def test(self): - e = SampleException("pink") - self.failUnlessEqual(str(e), "pink") diff --git a/obnam/filelist.py b/obnam/filelist.py deleted file mode 100644 index 18aa67df..00000000 --- a/obnam/filelist.py +++ /dev/null @@ -1,114 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""List of files in a backup generation""" - - -import os - - -import obnam - - -def create_file_component(pathname, contref, sigref, deltaref): - """Create a FILE component for a given pathname (and metadata)""" - return create_file_component_from_stat(pathname, os.lstat(pathname), - contref, sigref, deltaref) - - -def create_file_component_from_stat(pathname, st, contref, sigref, deltaref): - """Create a FILE component given pathname, stat results, etc""" - subs = [] - - subs.append(obnam.cmp.Component(obnam.cmp.FILENAME, pathname)) - - subs.append(obnam.cmp.create_stat_component(st)) - - if contref: - subs.append(obnam.cmp.Component(obnam.cmp.CONTREF, contref)) - if sigref: - subs.append(obnam.cmp.Component(obnam.cmp.SIGREF, sigref)) - if deltaref: - subs.append(obnam.cmp.Component(obnam.cmp.DELTAREF, deltaref)) - - return obnam.cmp.Component(obnam.cmp.FILE, subs) - - -class Filelist: - - """Handle the metadata for one generation of backups""" - - def __init__(self): - self.dict = {} - - def num_files(self): - """Return the number of files in a file list""" - return len(self.dict) - - def list_files(self): - """Return list of all file in the file list currently""" - return self.dict.keys() - - def add(self, pathname, contref, sigref, deltaref): - """Add a file (and its metadata) to a file list""" - self.dict[pathname] = create_file_component(pathname, - contref, - sigref, - deltaref) - - def add_file_component(self, pathname, file_cmp): - """Add a file component to a file list""" - self.dict[pathname] = file_cmp - - def find(self, pathname): - """Get the FILE component that corresponds to a pathname""" - return self.dict.get(pathname, None) - - def find_matching_inode(self, pathname, stat_result): - """Find the FILE component that matches stat_result""" - prev = self.find(pathname) - if prev: - prev_stat = prev.first_by_kind(obnam.cmp.STAT) - prev_st = obnam.cmp.parse_stat_component(prev_stat) - fields = ["st_dev", - "st_mode", - "st_nlink", - "st_uid", - "st_gid", - "st_size", - "st_mtime"] - for field in fields: - a_value = stat_result.__getattribute__(field) - b_value = prev_st.__getattribute__(field) - if a_value != b_value: - return None - return prev - else: - return None - - def to_object(self, object_id): - """Create an unencoded FILELIST object from a file list""" - o = obnam.obj.FileListObject(id=object_id) - for pathname in self.dict: - o.add(self.dict[pathname]) - return o - - def from_object(self, o): - """Add to file list data from a backup object""" - for file in o.find_by_kind(obnam.cmp.FILE): - pathname = file.first_string_by_kind(obnam.cmp.FILENAME) - self.dict[pathname] = file diff --git a/obnam/filelistTests.py b/obnam/filelistTests.py deleted file mode 100644 index 6215ad50..00000000 --- a/obnam/filelistTests.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.filelist.""" - - -import os -import unittest - - -import obnam - - -class FileComponentTests(unittest.TestCase): - - filename = "README" - - def testCreate(self): - c = obnam.filelist.create_file_component(self.filename, "pink", - "pretty", "black") - self.check(c) - - def testCreateFromStatResult(self): - st = os.lstat(self.filename) - c = obnam.filelist.create_file_component_from_stat(self.filename, st, - "pink", "pretty", - "black") - self.check(c) - - def check(self, c): - self.failIfEqual(c, None) - self.failUnlessEqual(c.first_string_by_kind(obnam.cmp.FILENAME), - self.filename) - - c_stat = c.first_by_kind(obnam.cmp.STAT) - c_st = obnam.cmp.parse_stat_component(c_stat) - - st = os.lstat(self.filename) - self.failUnlessEqual(c_st.st_mode, st.st_mode) - self.failUnlessEqual(c_st.st_ino, st.st_ino) - self.failUnlessEqual(c_st.st_dev, st.st_dev) - self.failUnlessEqual(c_st.st_nlink, st.st_nlink) - self.failUnlessEqual(c_st.st_uid, st.st_uid) - self.failUnlessEqual(c_st.st_gid, st.st_gid) - self.failUnlessEqual(c_st.st_size, st.st_size) - self.failUnlessEqual(c_st.st_atime, st.st_atime) - self.failUnlessEqual(c_st.st_mtime, st.st_mtime) - self.failUnlessEqual(c_st.st_ctime, st.st_ctime) - self.failUnlessEqual(c_st.st_blocks, st.st_blocks) - self.failUnlessEqual(c_st.st_blksize, st.st_blksize) - self.failUnlessEqual(c_st.st_rdev, st.st_rdev) - - self.failUnlessEqual(c.first_string_by_kind(obnam.cmp.CONTREF), - "pink") - self.failUnlessEqual(c.first_string_by_kind(obnam.cmp.SIGREF), - "pretty") - self.failUnlessEqual(c.first_string_by_kind(obnam.cmp.DELTAREF), - "black") - - -class FilelistTests(unittest.TestCase): - - def testCreate(self): - fl = obnam.filelist.Filelist() - self.failUnlessEqual(fl.num_files(), 0) - - def testAddFind(self): - fl = obnam.filelist.Filelist() - fl.add(".", "pink", None, None) - self.failUnlessEqual(fl.num_files(), 1) - c = fl.find(".") - self.failUnlessEqual(c.get_kind(), obnam.cmp.FILE) - - def testListFiles(self): - fl = obnam.filelist.Filelist() - fl.add(".", "pink", None, None) - self.failUnlessEqual(fl.list_files(), ["."]) - - def testAddFileComponent(self): - fl = obnam.filelist.Filelist() - fc = obnam.filelist.create_file_component(".", "pink", None, None) - fl.add_file_component(".", fc) - self.failUnlessEqual(fl.num_files(), 1) - c = fl.find(".") - self.failUnlessEqual(c.get_kind(), obnam.cmp.FILE) - - def testToFromObject(self): - fl = obnam.filelist.Filelist() - fl.add(".", "pretty", None, None) - o = fl.to_object("pink") - self.failUnlessEqual(o.get_kind(), obnam.obj.FILELIST) - self.failUnlessEqual(o.get_id(), "pink") - - fl2 = obnam.filelist.Filelist() - fl2.from_object(o) - self.failIfEqual(fl2, None) - self.failUnlessEqual(type(fl), type(fl2)) - self.failUnlessEqual(fl2.num_files(), 1) - - c = fl2.find(".") - self.failIfEqual(c, None) - self.failUnlessEqual(c.get_kind(), obnam.cmp.FILE) - - -class FindTests(unittest.TestCase): - - def testFindInodeSuccessful(self): - pathname = "Makefile" - fl = obnam.filelist.Filelist() - fl.add(pathname, "pink", None, None) - st = os.lstat(pathname) - c = fl.find_matching_inode(pathname, st) - stat = c.first_by_kind(obnam.cmp.STAT) - st2 = obnam.cmp.parse_stat_component(stat) - self.failUnlessEqual(st.st_mtime, st2.st_mtime) - - def testFindInodeUnsuccessful(self): - pathname = "Makefile" - fl = obnam.filelist.Filelist() - fl.add(pathname, "pink", None, None) - st = os.lstat(".") - c = fl.find_matching_inode(pathname, st) - self.failUnlessEqual(c, None) - c = fl.find_matching_inode("plirps", st) - self.failUnlessEqual(c, None) diff --git a/obnam/format.py b/obnam/format.py deleted file mode 100644 index d383733f..00000000 --- a/obnam/format.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Format data for presentation""" - - -import os -import stat -import time - - -import obnam - - -def permissions(mode): - """Return a string like "ls -l" to indicate the permissions""" - - ru = wu = xu = rg = wg = xg = ro = wo = xo = "-" - - if mode & stat.S_IRUSR: - ru = "r" - if mode & stat.S_IWUSR: - wu = "w" - if mode & stat.S_IXUSR: - xu = "x" - if mode & stat.S_ISUID: - if mode & stat.S_IXUSR: - xu = "s" - else: - xu = "S" - - if mode & stat.S_IRGRP: - rg = "r" - if mode & stat.S_IWGRP: - wg = "w" - if mode & stat.S_IXGRP: - xg = "x" - if mode & stat.S_ISGID: - if mode & stat.S_IXGRP: - xg = "s" - else: - xg = "S" - - if mode & stat.S_IROTH: - ro = "r" - if mode & stat.S_IWOTH: - wo = "w" - if mode & stat.S_IXOTH: - xo = "x" - if mode & stat.S_ISVTX: - if mode & stat.S_IXOTH: - xo = "t" - else: - xo = "T" - - return ru + wu + xu + rg + wg + xg + ro + wo + xo - - -def filetype(mode): - """Return character to show the type of a file, like 'ls -l'""" - tests = [(stat.S_ISDIR, "d"), - (stat.S_ISCHR, "c"), - (stat.S_ISBLK, "b"), - (stat.S_ISREG, "-"), - (stat.S_ISFIFO, "p"), - (stat.S_ISLNK, "l"), - (stat.S_ISSOCK, "s"), - ] - for func, result in tests: - if func(mode): - return result - return "?" - - -def filemode(mode): - """Format the entire file mode like 'ls -l'""" - return filetype(mode) + permissions(mode) - - -def inode_fields(file_component): - format_integer = lambda x: "%d" % x - - fields = [("st_mode", filemode), - ("st_nlink", format_integer), - ("st_uid", format_integer), - ("st_gid", format_integer), - ("st_size", format_integer), - ("st_mtime", timestamp), - ] - - list = [] - stat_component = file_component.first_by_kind(obnam.cmp.STAT) - st = obnam.cmp.parse_stat_component(stat_component) - for kind, func in fields: - list.append(func(st.__getattribute__(kind))) - return list - - -def timestamp(seconds): - """Format a time stamp given in seconds since epoch""" - return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(seconds)) - - -class Listing: - - """Format listings of contents of backups. - - The listings are formatted similar to the Unix 'ls -l' command. - - """ - - def __init__(self, context, output_file): - self._context = context - self._output = output_file - self._get_object = obnam.io.get_object - - def get_objects(self, refs): - list = [] - for ref in refs: - o = self._get_object(self._context, ref) - if o: - list.append(o) - return list - - def walk(self, dirs, filegroups, fullpath=None): - self.format(dirs, filegroups) - for dir in dirs: - dirrefs = dir.get_dirrefs() - fgrefs = dir.get_filegrouprefs() - if dirrefs or fgrefs: - name = dir.get_name() - if fullpath: - name = os.path.join(fullpath, name) - self._output.write("\n%s:\n" % name) - self.walk(self.get_objects(dirrefs), self.get_objects(fgrefs), - fullpath=name) - - def format(self, dirs, filegroups): - list = [] - - for dir in dirs: - list.append((dir.get_name(), dir.get_stat())) - for fg in filegroups: - for name in fg.get_names(): - list.append((name, fg.get_stat(name))) - - list.sort() - - for name, stat in list: - self._output.write("%s %d %d %d %d %s %s\n" % - (filemode(stat.st_mode), - stat.st_nlink, - stat.st_uid, - stat.st_gid, - stat.st_size, - timestamp(stat.st_mtime), - name)) diff --git a/obnam/formatTests.py b/obnam/formatTests.py deleted file mode 100644 index 5d97b470..00000000 --- a/obnam/formatTests.py +++ /dev/null @@ -1,177 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.format.""" - - -import re -import stat -import StringIO -import unittest - - -import obnam - - -class Fake: - - pass - - -class FormatPermissionsTests(unittest.TestCase): - - def testFormatPermissions(self): - facit = ( - (00000, "---------"), # No permissions for anyone - (00100, "--x------"), # Execute for owner - (00200, "-w-------"), # Write for owner - (00400, "r--------"), # Read for owner - (00010, "-----x---"), # Execute for group - (00020, "----w----"), # Write for group - (00040, "---r-----"), # Read for group - (00001, "--------x"), # Execute for others - (00002, "-------w-"), # Write for others - (00004, "------r--"), # Read for others - (01001, "--------t"), # Sticky bit - (01000, "--------T"), # Sticky bit (upper case since no x) - (02010, "-----s---"), # Set group id - (02000, "-----S---"), # Set group id (upper case since no x) - (04100, "--s------"), # Set user id - (04000, "--S------"), # Set user id (upper case since no x) - ) - for mode, correct in facit: - self.failUnlessEqual(obnam.format.permissions(mode), correct) - - -class FormatFileTypeTests(unittest.TestCase): - - def test(self): - facit = ( - (0, "?"), # Unknown - (stat.S_IFSOCK, "s"), # socket - (stat.S_IFLNK, "l"), # symbolic link - (stat.S_IFREG, "-"), # regular file - (stat.S_IFBLK, "b"), # block device - (stat.S_IFDIR, "d"), # directory - (stat.S_IFCHR, "c"), # character device - (stat.S_IFIFO, "p"), # FIFO - ) - for mode, correct in facit: - self.failUnlessEqual(obnam.format.filetype(mode), correct) - - -class FormatFileModeTest(unittest.TestCase): - - def test(self): - self.failUnlessEqual(obnam.format.filemode(0100777), "-rwxrwxrwx") - - -class FormatInodeFieldsTest(unittest.TestCase): - - def test(self): - st = Fake() - st.st_mode = 1 - st.st_ino = 1 - st.st_dev = 1 - st.st_nlink = 1 - st.st_uid = 1 - st.st_gid = 1 - st.st_size = 1 - st.st_atime = 1 - st.st_mtime = 1 - st.st_ctime = 1 - st.st_blocks = 1 - st.st_blksize = 1 - st.st_rdev = 1 - file_component = \ - obnam.filelist.create_file_component_from_stat("Makefile", st, - None, None, None) - - list = obnam.format.inode_fields(file_component) - - self.failUnlessEqual(list, ["?--------x"] + ["1"] * 4 + - ["1970-01-01 00:00:01"]) - - -class FormatTimeTests(unittest.TestCase): - - def test(self): - self.failUnlessEqual(obnam.format.timestamp(1), "1970-01-01 00:00:01") - - - -class ListingTests(unittest.TestCase): - - dirpat = re.compile(r"^drwxrwxrwx 0 0 0 0 1970-01-01 00:00:00 pretty$") - filepat = re.compile(r"^-rw-rw-rw- 0 0 0 0 1970-01-01 00:00:00 pink$") - - def make_filegroup(self, filenames): - fg = obnam.obj.FileGroupObject(id=obnam.obj.object_id_new()) - mode = 0666 | stat.S_IFREG - st = obnam.utils.make_stat_result(st_mode=mode) - for filename in filenames: - fg.add_file(filename, st, None, None, None) - - self.objects[fg.get_id()] = fg - return fg - - def make_dir(self, name, dirs, filegroups): - mode = 0777 | stat.S_IFDIR - st = obnam.utils.make_stat_result(st_mode=mode) - dir = obnam.obj.DirObject(id=obnam.obj.object_id_new(), - name=name, - stat=st, - dirrefs=[x.get_id() for x in dirs], - filegrouprefs=[x.get_id() - for x in filegroups]) - self.objects[dir.get_id()] = dir - return dir - - def mock_get_object(self, context, objid): - return self.objects.get(objid) - - def setUp(self): - self.objects = {} - self.file = StringIO.StringIO() - self.listing = obnam.format.Listing(None, self.file) - self.listing._get_object = self.mock_get_object - - def testWritesNothingForNothing(self): - self.listing.walk([], []) - self.failUnlessEqual(self.file.getvalue(), "") - - def testWritesAFileLineForOneFile(self): - fg = self.make_filegroup(["pink"]) - self.listing.walk([], [fg]) - self.failUnless(self.filepat.match(self.file.getvalue())) - - def testWritesADirLineForOneDir(self): - dir = self.make_dir("pretty", [], []) - self.listing.walk([dir], []) - self.failUnless(self.dirpat.match(self.file.getvalue())) - - def testWritesFileInSubdirectoryCorrectly(self): - fg = self.make_filegroup(["pink"]) - dir = self.make_dir("pretty", [], [fg]) - self.listing.walk([dir], []) - s = self.file.getvalue() - lines = s.splitlines() - self.failUnlessEqual(len(lines), 4) - self.failUnless(self.dirpat.match(lines[0])) - self.failUnlessEqual(lines[1], "") - self.failUnlessEqual(lines[2], "pretty:") - self.failUnless(self.filepat.match(lines[3])) diff --git a/obnam/gpg.py b/obnam/gpg.py deleted file mode 100644 index 1212dac4..00000000 --- a/obnam/gpg.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""GPG stuff for making backups""" - - -import logging -import os -import subprocess -import tempfile - - -import obnam - - -class GpgEncryptionFailure(obnam.ObnamException): - - def __init__(self, returncode, stderr): - self._msg = "GPG failed to encrypt: exit code %d" % returncode - if stderr: - self._msg += "\n%s" % indent_string(stderr) - - -class GpgDecryptionFailure(obnam.ObnamException): - - def __init__(self, returncode, stderr): - self._msg = "GPG failed to decrypt: exit code %d" % returncode - if stderr: - self._msg += "\n%s" % indent_string(stderr) - - -def encrypt(config, data): - """Encrypt data according to config""" - - logging.debug("Encrypting data with gpg") - - (fd, tempname) = tempfile.mkstemp() - os.write(fd, data) - os.lseek(fd, 0, 0) - - gpg = ["gpg", "-q", "--encrypt"] - gpg += ["--homedir=%s" % config.get("backup", "gpg-home")] - recipients = config.get("backup", "gpg-encrypt-to").split(" ") - gpg += ["-r%s" % x for x in recipients] - signer = config.get("backup", "gpg-sign-with") - if signer: - gpg += ["--sign", "-u%s" % signer] - - p = subprocess.Popen(gpg, stdin=fd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - (encrypted, stderr_data) = p.communicate() - - os.close(fd) - os.remove(tempname) - - if p.returncode == 0: - logging.debug("Encryption OK") - return encrypted - else: - raise GpgEncryptionFailure(p.returncode, stderr_data) - - -def indent_string(str, indent=2): - """Indent all lines in a string with 'indent' spaces""" - return "".join([(" " * indent) + x for x in str.split("\n")]) - - -def decrypt(config, data): - """Decrypt data according to config""" - - logging.debug("Decrypting with gpg") - - (fd, tempname) = tempfile.mkstemp() - os.write(fd, data) - os.close(fd) - - gpg = ["gpg", "-q", "--decrypt"] - gpg += ["--homedir=%s" % config.get("backup", "gpg-home")] - gpg += [tempname] - - p = subprocess.Popen(gpg, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (decrypted, stderr_data) = p.communicate() - - os.remove(tempname) - - if p.returncode == 0: - logging.debug("Decryption OK") - return decrypted - else: - raise GpgDecryptionFailure(p.returncode, stderr_data) diff --git a/obnam/gpgTests.py b/obnam/gpgTests.py deleted file mode 100644 index 68d286ba..00000000 --- a/obnam/gpgTests.py +++ /dev/null @@ -1,82 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.gpg.""" - - -import os -import shutil -import tempfile -import unittest - - -import obnam - - -class GpgEncryptionFailureTests(unittest.TestCase): - - def testIncludesExitCodeInMessage(self): - e = obnam.gpg.GpgEncryptionFailure(42, "") - self.failUnless("42" in str(e)) - - def testIncludesStderrInMessage(self): - e = obnam.gpg.GpgEncryptionFailure(42, "pink") - self.failUnless("pink" in str(e)) - - -class GpgDecryptionFailureTests(unittest.TestCase): - - def testIncludesExitCodeInMessage(self): - e = obnam.gpg.GpgDecryptionFailure(42, "") - self.failUnless("42" in str(e)) - - def testIncludesStderrInMessage(self): - e = obnam.gpg.GpgDecryptionFailure(42, "pink") - self.failUnless("pink" in str(e)) - - -class GpgTests(unittest.TestCase): - - def test(self): - block = "pink" - config = obnam.config.default_config() - config.set("backup", "gpg-home", "sample-gpg-home") - config.set("backup", "gpg-encrypt-to", "490C9ED1") - config.set("backup", "gpg-sign-with", "490C9ED1") - - encrypted = obnam.gpg.encrypt(config, block) - self.failIf(block in encrypted) - - decrypted = obnam.gpg.decrypt(config, encrypted) - self.failUnlessEqual(block, decrypted) - - def testEncryptionWithInvalidKey(self): - block = "pink" - config = obnam.config.default_config() - config.set("backup", "gpg-home", "sample-gpg-home") - config.set("backup", "gpg-encrypt-to", "pretty") - - self.failUnlessRaises(obnam.gpg.GpgEncryptionFailure, - obnam.gpg.encrypt, config, block) - - def testDecryptionWithInvalidData(self): - encrypted = "pink" - config = obnam.config.default_config() - config.set("backup", "gpg-home", "sample-gpg-home") - - self.failUnlessRaises(obnam.gpg.GpgDecryptionFailure, - obnam.gpg.decrypt, config, encrypted) diff --git a/obnam/io.py b/obnam/io.py deleted file mode 100644 index 7bc95434..00000000 --- a/obnam/io.py +++ /dev/null @@ -1,407 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Module for doing local file I/O and higher level remote operations""" - - -import logging -import os -import stat -import subprocess -import tempfile - - -import obnam - - -def resolve(context, pathname): - """Resolve a pathname relative to the user's desired target directory""" - return os.path.join(context.config.get("backup", "target-dir"), pathname) - - -def unsolve(context, pathname): - """Undo resolve(context, pathname)""" - if pathname == os.sep: - return pathname - target = context.config.get("backup", "target-dir") - if not target.endswith(os.sep): - target += os.sep - if pathname.startswith(target): - return pathname[len(target):] - else: - return pathname - - -def flush_object_queue(context, oq, map, to_cache): - """Put all objects in an object queue into a block and upload it - - Also put mappings into map. The queue is cleared (emptied) afterwards. - - """ - - if not oq.is_empty(): - block_id = context.be.generate_block_id() - logging.debug("Creating new object block %s" % block_id) - block = oq.as_block(block_id) - context.be.upload_block(block_id, block, to_cache) - for id in oq.ids(): - obnam.map.add(map, id, block_id) - oq.clear() - - -def flush_all_object_queues(context): - """Flush and clear all object queues in a given context""" - flush_object_queue(context, context.oq, context.map, True) - flush_object_queue(context, context.content_oq, context.contmap, False) - - -def get_block(context, block_id): - """Get a block from cache or by downloading it""" - block = context.cache.get_block(block_id) - if not block: - block = context.be.download_block(block_id) - elif context.be.use_gpg(): - logging.debug("Decrypting cached block %s before using it", block_id) - block = obnam.gpg.decrypt(context.config, block) - return block - - -class MissingBlock(obnam.ObnamException): - - def __init__(self, block_id, object_id): - self._msg = "Block %s for object %s is missing" % \ - (block_id, object_id) - - -class ObjectCache: - - def __init__(self, context): - self.MAX = context.config.getint("backup", "object-cache-size") - if self.MAX <= 0: - self.MAX = context.config.getint("backup", "block-size") / 64 - # 64 bytes seems like a reasonably good guess at the typical - # size of an object that doesn't contain file data. Inodes, - # for example. - self.objects = {} - self.mru = [] - - def get(self, object_id): - if object_id in self.objects: - self.mru.remove(object_id) - self.mru.insert(0, object_id) - return self.objects[object_id] - else: - return None - - def forget(self, object_id): - if object_id in self.objects: - del self.objects[object_id] - self.mru.remove(object_id) - - def put(self, object): - object_id = object.get_id() - self.forget(object_id) - self.objects[object_id] = object - self.mru.insert(0, object_id) - while len(self.mru) > self.MAX: - self.forget(self.mru[-1]) - - def size(self): - return len(self.mru) - - -def get_object(context, object_id): - """Fetch an object""" - - logging.debug("Fetching object %s" % object_id) - - if context.object_cache is None: - context.object_cache = ObjectCache(context) - o = context.object_cache.get(object_id) - if o: - logging.debug("Object is in cache, good") - return o - - logging.debug("Object not in cache, looking up mapping") - block_id = obnam.map.get(context.map, object_id) - if not block_id: - block_id = obnam.map.get(context.contmap, object_id) - if not block_id: - return None - - logging.debug("Fetching block") - block = get_block(context, block_id) - - logging.debug("Decoding block") - list = obnam.obj.block_decode(block) - - logging.debug("Finding objects in block") - list = obnam.cmp.find_by_kind(list, obnam.cmp.OBJECT) - - logging.debug("Putting objects into object cache") - the_one = None - factory = obnam.obj.StorageObjectFactory() - for component in list: - subs = component.get_subcomponents() - o = factory.get_object(subs) - if o.get_kind() != obnam.obj.FILEPART: - context.object_cache.put(o) - if o.get_id() == object_id: - the_one = o - - logging.debug("Returning desired object") - return the_one - - -def upload_host_block(context, host_block): - """Upload a host block""" - context.be.upload_block(context.config.get("backup", "host-id"), host_block, False) - - -def get_host_block(context): - """Return (and fetch, if needed) the host block, or None if not found""" - host_id = context.config.get("backup", "host-id") - logging.debug("Getting host block %s" % host_id) - try: - return context.be.download_block(host_id) - except IOError: - return None - - -def enqueue_object(context, oq, map, object_id, object, to_cache): - """Put an object into the object queue, and flush queue if too big""" - block_size = context.config.getint("backup", "block-size") - cur_size = oq.combined_size() - if len(object) + cur_size > block_size: - obnam.io.flush_object_queue(context, oq, map, to_cache) - oq.clear() - oq.add(object_id, object) - - -def create_file_contents_object(context, filename): - """Create and queue objects to hold a file's contents""" - object_id = obnam.obj.object_id_new() - part_ids = [] - odirect_read = context.config.get("backup", "odirect-read") - block_size = context.config.getint("backup", "block-size") - f = subprocess.Popen([odirect_read, resolve(context, filename)], - stdout=subprocess.PIPE) - while True: - data = f.stdout.read(block_size) - if not data: - break - c = obnam.cmp.Component(obnam.cmp.FILECHUNK, data) - part_id = obnam.obj.object_id_new() - o = obnam.obj.FilePartObject(id=part_id, components=[c]) - o = o.encode() - enqueue_object(context, context.content_oq, context.contmap, - part_id, o, False) - part_ids.append(part_id) - - o = obnam.obj.FileContentsObject(id=object_id) - for part_id in part_ids: - c = obnam.cmp.Component(obnam.cmp.FILEPARTREF, part_id) - o.add(c) - o = o.encode() - enqueue_object(context, context.oq, context.map, object_id, o, True) - if context.progress: - context.progress.update_current_action(filename) - - return object_id - - -class FileContentsObjectMissing(obnam.ObnamException): - - def __init__(self, id): - self._msg = "Missing file contents object: %s" % id - - -def copy_file_contents(context, fd, cont_id): - """Write contents of a file in backup to a file descriptor""" - cont = obnam.io.get_object(context, cont_id) - if not cont: - raise FileContentsObjectMissing(cont_id) - part_ids = cont.find_strings_by_kind(obnam.cmp.FILEPARTREF) - for part_id in part_ids: - part = obnam.io.get_object(context, part_id) - chunk = part.first_string_by_kind(obnam.cmp.FILECHUNK) - os.write(fd, chunk) - - -def reconstruct_file_contents(context, fd, delta_id): - """Write (to file descriptor) file contents, given an rsync delta""" - logging.debug("Reconstructing contents %s to %d" % (delta_id, fd)) - - logging.debug("Finding chain of DELTAs") - - delta = obnam.io.get_object(context, delta_id) - if not delta: - logging.error("Can't find DELTA object to reconstruct: %s" % delta_id) - return - - stack = [delta] - while True: - prev_delta_id = stack[-1].first_string_by_kind(obnam.cmp.DELTAREF) - if not prev_delta_id: - break - prev_delta = obnam.io.get_object(context, prev_delta_id) - if not prev_delta: - logging.error("Can't find DELTA object %s" % prev_delta_id) - return - stack.append(prev_delta) - - cont_id = stack[-1].first_string_by_kind(obnam.cmp.CONTREF) - if not cont_id: - logging.error("DELTA object chain does not end in CONTREF") - return - - logging.debug("Creating initial version of file") - (temp_fd1, temp_name1) = tempfile.mkstemp() - copy_file_contents(context, temp_fd1, cont_id) - - while stack: - delta = stack[-1] - stack = stack[:-1] - logging.debug("Applying DELTA %s" % delta.get_id()) - - deltapart_ids = delta.find_strings_by_kind(obnam.cmp.DELTAPARTREF) - - (temp_fd2, temp_name2) = tempfile.mkstemp() - obnam.rsync.apply_delta(context, temp_name1, deltapart_ids, - temp_name2) - os.remove(temp_name1) - os.close(temp_fd1) - temp_name1 = temp_name2 - temp_fd1 = temp_fd2 - - logging.debug("Copying final version of file to file descriptor %d" % fd) - while True: - data = os.read(temp_fd1, 64 * 1024) - if not data: - break - os.write(fd, data) - - os.close(temp_fd1) - os.remove(temp_name1) - - -def set_inode(full_pathname, file_component): - stat_component = file_component.first_by_kind(obnam.cmp.STAT) - st = obnam.cmp.parse_stat_component(stat_component) - os.utime(full_pathname, (st.st_atime, st.st_mtime)) - os.chmod(full_pathname, stat.S_IMODE(st.st_mode)) - - -_interesting = set([obnam.cmp.OBJECT, obnam.cmp.FILE]) -def _find_refs(components, refs=None): - """Return set of all references (recursively) in a list of components""" - if refs is None: - refs = set() - - for c in components: - kind = c.get_kind() - if obnam.cmp.kind_is_reference(kind): - refs.add(c.get_string_value()) - elif kind in _interesting: - subs = c.get_subcomponents() - _find_refs(subs, refs) - - return refs - - -def find_reachable_data_blocks(context, host_block): - """Find all blocks with data that can be reached from host block""" - logging.debug("Finding reachable data") - host = obnam.obj.create_host_from_block(host_block) - gen_ids = host.get_generation_ids() - object_ids = set(gen_ids) - reachable_block_ids = set() - while object_ids: - logging.debug("find_reachable_data_blocks: %d remaining" % - len(object_ids)) - object_id = object_ids.pop() - block_id = obnam.map.get(context.map, object_id) - if not block_id: - block_id = obnam.map.get(context.contmap, object_id) - if not block_id: - logging.warning("Can't find object %s in any block" % object_id) - elif block_id not in reachable_block_ids: - logging.debug("Marking block as reachable: %s" % block_id) - assert block_id is not None - reachable_block_ids.add(block_id) - block = get_block(context, block_id) - logging.debug("Finding references within block") - refs = _find_refs(obnam.obj.block_decode(block)) - logging.debug("This block contains %d refs" % len(refs)) - refs = [ref for ref in refs if ref not in reachable_block_ids] - logging.debug("This block contains %d refs not already reachable" - % len(refs)) - for ref in refs: - object_ids.add(ref) - return [x for x in reachable_block_ids] - - -def find_map_blocks_in_use(context, host_block, data_block_ids): - """Given data blocks in use, return map blocks they're mentioned in""" - data_block_ids = set(data_block_ids) - host = obnam.obj.create_host_from_block(host_block) - map_block_ids = host.get_map_block_ids() - contmap_block_ids = host.get_contmap_block_ids() - used_map_block_ids = set() - for map_block_id in map_block_ids + contmap_block_ids: - block = get_block(context, map_block_id) - list = obnam.obj.block_decode(block) - assert type(list) == type([]) - list = obnam.cmp.find_by_kind(list, obnam.cmp.OBJMAP) - for c in list: - id = c.first_string_by_kind(obnam.cmp.BLOCKREF) - if id in data_block_ids: - used_map_block_ids.add(map_block_id) - break # We already know this entire map block is used - return [x for x in used_map_block_ids] - # FIXME: This needs to keep normal and content maps separate. - - -def collect_garbage(context, host_block): - """Find files on the server store that are not linked from host object""" - logging.debug("Collecting garbage") - host_id = context.config.get("backup", "host-id") - logging.debug("GC: finding reachable data") - data_block_ids = find_reachable_data_blocks(context, host_block) - logging.debug("GC: finding map blocks still in use") - map_block_ids = find_map_blocks_in_use(context, host_block, - data_block_ids) - logging.debug("GC: finding all files in store") - files = context.be.list() - for id in [host_id] + data_block_ids + map_block_ids: - if id in files: - files.remove(id) - for garbage in files: - logging.debug("GC: Removing file %s" % garbage) - context.be.remove(garbage) - logging.debug("GC: done") - - -def load_maps(context, map, block_ids): - """Load and parse mapping blocks, store results in map""" - num_blocks = len(block_ids) - logging.debug("Loading %d maps" % num_blocks) - for i in range(num_blocks): - id = block_ids[i] - logging.debug("Loading map block %d/%d: %s" % (i+1, num_blocks, id)) - block = obnam.io.get_block(context, id) - obnam.map.decode_block(map, block) diff --git a/obnam/ioTests.py b/obnam/ioTests.py deleted file mode 100644 index 4e878566..00000000 --- a/obnam/ioTests.py +++ /dev/null @@ -1,534 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.io""" - - -import os -import shutil -import tempfile -import unittest - - -import obnam - - -class ExceptionTests(unittest.TestCase): - - def testMissingBlock(self): - e = obnam.io.MissingBlock("pink", "pretty") - self.failUnless("pink" in str(e)) - self.failUnless("pretty" in str(e)) - - def testFileContentsObjectMissing(self): - e = obnam.io.FileContentsObjectMissing("pink") - self.failUnless("pink" in str(e)) - - -class ResolveTests(unittest.TestCase): - - def test(self): - context = obnam.context.Context() - # We don't need the fields that are usually initialized manually. - - facit = ( - (".", "/", "/"), - (".", "/pink", "/pink"), - (".", "pink", "./pink"), - ("pink", "/", "/"), - ("pink", "/pretty", "/pretty"), - ("pink", "pretty", "pink/pretty"), - ("/pink", "/", "/"), - ("/pink", "/pretty", "/pretty"), - ("/pink", "pretty", "/pink/pretty"), - ("/", "/", "/"), - ) - - for target, pathname, resolved in facit: - context.config.set("backup", "target-dir", target) - x = obnam.io.resolve(context, pathname) - self.failUnlessEqual(x, resolved) - self.failUnlessEqual(obnam.io.unsolve(context, x), pathname) - - self.failUnlessEqual(obnam.io.unsolve(context, "/pink"), "pink") - - -class IoBase(unittest.TestCase): - - def setUp(self): - self.cachedir = "tmp.cachedir" - self.rootdir = "tmp.rootdir" - - os.mkdir(self.cachedir) - os.mkdir(self.rootdir) - - config_list = ( - ("backup", "cache", self.cachedir), - ("backup", "store", self.rootdir) - ) - - self.context = obnam.context.Context() - - for section, item, value in config_list: - self.context.config.set(section, item, value) - - self.context.cache = obnam.cache.Cache(self.context.config) - self.context.be = obnam.backend.init(self.context.config, - self.context.cache) - - def tearDown(self): - shutil.rmtree(self.cachedir) - shutil.rmtree(self.rootdir) - del self.cachedir - del self.rootdir - del self.context - - -class ObjectQueueFlushing(IoBase): - - def testEmptyQueue(self): - obnam.io.flush_object_queue(self.context, self.context.oq, - self.context.map, False) - list = self.context.be.list() - self.failUnlessEqual(list, []) - - def testFlushing(self): - self.context.oq.add("pink", "pretty") - - self.failUnlessEqual(self.context.be.list(), []) - - obnam.io.flush_object_queue(self.context, self.context.oq, - self.context.map, False) - - list = self.context.be.list() - self.failUnlessEqual(len(list), 1) - - b1 = os.path.basename(obnam.map.get(self.context.map, "pink")) - b2 = os.path.basename(list[0]) - self.failUnlessEqual(b1, b2) - - def testFlushAll(self): - self.context.oq.add("pink", "pretty") - self.context.content_oq.add("x", "y") - obnam.io.flush_all_object_queues(self.context) - self.failUnlessEqual(len(self.context.be.list()), 2) - self.failUnless(self.context.oq.is_empty()) - self.failUnless(self.context.content_oq.is_empty()) - - -class GetBlockTests(IoBase): - - def setup_pink_block(self, to_cache): - self.context.be.upload_block("pink", "pretty", to_cache) - - def testRaisesIoErrorForNonExistentBlock(self): - self.failUnlessRaises(IOError, obnam.io.get_block, self.context, "pink") - - def testFindsBlockWhenNotInCache(self): - self.setup_pink_block(to_cache=False) - self.failUnless(obnam.io.get_block(self.context, "pink")) - - def testFindsBlockWhenInCache(self): - self.setup_pink_block(to_cache=True) - obnam.io.get_block(self.context, "pink") - self.failUnless(obnam.io.get_block(self.context, "pink")) - - -class GetObjectTests(IoBase): - - def upload_object(self, object_id, object): - self.context.oq.add(object_id, object) - obnam.io.flush_object_queue(self.context, self.context.oq, - self.context.map, False) - - def testGetObject(self): - id = "pink" - component = obnam.cmp.Component(42, "pretty") - object = obnam.obj.FilePartObject(id=id) - object.add(component) - object = object.encode() - self.upload_object(id, object) - o = obnam.io.get_object(self.context, id) - - self.failUnlessEqual(o.get_id(), id) - self.failUnlessEqual(o.get_kind(), obnam.obj.FILEPART) - list = o.get_components() - list = [c for c in list if c.get_kind() not in [obnam.cmp.OBJID, - obnam.cmp.OBJKIND]] - self.failUnlessEqual(len(list), 1) - self.failUnlessEqual(list[0].get_kind(), 42) - self.failUnlessEqual(list[0].get_string_value(), "pretty") - - def testGetObjectTwice(self): - id = "pink" - component = obnam.cmp.Component(42, "pretty") - object = obnam.obj.FileContentsObject(id=id) - object.add(component) - object = object.encode() - self.upload_object(id, object) - o = obnam.io.get_object(self.context, id) - o2 = obnam.io.get_object(self.context, id) - self.failUnlessEqual(o, o2) - - def testReturnsNoneForNonexistentObject(self): - self.failUnlessEqual(obnam.io.get_object(self.context, "pink"), None) - - -class HostBlock(IoBase): - - def testFetchHostBlock(self): - host_id = self.context.config.get("backup", "host-id") - host = obnam.obj.HostBlockObject(host_id=host_id, - gen_ids=["gen1", "gen2"], - map_block_ids=["map1", "map2"], - contmap_block_ids=["contmap1", - "contmap2"]) - host = host.encode() - be = obnam.backend.init(self.context.config, self.context.cache) - - obnam.io.upload_host_block(self.context, host) - host2 = obnam.io.get_host_block(self.context) - self.failUnlessEqual(host, host2) - - def testFetchNonexistingHostBlockReturnsNone(self): - self.failUnlessEqual(obnam.io.get_host_block(self.context), None) - - -class ObjectQueuingTests(unittest.TestCase): - - def find_block_files(self, config): - files = [] - root = config.get("backup", "store") - for dirpath, _, filenames in os.walk(root): - files += [os.path.join(dirpath, x) for x in filenames] - files.sort() - return files - - def testEnqueue(self): - context = obnam.context.Context() - object_id = "pink" - object = "pretty" - context.config.set("backup", "block-size", "%d" % 128) - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - - self.failUnlessEqual(self.find_block_files(context.config), []) - - obnam.io.enqueue_object(context, context.oq, context.map, - object_id, object, False) - - self.failUnlessEqual(self.find_block_files(context.config), []) - self.failUnlessEqual(context.oq.combined_size(), len(object)) - - object_id2 = "pink2" - object2 = "x" * 1024 - - obnam.io.enqueue_object(context, context.oq, context.map, - object_id2, object2, False) - - self.failUnlessEqual(len(self.find_block_files(context.config)), 1) - self.failUnlessEqual(context.oq.combined_size(), len(object2)) - - shutil.rmtree(context.config.get("backup", "cache"), True) - shutil.rmtree(context.config.get("backup", "store"), True) - - -class FileContentsTests(unittest.TestCase): - - def setUp(self): - self.context = obnam.context.Context() - self.context.cache = obnam.cache.Cache(self.context.config) - self.context.be = obnam.backend.init(self.context.config, - self.context.cache) - - def tearDown(self): - for x in ["cache", "store"]: - if os.path.exists(self.context.config.get("backup", x)): - shutil.rmtree(self.context.config.get("backup", x)) - - def testEmptyFile(self): - (fd, filename) = tempfile.mkstemp() - os.close(fd) - - id = obnam.io.create_file_contents_object(self.context, filename) - - self.failIfEqual(id, None) - self.failUnlessEqual(self.context.oq.ids(), [id]) - self.failUnlessEqual(obnam.map.count(self.context.map), 0) - # there's no mapping yet, because the queue is small enough - # that there has been no need to flush it - - os.remove(filename) - - def testNonEmptyFile(self): - block_size = 4096 - self.context.config.set("backup", "block-size", "%d" % block_size) - filename = "Makefile" - - id = obnam.io.create_file_contents_object(self.context, filename) - - self.failIfEqual(id, None) - self.failUnlessEqual(self.context.oq.ids(), [id]) - - def testRestore(self): - block_size = 4096 - self.context.config.set("backup", "block-size", "%d" % block_size) - filename = "Makefile" - - id = obnam.io.create_file_contents_object(self.context, filename) - obnam.io.flush_object_queue(self.context, self.context.oq, - self.context.map, False) - obnam.io.flush_object_queue(self.context, self.context.content_oq, - self.context.contmap, False) - - (fd, name) = tempfile.mkstemp() - obnam.io.copy_file_contents(self.context, fd, id) - os.close(fd) - - f = file(name, "r") - data1 = f.read() - f.close() - os.remove(name) - - f = file(filename, "r") - data2 = f.read() - f.close() - - self.failUnlessEqual(data1, data2) - - def testRestoreNonexistingFile(self): - self.failUnlessRaises(obnam.io.FileContentsObjectMissing, - obnam.io.copy_file_contents, self.context, None, "pink") - - -class MetaDataTests(unittest.TestCase): - - def testSet(self): - (fd, name) = tempfile.mkstemp() - os.close(fd) - - st1 = os.stat(name) - inode = obnam.filelist.create_file_component_from_stat(name, st1, - None, None, - None) - - os.chmod(name, 0) - - obnam.io.set_inode(name, inode) - - st2 = os.stat(name) - - self.failUnlessEqual(st1.st_mode, st2.st_mode) - self.failUnlessEqual(st1.st_atime, st2.st_atime) - self.failUnlessEqual(st1.st_mtime, st2.st_mtime) - - -class ObjectCacheTests(unittest.TestCase): - - def setUp(self): - self.object = obnam.obj.FilePartObject(id="pink") - self.object2 = obnam.obj.FilePartObject(id="pretty") - self.object3 = obnam.obj.FilePartObject(id="beautiful") - - def testCreate(self): - context = obnam.context.Context() - oc = obnam.io.ObjectCache(context) - self.failUnlessEqual(oc.size(), 0) - self.failUnless(oc.MAX > 0) - - def testPut(self): - context = obnam.context.Context() - oc = obnam.io.ObjectCache(context) - self.failUnlessEqual(oc.get("pink"), None) - oc.put(self.object) - self.failUnlessEqual(oc.get("pink"), self.object) - - def testPutWithOverflow(self): - context = obnam.context.Context() - oc = obnam.io.ObjectCache(context) - oc.MAX = 1 - oc.put(self.object) - self.failUnlessEqual(oc.size(), 1) - self.failUnlessEqual(oc.get("pink"), self.object) - oc.put(self.object2) - self.failUnlessEqual(oc.size(), 1) - self.failUnlessEqual(oc.get("pink"), None) - self.failUnlessEqual(oc.get("pretty"), self.object2) - - def testPutWithOverflowPart2(self): - context = obnam.context.Context() - oc = obnam.io.ObjectCache(context) - oc.MAX = 2 - - oc.put(self.object) - oc.put(self.object2) - self.failUnlessEqual(oc.size(), 2) - self.failUnlessEqual(oc.get("pink"), self.object) - self.failUnlessEqual(oc.get("pretty"), self.object2) - - oc.get("pink") - oc.put(self.object3) - self.failUnlessEqual(oc.size(), 2) - self.failUnlessEqual(oc.get("pink"), self.object) - self.failUnlessEqual(oc.get("pretty"), None) - self.failUnlessEqual(oc.get("beautiful"), self.object3) - - -class ReachabilityTests(IoBase): - - def testNoDataNoMaps(self): - host_id = self.context.config.get("backup", "host-id") - host = obnam.obj.HostBlockObject(host_id=host_id).encode() - obnam.io.upload_host_block(self.context, host) - - list = obnam.io.find_reachable_data_blocks(self.context, host) - self.failUnlessEqual(list, []) - - list2 = obnam.io.find_map_blocks_in_use(self.context, host, list) - self.failUnlessEqual(list2, []) - - def testNoDataExtraMaps(self): - obnam.map.add(self.context.map, "pink", "pretty") - map_block_id = "box" - map_block = obnam.map.encode_new_to_block(self.context.map, - map_block_id) - self.context.be.upload_block(map_block_id, map_block, False) - - obnam.map.add(self.context.contmap, "black", "beautiful") - contmap_block_id = "fiddly" - contmap_block = obnam.map.encode_new_to_block( - self.context.contmap, contmap_block_id) - self.context.be.upload_block(contmap_block_id, contmap_block, False) - - host_id = self.context.config.get("backup", "host-id") - host = obnam.obj.HostBlockObject(host_id=host_id, - map_block_ids=[map_block_id], - contmap_block_ids=[contmap_block_id]) - host = host.encode() - obnam.io.upload_host_block(self.context, host) - - list = obnam.io.find_map_blocks_in_use(self.context, host, []) - self.failUnlessEqual(list, []) - - def testDataAndMap(self): - o = obnam.obj.FilePartObject(id="rouge") - c = obnam.cmp.Component(obnam.cmp.FILECHUNK, "moulin") - o.add(c) - encoded_o = o.encode() - - block_id = "pink" - oq = obnam.obj.ObjectQueue() - oq.add("rouge", encoded_o) - block = oq.as_block(block_id) - self.context.be.upload_block(block_id, block, False) - - obnam.map.add(self.context.contmap, "rouge", block_id) - map_block_id = "pretty" - map_block = obnam.map.encode_new_to_block(self.context.contmap, - map_block_id) - self.context.be.upload_block(map_block_id, map_block, False) - - host_id = self.context.config.get("backup", "host-id") - host = obnam.obj.HostBlockObject(host_id=host_id, - map_block_ids=[map_block_id]) - host = host.encode() - obnam.io.upload_host_block(self.context, host) - - list = obnam.io.find_map_blocks_in_use(self.context, host, - [block_id]) - self.failUnlessEqual(list, [map_block_id]) - - -class GarbageCollectionTests(IoBase): - - def testFindUnreachableFiles(self): - host_id = self.context.config.get("backup", "host-id") - host = obnam.obj.HostBlockObject(host_id=host_id).encode() - obnam.io.upload_host_block(self.context, host) - - block_id = self.context.be.generate_block_id() - self.context.be.upload_block(block_id, "pink", False) - - files = self.context.be.list() - self.failUnlessEqual(files, [host_id, block_id]) - - obnam.io.collect_garbage(self.context, host) - files = self.context.be.list() - self.failUnlessEqual(files, [host_id]) - - -class ObjectCacheRegressionTest(unittest.TestCase): - - # This test case is for a bug in obnam.io.ObjectCache: with the - # right sequence of operations, the cache can end up in a state where - # the MRU list is too long, but contains two instances of the same - # object ID. When the list is shortened, the first instance of the - # ID is removed, and the object is also removed from the dictionary. - # If the list is still too long, it is shortened again, by removing - # the last item in the list, but that no longer is in the dictionary, - # resulting in the shortening not happening. Voila, an endless loop. - # - # As an example, if the object queue maximum size is 3, the following - # sequence exhibits the problem: - # - # put('a') mru = ['a'] - # put('b') mru = ['b', 'a'] - # put('c') mru = ['c', 'b', 'a'] - # put('a') mru = ['a', 'c', 'b', 'a'], shortened into - # ['c', 'b', 'a'], and now dict no longer - # has 'a' - # put('d') mru = ['d', 'c', 'b', 'a'], which needs to be - # shortened by removing the last element, but - # since 'a' is no longer in dict, the list - # doesn't actually become shorter, and - # the shortening loop becomes infinite - # - # (The fix to the bug is, of course, to forget the object to be - # inserted before inserting it, thus removing duplicates in the MRU - # list.) - - def test(self): - context = obnam.context.Context() - context.config.set("backup", "object-cache-size", "3") - oc = obnam.io.ObjectCache(context) - a = obnam.obj.FilePartObject(id="a") - b = obnam.obj.FilePartObject(id="b") - c = obnam.obj.FilePartObject(id="c") - d = obnam.obj.FilePartObject(id="d") - oc.put(a) - oc.put(b) - oc.put(c) - oc.put(a) - # If the bug is there, the next method call doesn't return. - # Beware the operator. - oc.put(b) - - -class LoadMapTests(IoBase): - - def test(self): - map = obnam.map.create() - obnam.map.add(map, "pink", "pretty") - block_id = self.context.be.generate_block_id() - block = obnam.map.encode_new_to_block(map, block_id) - self.context.be.upload_block(block_id, block, False) - - obnam.io.load_maps(self.context, self.context.map, [block_id]) - self.failUnlessEqual(obnam.map.get(self.context.map, "pink"), - "pretty") - self.failUnlessEqual(obnam.map.get(self.context.map, "black"), - None) diff --git a/obnam/log.py b/obnam/log.py deleted file mode 100644 index a10f91cd..00000000 --- a/obnam/log.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) 2006, 2007 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Setting up the logging module""" - - -import logging -import os -import sys -import time - - -levels = { - "debug": logging.DEBUG, - "info": logging.INFO, - "warning": logging.WARNING, - "error": logging.ERROR, - "critical": logging.CRITICAL, -} - - -def setup(config): - filename = config.get("backup", "log-file") - f = sys.stdout - if filename: - fd = os.open(filename, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0600) - f = os.fdopen(fd, "a") - level = config.get("backup", "log-level") - - formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s", - "%Y-%m-%d %H:%M:%S") - - handler = logging.StreamHandler(f) - handler.setFormatter(formatter) - - logger = logging.getLogger() - logger.setLevel(levels[level.lower()]) - logger.addHandler(handler) diff --git a/obnam/logTests.py b/obnam/logTests.py deleted file mode 100644 index b866381d..00000000 --- a/obnam/logTests.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright (C) 2007 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.log""" - - -import os -import stat -import unittest - -import obnam - - -class LogTests(unittest.TestCase): - - filename = "unittest.testlog" - - def setUp(self): - if os.path.exists(self.filename): - os.remove(self.filename) - - tearDown = setUp - - def testCreateNew(self): - self.failIf(os.path.exists(self.filename)) - - config = obnam.config.default_config() - config.set("backup", "log-file", self.filename) - - obnam.log.setup(config) - self.failUnless(os.path.exists(self.filename)) - - st = os.stat(self.filename) - self.failUnless(stat.S_ISREG(st.st_mode)) - self.failUnlessEqual(stat.S_IMODE(st.st_mode), 0600) diff --git a/obnam/map.py b/obnam/map.py deleted file mode 100644 index ac8d02ea..00000000 --- a/obnam/map.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Mapping of object to block identifiers""" - -import logging - -import obnam.cmp - - -class Mappings: - - def __init__(self): - self.dict = {} - self.new_keys = {} - - -def create(): - """Create a new object ID to block ID mapping""" - return Mappings() - - -def count(mapping): - """Return the number of mappings in 'mapping'""" - return len(mapping.dict.keys()) - - -def add(mapping, object_id, block_id): - """Add a mapping from object_id to block_id""" - _add_old(mapping, object_id, block_id) - if object_id not in mapping.new_keys: - mapping.new_keys[object_id] = 1 - - -def _add_old(mapping, object_id, block_id): - """Add a mapping from object_id to block_id""" - assert object_id not in mapping.dict - mapping.dict[object_id] = block_id - - -def get(mapping, object_id): - """Return the list of blocks, in order, that contain parts of an object""" - return mapping.dict.get(object_id, None) - - -def get_new(mapping): - """Return list of new mappings""" - return mapping.new_keys.keys() - - -def reset_new(mapping): - """Reset list of new mappings""" - mapping.new_keys = {} - - -def encode_new(mapping): - """Return a list of encoded components for the new mappings""" - list = [] - dict = {} - for object_id in get_new(mapping): - block_id = get(mapping, object_id) - if block_id in dict: - dict[block_id].append(object_id) - else: - dict[block_id] = [object_id] - for block_id in dict: - object_ids = dict[block_id] - object_ids = [obnam.cmp.Component(obnam.cmp.OBJREF, x) - for x in object_ids] - block_id = obnam.cmp.Component(obnam.cmp.BLOCKREF, block_id) - c = obnam.cmp.Component(obnam.cmp.OBJMAP, - [block_id] + object_ids) - list.append(c.encode()) - return list - - -def encode_new_to_block(mapping, block_id): - """Encode new mappings into a block""" - c = obnam.cmp.Component(obnam.cmp.BLKID, block_id) - list = encode_new(mapping) - block = "".join([obnam.obj.BLOCK_COOKIE, c.encode()] + list) - return block - - -# This function used to use the block and component parsing code in -# obnam.obj and obnam.cmp, namely the obnam.obj.block_decode function. -# However, it turned out to be pretty slow, and since we load maps at -# the beginning of pretty much any backup run, the following version was -# written, and measured with benchmarks to run in about a quarter of the -# speed of the original. If the structure of blocks changes, this code -# needs to change as well. - -def decode_block(mapping, block): - """Decode a block with mappings, add them to mapping object""" - logging.debug("Decoding mapping block") - - if not block.startswith(obnam.obj.BLOCK_COOKIE): - raise obnam.obj.BlockWithoutCookie(block) - - pos = len(obnam.obj.BLOCK_COOKIE) - end = len(block) - - while pos < end: - size, pos = obnam.varint.decode(block, pos) - kind, pos = obnam.varint.decode(block, pos) - - if kind == obnam.cmp.OBJMAP: - pos2 = pos - end2 = pos + size - block_id = None - object_ids = [] - while pos2 < end2: - size2, pos2 = obnam.varint.decode(block, pos2) - kind2, pos2 = obnam.varint.decode(block, pos2) - data2 = block[pos2:pos2+size2] - pos2 += size2 - if kind2 == obnam.cmp.BLOCKREF: - block_id = data2 - elif kind2 == obnam.cmp.OBJREF: - object_ids.append(data2) - if object_ids and block_id: - for object_id in object_ids: - _add_old(mapping, object_id, block_id) - - pos += size diff --git a/obnam/mapTests.py b/obnam/mapTests.py deleted file mode 100644 index 563c08c6..00000000 --- a/obnam/mapTests.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.map.""" - - -import unittest - - -import obnam - - -class ObjectMappingTests(unittest.TestCase): - - def testInvalidBlockRaisesException(self): - m = obnam.map.create() - self.failUnlessRaises(obnam.obj.BlockWithoutCookie, - obnam.map.decode_block, m, "pink") - - def testEmpty(self): - m = obnam.map.create() - self.failUnlessEqual(obnam.map.count(m), 0) - - def testGetNonexisting(self): - m = obnam.map.create() - self.failUnlessEqual(obnam.map.get(m, "pink"), None) - - def testAddOneMapping(self): - m = obnam.map.create() - obnam.map.add(m, "pink", "pretty") - self.failUnlessEqual(obnam.map.count(m), 1) - - self.failUnlessEqual(obnam.map.get(m, "pink"), "pretty") - - def testAddTwoMappings(self): - m = obnam.map.create() - obnam.map.add(m, "pink", "pretty") - self.failUnlessRaises(AssertionError, obnam.map.add, - m, "pink", "beautiful") - - def testGetNewMappings(self): - m = obnam.map.create() - self.failUnlessEqual(obnam.map.get_new(m), []) - obnam.map.add(m, "pink", "pretty") - self.failUnlessEqual(obnam.map.get_new(m), ["pink"]) - obnam.map.reset_new(m) - self.failUnlessEqual(obnam.map.get_new(m), []) - obnam.map.add(m, "black", "beautiful") - self.failUnlessEqual(obnam.map.get_new(m), ["black"]) - - def testMappingEncodings(self): - # Set up a mapping - m = obnam.map.create() - - # It's empty; make sure encoding new ones returns an empty list - list = obnam.map.encode_new(m) - self.failUnlessEqual(list, []) - - # Add a mapping - obnam.map.add(m, "pink", "pretty") - - # Encode the new mapping, make sure that goes well - list = obnam.map.encode_new(m) - self.failUnlessEqual(len(list), 1) - - # Make sure the encoding is correct - list2 = obnam.cmp.Parser(list[0]).decode_all() - self.failUnlessEqual(len(list2), 1) - self.failUnlessEqual(list2[0].get_kind(), obnam.cmp.OBJMAP) - - list3 = list2[0].get_subcomponents() - self.failUnlessEqual(len(list3), 2) - self.failUnlessEqual(list3[0].get_kind(), obnam.cmp.BLOCKREF) - self.failUnlessEqual(list3[0].get_string_value(), "pretty") - self.failUnlessEqual(list3[1].get_kind(), obnam.cmp.OBJREF) - self.failUnlessEqual(list3[1].get_string_value(), "pink") - - # Now try decoding with the official function - block = obnam.map.encode_new_to_block(m, "black") - m2 = obnam.map.create() - obnam.map.decode_block(m2, block) - self.failUnlessEqual(obnam.map.count(m2), 1) - self.failUnlessEqual(obnam.map.get(m2, "pink"), "pretty") - - def testMappingEncodingsForTwoInOneBlock(self): - m = obnam.map.create() - - obnam.map.add(m, "pink", "pretty") - obnam.map.add(m, "black", "pretty") - - list = obnam.map.encode_new(m) - self.failUnlessEqual(len(list), 1) - - block = obnam.map.encode_new_to_block(m, "box") - m2 = obnam.map.create() - obnam.map.decode_block(m2, block) - self.failUnlessEqual(obnam.map.count(m), obnam.map.count(m2)) - self.failUnlessEqual(obnam.map.get(m2, "pink"), "pretty") - self.failUnlessEqual(obnam.map.get(m2, "black"), "pretty") diff --git a/obnam/obj.py b/obnam/obj.py deleted file mode 100644 index 5168900c..00000000 --- a/obnam/obj.py +++ /dev/null @@ -1,521 +0,0 @@ -# Copyright (C) 2006, 2007, 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Backup objects""" - -import logging - -import uuid - -import obnam.cmp -import obnam.varint - - -# Magic cookie at the beginning of every block - -BLOCK_COOKIE = "blockhead\n" - - -# Version of the storage format - -FORMAT_VERSION = "1" - - -# Constants of object kinds - -_object_kinds = {} - -def _define_kind(code, name): - assert code not in _object_kinds - assert name not in _object_kinds.values() - _object_kinds[code] = name - return code - -FILEPART = _define_kind( 1, "FILEPART") -# object kind 2 used to be INODE, but it's been removed -GEN = _define_kind( 3, "GEN") -SIG = _define_kind( 4, "SIG") -HOST = _define_kind( 5, "HOST") -FILECONTENTS = _define_kind( 6, "FILECONTENTS") -FILELIST = _define_kind( 7, "FILELIST") -DELTA = _define_kind( 8, "DELTA") -DELTAPART = _define_kind( 9, "DELTAPART") -DIR = _define_kind(10, "DIR") -FILEGROUP = _define_kind(11, "FILEGROUP") - - -def kind_name(kind): - """Return a textual name for a numeric object kind""" - return _object_kinds.get(kind, "UNKNOWN") - - -def object_id_new(): - """Return a string that is a universally unique ID for an object""" - id = str(uuid.uuid4()) - logging.debug("Creating object id %s" % id) - return id - - -class StorageObject(object): - - """Implement a storage object in memory. - - There should be a sub-class of this class for every kind of storage - object. Sub-class may implement a constructor, but their construct - MUST accept a components= argument and pass it on to the base class - constructor. - - Additionally, sub-classes MUST define the "kind" attribute to refer - to the kind of storage object they are. This is required for - the StorageObjectFactory to work. - - """ - - kind = None - - def __init__(self, components=None, id=None): - assert components is not None or id is not None - if components: - self._components = components - else: - self._components = [] - - if id: - self.set_id(id) - if self.first_varint_by_kind(obnam.cmp.OBJKIND) is None and self.kind: - self.add(obnam.cmp.Component(obnam.cmp.OBJKIND, - obnam.varint.encode(self.kind))) - - def remove(self, kind): - """Remove all components of a given kind.""" - self._components = [c for c in self.get_components() - if c.get_kind() != kind] - - def add(self, c): - """Add a component""" - self._components.append(c) - - def replace(self, c): - """Remove any existing components of this kind, then add this one.""" - self.remove(c.get_kind()) - self.add(c) - - def get_kind(self): - """Return the kind of an object""" - return self.first_varint_by_kind(obnam.cmp.OBJKIND) - - def get_id(self): - """Return the identifier for an object""" - return self.first_string_by_kind(obnam.cmp.OBJID) - - def set_id(self, id): - """Set the identifier for this object.""" - self.replace(obnam.cmp.Component(obnam.cmp.OBJID, id)) - - def get_components(self): - """Return list of all components in an object""" - return self._components - - def find_by_kind(self, wanted_kind): - """Find all components of a desired kind inside this object""" - return [c for c in self.get_components() - if c.get_kind() == wanted_kind] - - def find_strings_by_kind(self, wanted_kind): - """Find all components of a desired kind, return string values""" - return [c.get_string_value() for c in self.find_by_kind(wanted_kind)] - - def find_varints_by_kind(self, wanted_kind): - """Find all components of a desired kind, return varint values""" - return [c.get_varint_value() for c in self.find_by_kind(wanted_kind)] - - def first_by_kind(self, wanted_kind): - """Find first component of a desired kind""" - for c in self.get_components(): - if c.get_kind() == wanted_kind: - return c - return None - - def first_string_by_kind(self, wanted_kind): - """Find string value of first component of a desired kind""" - c = self.first_by_kind(wanted_kind) - if c: - return c.get_string_value() - else: - return None - - def first_varint_by_kind(self, wanted_kind): - """Find string value of first component of a desired kind""" - c = self.first_by_kind(wanted_kind) - if c: - return c.get_varint_value() - else: - return None - - def encode(self): - """Encode an object as a string""" - return "".join(c.encode() for c in self.get_components()) - - -# This function is only used during testing. -def decode(encoded): - """Decode an object from a string""" - parser = obnam.cmp.Parser(encoded) - list = parser.decode_all() - return StorageObject(components=list) - - -class ObjectQueue: - - def __init__(self): - self.clear() - - def add(self, object_id, encoded_object): - """Add an encoded object into an object queue""" - self.queue.append((object_id, encoded_object)) - self.size += len(encoded_object) - - def clear(self): - """Remove all objects from an object queue""" - self.queue = [] - self.size = 0 - - def is_empty(self): - """Is an object queue empty?""" - return self.size == 0 - - def combined_size(self): - """Return the combined size of all objects in an object queue""" - return self.size - - def ids(self): - """Return identifiers for all the objects in the object queue""" - return [x[0] for x in self.queue] - - def as_block(self, blkid): - """Create a block from an object queue""" - logging.debug("Creating block %s" % blkid) - blkid = obnam.cmp.Component(obnam.cmp.BLKID, blkid) - objects = [obnam.cmp.Component(obnam.cmp.OBJECT, x[1]) - for x in self.queue] - return "".join([BLOCK_COOKIE] + - [c.encode() for c in [blkid] + objects]) - - -class BlockWithoutCookie(obnam.ObnamException): - - def __init__(self, block): - self._msg = ("Block does not start with cookie: %s" % - " ".join("%02x" % ord(c) for c in block[:32])) - - -class EmptyBlock(obnam.ObnamException): - - def __init__(self): - self._msg = "Block has no components." - - -def block_decode(block): - """Return list of decoded components in block, or None on error""" - if block.startswith(BLOCK_COOKIE): - parser = obnam.cmp.Parser(block, len(BLOCK_COOKIE)) - list = parser.decode_all() - if not list: - raise EmptyBlock() - return list - else: - raise BlockWithoutCookie(block) - - -class SignatureObject(StorageObject): - - kind = SIG - - def __init__(self, components=None, id=None, sigdata=None): - StorageObject.__init__(self, components=components, id=id) - if sigdata: - c = obnam.cmp.Component(obnam.cmp.SIGDATA, sigdata) - self.add(c) - - -class DeltaObject(StorageObject): - - kind = DELTA - - def __init__(self, components=None, id=None, deltapart_refs=None, - cont_ref=None, delta_ref=None): - StorageObject.__init__(self, components=components, id=id) - if deltapart_refs: - for deltapart_ref in deltapart_refs: - c = obnam.cmp.Component(obnam.cmp.DELTAPARTREF, deltapart_ref) - self.add(c) - if cont_ref: - c = obnam.cmp.Component(obnam.cmp.CONTREF, cont_ref) - self.add(c) - elif delta_ref: - c = obnam.cmp.Component(obnam.cmp.DELTAREF, delta_ref) - self.add(c) - - -class GenerationObject(StorageObject): - - kind = GEN - - def __init__(self, components=None, id=None, filelist_id=None, - dirrefs=None, filegrouprefs=None, start=None, end=None): - StorageObject.__init__(self, components=components, id=id) - if filelist_id: - self.add(obnam.cmp.Component(obnam.cmp.FILELISTREF, filelist_id)) - if dirrefs: - for ref in dirrefs: - self.add(obnam.cmp.Component(obnam.cmp.DIRREF, ref)) - if filegrouprefs: - for ref in filegrouprefs: - self.add(obnam.cmp.Component(obnam.cmp.FILEGROUPREF, ref)) - if start: - self.add(obnam.cmp.Component(obnam.cmp.GENSTART, - obnam.varint.encode(start))) - if end: - self.add(obnam.cmp.Component(obnam.cmp.GENEND, - obnam.varint.encode(end))) - - def get_filelistref(self): - return self.first_string_by_kind(obnam.cmp.FILELISTREF) - - def get_dirrefs(self): - return self.find_strings_by_kind(obnam.cmp.DIRREF) - - def get_filegrouprefs(self): - return self.find_strings_by_kind(obnam.cmp.FILEGROUPREF) - - def get_start_time(self): - return self.first_varint_by_kind(obnam.cmp.GENSTART) - - def get_end_time(self): - return self.first_varint_by_kind(obnam.cmp.GENEND) - - -# This is used only by testing. -def generation_object_decode(gen): - """Decode a generation object into objid, file list ref""" - - o = decode(gen) - return o.get_id(), \ - o.first_string_by_kind(obnam.cmp.FILELISTREF), \ - o.find_strings_by_kind(obnam.cmp.DIRREF), \ - o.find_strings_by_kind(obnam.cmp.FILEGROUPREF), \ - o.first_varint_by_kind(obnam.cmp.GENSTART), \ - o.first_varint_by_kind(obnam.cmp.GENEND) - - -class HostBlockObject(StorageObject): - - kind = HOST - - def __init__(self, components=None, host_id=None, gen_ids=None, - map_block_ids=None, contmap_block_ids=None): - StorageObject.__init__(self, components=components, id=host_id) - - if components is None: - c = obnam.cmp.Component(obnam.cmp.FORMATVERSION, FORMAT_VERSION) - self.add(c) - - if gen_ids: - for gen_id in gen_ids: - c = obnam.cmp.Component(obnam.cmp.GENREF, gen_id) - self.add(c) - - if map_block_ids: - for map_block_id in map_block_ids: - c = obnam.cmp.Component(obnam.cmp.MAPREF, map_block_id) - self.add(c) - - if contmap_block_ids: - for map_block_id in contmap_block_ids: - c = obnam.cmp.Component(obnam.cmp.CONTMAPREF, map_block_id) - self.add(c) - - def get_generation_ids(self): - """Return IDs of all generations for this host.""" - return self.find_strings_by_kind(obnam.cmp.GENREF) - - def get_map_block_ids(self): - """Return IDs of all map blocks for this host.""" - return self.find_strings_by_kind(obnam.cmp.MAPREF) - - def get_contmap_block_ids(self): - """Return IDs of all map blocks for this host.""" - return self.find_strings_by_kind(obnam.cmp.CONTMAPREF) - - def encode(self): - oq = ObjectQueue() - oq.add(self.get_id(), StorageObject.encode(self)) - return oq.as_block(self.get_id()) - - -def create_host_from_block(block): - """Decode a host block into a HostBlockObject""" - - list = block_decode(block) - - host_id = obnam.cmp.first_string_by_kind(list, obnam.cmp.BLKID) - - gen_ids = [] - map_ids = [] - contmap_ids = [] - - objparts = obnam.cmp.find_by_kind(list, obnam.cmp.OBJECT) - for objpart in objparts: - gen_ids += objpart.find_strings_by_kind(obnam.cmp.GENREF) - map_ids += objpart.find_strings_by_kind(obnam.cmp.MAPREF) - contmap_ids += objpart.find_strings_by_kind(obnam.cmp.CONTMAPREF) - - return HostBlockObject(host_id=host_id, gen_ids=gen_ids, - map_block_ids=map_ids, - contmap_block_ids=contmap_ids) - - -class DirObject(StorageObject): - - kind = DIR - - def __init__(self, components=None, id=None, name=None, stat=None, - dirrefs=None, filegrouprefs=None): - StorageObject.__init__(self, components=components, id=id) - if name: - self.add(obnam.cmp.Component(obnam.cmp.FILENAME, name)) - if stat: - self.add(obnam.cmp.create_stat_component(stat)) - if dirrefs: - for ref in dirrefs: - self.add(obnam.cmp.Component(obnam.cmp.DIRREF, ref)) - if filegrouprefs: - for ref in filegrouprefs: - self.add(obnam.cmp.Component(obnam.cmp.FILEGROUPREF, ref)) - - def get_name(self): - return self.first_by_kind(obnam.cmp.FILENAME).get_string_value() - - def get_stat(self): - st = self.first_by_kind(obnam.cmp.STAT) - return obnam.cmp.parse_stat_component(st) - - def get_dirrefs(self): - return [c.get_string_value() - for c in self.find_by_kind(obnam.cmp.DIRREF)] - - def get_filegrouprefs(self): - return [c.get_string_value() - for c in self.find_by_kind(obnam.cmp.FILEGROUPREF)] - - -class FileGroupObject(StorageObject): - - kind = FILEGROUP - - def add_file(self, name, stat, contref, sigref, deltaref): - c = obnam.filelist.create_file_component_from_stat(name, stat, - contref, sigref, - deltaref) - self.add(c) - - def get_files(self): - return self.find_by_kind(obnam.cmp.FILE) - - def get_file(self, name): - for file in self.get_files(): - fname = file.first_string_by_kind(obnam.cmp.FILENAME) - if name == fname: - return file - return None - - def get_string_from_file(self, file, kind): - return file.first_string_by_kind(kind) - - def get_stat_from_file(self, file): - c = file.first_by_kind(obnam.cmp.STAT) - return obnam.cmp.parse_stat_component(c) - - def get_names(self): - return [self.get_string_from_file(x, obnam.cmp.FILENAME) - for x in self.get_files()] - - def get_stat(self, filename): - return self.get_stat_from_file(self.get_file(filename)) - - def get_contref(self, filename): - return self.get_string_from_file(self.get_file(filename), - obnam.cmp.CONTREF) - - def get_sigref(self, filename): - return self.get_string_from_file(self.get_file(filename), - obnam.cmp.SIGREF) - - def get_deltaref(self, filename): - return self.get_string_from_file(self.get_file(filename), - obnam.cmp.DELTAREF) - - -class FileListObject(StorageObject): - - kind = FILELIST - - -class FilePartObject(StorageObject): - - kind = FILEPART - - -class FileContentsObject(StorageObject): - - kind = FILECONTENTS - - -class DeltaPartObject(StorageObject): - - kind = DELTAPART - - -class UnknownStorageObjectKind(obnam.ObnamException): - - def __init__(self, kind): - self._msg = "Unknown storage object kind %s" % kind - - -class StorageObjectFactory: - - """Create the right kind of Object subclass instance. - - Given a parsed component representing an object, figure out the type - of the object and instantiate the right sub-class of Object. - - """ - - def __init__(self): - self._classes = [] - for n, klass in globals().iteritems(): - if (type(klass) is type(StorageObject) and - klass != StorageObject and - issubclass(klass, StorageObject)): - self._classes.append(klass) - - def get_object(self, components): - kind = obnam.cmp.first_varint_by_kind(components, obnam.cmp.OBJKIND) - for klass in self._classes: - if klass.kind == kind: - return klass(components=components) - raise UnknownStorageObjectKind(kind) diff --git a/obnam/objTests.py b/obnam/objTests.py deleted file mode 100644 index f6c333be..00000000 --- a/obnam/objTests.py +++ /dev/null @@ -1,522 +0,0 @@ -# Copyright (C) 2006, 2007, 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.obj.""" - - -import os -import unittest - - -from obnam.obj import * -import obnam - - -class ObjectKindNameTests(unittest.TestCase): - - def test(self): - self.failUnlessEqual(kind_name(-12765), "UNKNOWN") - self.failUnlessEqual(kind_name(FILEPART), "FILEPART") - self.failUnlessEqual(kind_name(GEN), "GEN") - self.failUnlessEqual(kind_name(SIG), "SIG") - self.failUnlessEqual(kind_name(HOST), "HOST") - self.failUnlessEqual(kind_name(FILECONTENTS), "FILECONTENTS") - self.failUnlessEqual(kind_name(FILELIST), "FILELIST") - self.failUnlessEqual(kind_name(DELTA), "DELTA") - self.failUnlessEqual(kind_name(DELTAPART), "DELTAPART") - self.failUnlessEqual(kind_name(DIR), "DIR") - self.failUnlessEqual(kind_name(FILEGROUP), "FILEGROUP") - - -class ObjectIdTests(unittest.TestCase): - - def testHasCorrectProperties(self): - id = obnam.obj.object_id_new() - self.failUnlessEqual(type(id), type("")) - - -class StorageObjectTests(unittest.TestCase): - - components = [ - obnam.cmp.Component(obnam.cmp.OBJID, "pink"), - obnam.cmp.Component(obnam.cmp.OBJKIND, - obnam.varint.encode(obnam.obj.HOST)), - obnam.cmp.Component(0xdeadbeef, "hello"), - obnam.cmp.Component(0xcafebabe, "world"), - ] - - def setUp(self): - self.o = obnam.obj.StorageObject(components=self.components) - - def testInitializesComponentListCorrectlyFromComponents(self): - self.failUnlessEqual(len(self.o.get_components()), - len(self.components)) - - def testInitalizesIdCorrectlyFromComponents(self): - self.failUnlessEqual(self.o.get_id(), "pink") - - def testInitalizesKindCorrectlyFromComponents(self): - self.failUnlessEqual(self.o.get_kind(), obnam.obj.HOST) - - def testInitializesIdCorrectlyFromArguments(self): - o = obnam.obj.StorageObject(id="pink") - self.failUnlessEqual(o.get_id(), "pink") - - def testEncodesAndDecodesToIdenticalObject(self): - o = obnam.obj.StorageObject(components=self.components) - encoded = o.encode() - o2 = obnam.obj.decode(encoded) - encoded2 = o2.encode() - self.failUnlessEqual(encoded, encoded2) - - def testAddsComponentCorrectly(self): - c = obnam.cmp.Component(obnam.cmp.FILENAME, "pretty") - self.o.add(c) - self.failUnless(self.o.find_by_kind(obnam.cmp.FILENAME), [c]) - - -class ObjectQueueTests(unittest.TestCase): - - def testCreate(self): - oq = obnam.obj.ObjectQueue() - self.failUnlessEqual(oq.combined_size(), 0) - - def testAdd(self): - oq = obnam.obj.ObjectQueue() - oq.add("xx", "abc") - self.failUnlessEqual(oq.combined_size(), 3) - - def testSize(self): - oq = obnam.obj.ObjectQueue() - self.failUnless(oq.is_empty()) - oq.add("xx", "abc") - self.failUnlessEqual(oq.combined_size(), 3) - oq.add("yy", "abc") - self.failUnlessEqual(oq.combined_size(), 6) - - def testClear(self): - oq = obnam.obj.ObjectQueue() - oq_orig = oq - self.failUnless(oq.is_empty()) - oq.clear() - self.failUnlessEqual(oq.combined_size(), 0) - oq.add("xx", "abc") - self.failUnlessEqual(oq.combined_size(), 3) - oq.clear() - self.failUnless(oq.is_empty()) - self.failUnless(oq == oq_orig) - - -class BlockWithoutCookieTests(unittest.TestCase): - - def setUp(self): - self.e = obnam.obj.BlockWithoutCookie("\x01\x02\x03") - - def testIncludesBlockHexDumpInMessage(self): - self.failUnless("01 02 03" in str(self.e)) - - -class BlockCreateTests(unittest.TestCase): - - def testDecodeInvalidObject(self): - self.failUnlessRaises(obnam.obj.BlockWithoutCookie, - obnam.obj.block_decode, "pink") - - def testDecodeEmptyBlock(self): - self.failUnlessRaises(obnam.obj.EmptyBlock, - obnam.obj.block_decode, obnam.obj.BLOCK_COOKIE) - - def testEmptyObjectQueue(self): - oq = obnam.obj.ObjectQueue() - block = oq.as_block("blkid") - list = obnam.obj.block_decode(block) - self.failUnlessEqual( - obnam.cmp.first_string_by_kind(list, obnam.cmp.BLKID), - "blkid") - self.failUnlessEqual(len(list), 1) - self.failUnlessEqual(oq.ids(), []) - - def testObjectQueue(self): - o = obnam.obj.StorageObject(id="pink") - o.add(obnam.cmp.Component(2, "pretty")) - oq = obnam.obj.ObjectQueue() - oq.add("pink", o.encode()) - block = oq.as_block("blkid") - - list = obnam.obj.block_decode(block) - self.failUnlessEqual( - obnam.cmp.first_string_by_kind(list, obnam.cmp.BLKID), - "blkid") - self.failUnlessEqual(len(list), 2) - o2 = obnam.cmp.first_by_kind(list, obnam.cmp.OBJECT) - self.failUnlessEqual(o.first_string_by_kind(2), "pretty") - self.failUnlessEqual(oq.ids(), ["pink"]) - - -class GenerationTests(unittest.TestCase): - - def testEncodeDecode(self): - id1 = "pink" - fl1 = "pretty" - dirs1 = ["dir1", "dir2"] - fg1 = ["fg1", "fg2"] - start1 = 12765 - end1 = 37337 - gen = obnam.obj.GenerationObject(id=id1, filelist_id=fl1, - dirrefs=dirs1, filegrouprefs=fg1, - start=start1, end=end1).encode() - (id2, fl2, dirs2, fg2, start2, end2) = generation_object_decode(gen) - self.failUnlessEqual(id1, id2) - self.failUnlessEqual(fl1, fl2) - self.failUnlessEqual(dirs1, dirs2) - self.failUnlessEqual(fg1, fg2) - self.failUnlessEqual(start1, start2) - self.failUnlessEqual(end1, end2) - - def setUp(self): - self.gen = GenerationObject(id="objid", filelist_id="filelistref", - dirrefs=["dir2", "dir1"], - filegrouprefs=["fg2", "fg1"], - start=123, end=456) - - def testSetsFilelistRefCorrectly(self): - self.failUnlessEqual(self.gen.get_filelistref(), "filelistref") - - def testSetsDirRefsCorrectly(self): - self.failUnlessEqual(sorted(self.gen.get_dirrefs()), - sorted(["dir1", "dir2"])) - - def testSetsFileGroupRefsCorrectly(self): - self.failUnlessEqual(sorted(self.gen.get_filegrouprefs()), - sorted(["fg1", "fg2"])) - - def testSetsStartTimeCorrectly(self): - self.failUnlessEqual(self.gen.get_start_time(), 123) - - def testSetsEndTimeCorrectly(self): - self.failUnlessEqual(self.gen.get_end_time(), 456) - - -class OldStorageObjectTests(unittest.TestCase): - - def testCreateSignatureObject(self): - context = obnam.context.Context() - id = "pink" - sig = obnam.rsync.compute_signature(context, "Makefile") - sig_object = obnam.obj.SignatureObject(id=id, sigdata=sig) - encoded = sig_object.encode() - o = obnam.obj.decode(encoded) - self.failUnlessEqual(o.get_id(), "pink") - self.failUnlessEqual(o.get_kind(), obnam.obj.SIG) - self.failUnlessEqual(len(o.get_components()), 1+2) - self.failUnlessEqual(o.first_string_by_kind(obnam.cmp.SIGDATA), sig) - - def testCreateDeltaObjectWithContRef(self): - id = "pink" - deltapart_ref = "xyzzy" - do = obnam.obj.DeltaObject(id=id, deltapart_refs=[deltapart_ref], - cont_ref="pretty") - encoded = do.encode() - o = obnam.obj.decode(encoded) - self.failUnlessEqual(o.get_id(), "pink") - self.failUnlessEqual(o.get_kind(), obnam.obj.DELTA) - self.failUnlessEqual(len(o.get_components()), 2+2) - self.failUnlessEqual(o.first_string_by_kind(obnam.cmp.DELTAPARTREF), - deltapart_ref) - self.failUnlessEqual(o.first_string_by_kind(obnam.cmp.CONTREF), - "pretty") - - def testCreateDeltaObjectWithDeltaRef(self): - id = "pink" - deltapart_ref = "xyzzy" - do = obnam.obj.DeltaObject(id=id, deltapart_refs=[deltapart_ref], - delta_ref="pretty") - encoded = do.encode() - o = obnam.obj.decode(encoded) - self.failUnlessEqual(o.get_id(), "pink") - self.failUnlessEqual(o.get_kind(), obnam.obj.DELTA) - self.failUnlessEqual(len(o.get_components()), 2+2) - self.failUnlessEqual(o.first_string_by_kind(obnam.cmp.DELTAPARTREF), - deltapart_ref) - self.failUnlessEqual(o.first_string_by_kind(obnam.cmp.DELTAREF), - "pretty") - - -class HostBlockTests(unittest.TestCase): - - def testEncodeDecode(self): - host_id = "pink" - gen_ids = ["pretty", "beautiful"] - map_ids = ["black", "box"] - contmap_ids = ["tilu", "lii"] - host = obnam.obj.HostBlockObject(host_id=host_id, gen_ids=gen_ids, - map_block_ids=map_ids, - contmap_block_ids=contmap_ids) - host = host.encode() - self.failUnless(host.startswith(obnam.obj.BLOCK_COOKIE)) - host2 = obnam.obj.create_host_from_block(host) - self.failUnlessEqual(host_id, host2.get_id()) - self.failUnlessEqual(gen_ids, host2.get_generation_ids()) - self.failUnlessEqual(map_ids, host2.get_map_block_ids()) - self.failUnlessEqual(contmap_ids, host2.get_contmap_block_ids()) - - def testFormatVersion(self): - encoded = obnam.obj.HostBlockObject(host_id="pink", gen_ids=[], - map_block_ids=[], - contmap_block_ids=[]).encode() - decoded = obnam.obj.block_decode(encoded) - c = obnam.cmp.first_by_kind(decoded, obnam.cmp.OBJECT) - id = c.first_string_by_kind(obnam.cmp.OBJID) - self.failUnlessEqual(id, "pink") - ver = c.first_string_by_kind(obnam.cmp.FORMATVERSION) - self.failUnlessEqual(ver, "1") - - def make_block(self, gen_ids=None, map_ids=None, contmap_ids=None): - host = obnam.obj.HostBlockObject(host_id="pink", gen_ids=gen_ids, - map_block_ids=map_ids, - contmap_block_ids=contmap_ids) - return host.encode() - - def testReturnsEmtpyListForBlockWithNoGenerations(self): - block = self.make_block() - host = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_generation_ids(), []) - - def testReturnsCorrectListForBlockWithSomeGenerations(self): - block = self.make_block(gen_ids=["pretty", "black"]) - host = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_generation_ids(), ["pretty", "black"]) - - def testReturnsEmtpyListForBlockWithNoMaps(self): - block = self.make_block() - host = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_map_block_ids(), []) - - def testReturnsCorrectListForBlockWithSomeMaps(self): - block = self.make_block(map_ids=["pretty", "black"]) - host = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_map_block_ids(), ["pretty", "black"]) - - def testReturnsEmtpyListForBlockWithNoContentMaps(self): - block = self.make_block() - host = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_contmap_block_ids(), []) - - def testReturnsCorrectListForBlockWithSomeContentMaps(self): - block = self.make_block(contmap_ids=["pretty", "black"]) - host = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_contmap_block_ids(), - ["pretty", "black"]) - - -class GetComponentTests(unittest.TestCase): - - def setUp(self): - self.o = obnam.obj.StorageObject([ - obnam.cmp.Component(1, "pink"), - obnam.cmp.Component(2, "pretty"), - obnam.cmp.Component(3, "red"), - obnam.cmp.Component(3, "too"), - ]) - - def testGetByKind(self): - find = lambda t: \ - [c.get_string_value() for c in self.o.find_by_kind(t)] - self.failUnlessEqual(find(1), ["pink"]) - self.failUnlessEqual(find(2), ["pretty"]) - self.failUnlessEqual(find(3), ["red", "too"]) - self.failUnlessEqual(find(0), []) - - def testGetStringsByKind(self): - find = lambda t: self.o.find_strings_by_kind(t) - self.failUnlessEqual(find(1), ["pink"]) - self.failUnlessEqual(find(2), ["pretty"]) - self.failUnlessEqual(find(3), ["red", "too"]) - self.failUnlessEqual(find(0), []) - - def helper(self, wanted_kind): - c = self.o.first_by_kind(wanted_kind) - if c: - return c.get_string_value() - else: - return None - - def testGetFirstByKind(self): - self.failUnlessEqual(self.helper(1), "pink") - self.failUnlessEqual(self.helper(2), "pretty") - self.failUnlessEqual(self.helper(3), "red") - self.failUnlessEqual(self.helper(0), None) - - def testGetFirstStringByKind(self): - find = lambda t: self.o.first_string_by_kind(t) - self.failUnlessEqual(find(1), "pink") - self.failUnlessEqual(find(2), "pretty") - self.failUnlessEqual(find(3), "red") - self.failUnlessEqual(find(0), None) - - def testGetVarintsByKind(self): - numbers = range(1024) - components = [obnam.cmp.Component(0, obnam.varint.encode(i)) - for i in numbers] - o = obnam.obj.StorageObject(components=components) - self.failUnlessEqual(o.find_varints_by_kind(0), numbers) - - def testGetFirstSVarintByKind(self): - numbers = range(0, 1024, 17) - components = [obnam.cmp.Component(i, obnam.varint.encode(i)) - for i in numbers] - o = obnam.obj.StorageObject(components=components) - for i in numbers: - self.failUnlessEqual(o.first_varint_by_kind(i), i) - self.failUnlessEqual(o.first_varint_by_kind(-1), None) - - -class DirObjectTests(unittest.TestCase): - - def setUp(self): - self.stat = os.stat(".") - self.dir = DirObject(id="pink", name="name", stat=self.stat, - dirrefs=["dir2", "dir1"], - filegrouprefs=["fg2", "fg1"]) - - def testSetsNameCorrectly(self): - self.failUnlessEqual(self.dir.get_name(), "name") - - def testSetsStatCorrectly(self): - self.failUnlessEqual(self.dir.get_stat(), self.stat) - - def testSetsDirrefsCorrectly(self): - self.failUnlessEqual(sorted(self.dir.get_dirrefs()), - sorted(["dir1", "dir2"])) - - def testSetsFilegrouprefsCorrectly(self): - self.failUnlessEqual(sorted(self.dir.get_filegrouprefs()), - sorted(["fg1", "fg2"])) - - -class FileGroupObjectTests(unittest.TestCase): - - def setUp(self): - stat = os.stat("README") - self.files = [ - ("pink", stat, "pink_contref", "pink_sigref", None), - ("pretty", stat, None, "pretty_sigref", "pretty_deltaref"), - ("black", stat, "black_contref", "black_sigref", None), - ] - self.names = [x[0] for x in self.files] - self.fg = FileGroupObject(id="objid") - for name, stat, contref, sigref, deltaref in self.files: - self.fg.add_file(name, stat, contref, sigref, deltaref) - - def testReturnsNoneIfSoughtFileNotFound(self): - self.failUnlessEqual(self.fg.get_file("xxx"), None) - - def testSetsNamesCorrectly(self): - self.failUnlessEqual(sorted(self.fg.get_names()), sorted(self.names)) - - def testSetsStatCorrectly(self): - for x in self.files: - self.failUnlessEqual(x[1], self.fg.get_stat(x[0])) - - def testSetsContentRefCorrectly(self): - for x in self.files: - self.failUnlessEqual(x[2], self.fg.get_contref(x[0])) - - def testSetsSigRefCorrectly(self): - for x in self.files: - self.failUnlessEqual(x[3], self.fg.get_sigref(x[0])) - - def testSetsDeltaRefCorrectly(self): - for x in self.files: - self.failUnlessEqual(x[4], self.fg.get_deltaref(x[0])) - - -class StorageObjectFactoryTests(unittest.TestCase): - - def setUp(self): - self.factory = StorageObjectFactory() - - def make_component(self, objkind): - list = [] - - list.append(obnam.cmp.Component(obnam.cmp.OBJID, "objid")) - list.append(obnam.cmp.Component(obnam.cmp.OBJKIND, - obnam.varint.encode(objkind))) - - - if objkind == obnam.obj.GEN: - list.append(obnam.cmp.Component(obnam.cmp.GENSTART, - obnam.varint.encode(1))) - list.append(obnam.cmp.Component(obnam.cmp.GENEND, - obnam.varint.encode(2))) - - return list - - def make_object(self, objkind): - return self.factory.get_object(self.make_component(objkind)) - - def testCreatesFilePartObjectCorrectly(self): - o = self.make_object(obnam.obj.FILEPART) - self.failUnlessEqual(type(o), obnam.obj.FilePartObject) - - def testCreatesGenerationObjectCorrectly(self): - o = self.make_object(obnam.obj.GEN) - self.failUnlessEqual(type(o), obnam.obj.GenerationObject) - self.failUnlessEqual(o.get_start_time(), 1) - self.failUnlessEqual(o.get_end_time(), 2) - - def testCreatesSignatureObjectCorrectly(self): - o = self.make_object(obnam.obj.SIG) - self.failUnlessEqual(type(o), obnam.obj.SignatureObject) - - def testCreatesHostBlockObjectCorrectly(self): - o = self.make_object(obnam.obj.HOST) - self.failUnlessEqual(type(o), obnam.obj.HostBlockObject) - - def testCreatesHostBlockObjectCorrectlyFromParsedBlock(self): - host = obnam.obj.HostBlockObject(host_id="pink") - block = host.encode() - host2 = obnam.obj.create_host_from_block(block) - self.failUnlessEqual(host.get_id(), host2.get_id()) - - def testCreatesFileContentsObjectCorrectly(self): - o = self.make_object(obnam.obj.FILECONTENTS) - self.failUnlessEqual(type(o), obnam.obj.FileContentsObject) - - def testCreatesFileListObjectCorrectly(self): - o = self.make_object(obnam.obj.FILELIST) - self.failUnlessEqual(type(o), obnam.obj.FileListObject) - - def testCreatesDeltaObjectCorrectly(self): - o = self.make_object(obnam.obj.DELTA) - self.failUnlessEqual(type(o), obnam.obj.DeltaObject) - - def testCreatesDeltaPartObjectCorrectly(self): - o = self.make_object(obnam.obj.DELTAPART) - self.failUnlessEqual(type(o), obnam.obj.DeltaPartObject) - - def testCreatesDirObjectCorrectly(self): - o = self.make_object(obnam.obj.DIR) - self.failUnlessEqual(type(o), obnam.obj.DirObject) - - def testCreatesFileGroupObjectCorrectly(self): - o = self.make_object(obnam.obj.FILEGROUP) - self.failUnlessEqual(type(o), obnam.obj.FileGroupObject) - - def testRaisesExceptionForUnknownObjectKind(self): - self.failUnlessRaises(obnam.ObnamException, - self.make_object, 0xdeadbeef) diff --git a/obnam/oper.py b/obnam/oper.py deleted file mode 100644 index bc4d999c..00000000 --- a/obnam/oper.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Obnam operations.""" - - -import inspect - -import obnam - - -class Operation: - - """A user-visible operation for the Obnam backup program. - - User-visible operations are things like "make a backup", "restore - files from a backup", "list backup generations", and so on. This - base class abstracts the operations so that they can be easily - implemented. Associated with this is the OperationFactory class, - which will automatically instantiate the right Operation subclass - based on command line arguments. For this to work, subclasses - MUST set the 'name' attribute to the command word the user will - use on the command line. - - """ - - name = None - - def __init__(self, app, args): - self._app = app - self._args = args - - def get_application(self): - """Return application this operation instance will use.""" - return self._app - - def get_args(self): - """Return arguments this operation instance will use.""" - return self._args - - def do_it(self, args): - """Do the operation. - - 'args' will contain all command line arguments /except/ the - command word. There's no point in passing that to this class, - since we already know it must be our name. - - Subclasses should override this method with something that - is actually useful. The default implementation does nothing. - - """ - - -class NoArguments(obnam.ObnamException): - - def __init__(self): - self._msg = ("Command line argument list is empty. " - "Need at least the operation name.") - - -class OperationNotFound(obnam.ObnamException): - - def __init__(self, name): - self._msg = "Unknown operation %s" % name - - -class OperationFactory: - - """Instantiate Operation subclasses based on command line arguments.""" - - def __init__(self, app): - self._app = app - - def find_operations(self): - """Find operations defined in obnam.""" - list = [] - for name in dir(obnam): - x = getattr(obnam, name) - if inspect.isclass(x) and issubclass(x, Operation): - list.append(x) - return list - - def get_operation(self, args): - """Instantiate the right operation given the command line. - - If there is no corresponding operation, raise an error. - - """ - - if not args: - raise NoArguments() - - for oper in self.find_operations(): - if oper.name == args[0]: - return oper(self._app, args[1:]) - - raise OperationNotFound(args[0]) diff --git a/obnam/operTests.py b/obnam/operTests.py deleted file mode 100644 index a2edaf9c..00000000 --- a/obnam/operTests.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Tests for obnam.oper.""" - - -import unittest - -import obnam - - -class OperationTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.args = ["pink", "pretty"] - self.op = obnam.Operation(self.app, self.args) - - def testNameIsNone(self): - self.failUnlessEqual(self.op.name, None) - - def testHasRightApplication(self): - self.failUnlessEqual(self.op.get_application(), self.app) - - def testHasRightArgs(self): - self.failUnlessEqual(self.op.get_args(), self.args) - - -class OperationFactoryTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.app = obnam.Application(context) - self.factory = obnam.OperationFactory(self.app) - - def testFindsOperations(self): - self.failUnless(self.factory.find_operations()) - - def testRaisesErrorForNoArguments(self): - self.failUnlessRaises(obnam.ObnamException, - self.factory.get_operation, []) - - def testRaisesErrorForUnknownArgument(self): - self.failUnlessRaises(obnam.ObnamException, - self.factory.get_operation, ["pink"]) - - def testFindsBackupOperation(self): - self.failUnless(self.factory.get_operation(["backup"])) diff --git a/obnam/oper_backup.py b/obnam/oper_backup.py deleted file mode 100644 index ec59a6b2..00000000 --- a/obnam/oper_backup.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""A backup operation for Obnam.""" - - -import logging - -import obnam - - -class Backup(obnam.Operation): - - """Backup files the user has specified.""" - - name = "backup" - - def do_it(self, roots): - logging.info("Starting backup") - logging.info("Getting and decoding host block") - app = self.get_application() - host = app.load_host() - app.get_store().load_maps() - # We don't need to load in file data, therefore we don't load - # the content map blocks. - - old_gen_ids = host.get_generation_ids() - if old_gen_ids: - prev_gen = app.get_store().get_object(old_gen_ids[-1]) - app.set_previous_generation(prev_gen) - filelist_id = prev_gen.get_filelistref() - if filelist_id: - filelist = obnam.filelist.Filelist() - o = app.get_store().get_object(filelist_id) - filelist.from_object(o) - app.set_prevgen_filelist(filelist) - - gen = app.backup(roots) - - app.get_store().commit_host_block([gen]) - - logging.info("Backup done") diff --git a/obnam/oper_forget.py b/obnam/oper_forget.py deleted file mode 100644 index 40d98f41..00000000 --- a/obnam/oper_forget.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Operation to forget generations from backup store.""" - - -import logging - -import obnam - - -class Forget(obnam.Operation): - - """Forget specified generations.""" - - name = "forget" - - def do_it(self, forgettable_ids): - logging.debug("Forgetting generations: %s" % " ".join(forgettable_ids)) - - logging.debug("forget: Loading and decoding host block") - app = self.get_application() - context = app.get_context() - host = app.load_host() - gen_ids = host.get_generation_ids() - map_block_ids = host.get_map_block_ids() - contmap_block_ids = host.get_contmap_block_ids() - - app.get_store().load_maps() - app.get_store().load_content_maps() - - logging.debug("forget: Forgetting each id") - for id in forgettable_ids: - if id in gen_ids: - gen_ids.remove(id) - else: - print "Warning: Generation", id, "is not known" - - logging.debug("forget: Uploading new host block") - host_id = context.config.get("backup", "host-id") - host2 = obnam.obj.HostBlockObject(host_id=host_id, gen_ids=gen_ids, - map_block_ids=map_block_ids, - contmap_block_ids=contmap_block_ids) - block = host2.encode() - obnam.io.upload_host_block(context, block) - - logging.debug("forget: Forgetting garbage") - obnam.io.collect_garbage(context, block) diff --git a/obnam/oper_generations.py b/obnam/oper_generations.py deleted file mode 100644 index 3ca1a293..00000000 --- a/obnam/oper_generations.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Operation to list generations in a backup store.""" - - -import logging - -import obnam - - -class ListGenerations(obnam.Operation): - - """List generations in the store.""" - - name = "generations" - - def do_it(self, *ignored): - app = self.get_application() - host = app.load_host() - context = app.get_context() - gentimes = context.config.getboolean("backup", "generation-times") - if gentimes: - app.load_maps() - - gen_ids = host.get_generation_ids() - for id in gen_ids: - if gentimes: - gen = obnam.io.get_object(context, id) - if not gen: - logging.warning("Can't find info about generation %s" % id) - else: - start = gen.get_start_time() - end = gen.get_end_time() - print id, obnam.format.timestamp(start), "--", \ - obnam.format.timestamp(end) - else: - print id diff --git a/obnam/oper_restore.py b/obnam/oper_restore.py deleted file mode 100644 index a4a161e1..00000000 --- a/obnam/oper_restore.py +++ /dev/null @@ -1,220 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Operation to restore from backup.""" - - -import logging -import os -import stat - -import obnam - - -class UnknownGeneration(obnam.ObnamException): - - def __init__(self, gen_id): - self._msg = "Can't find generation %s" % gen_id - - -class Restore(obnam.Operation): - - """Restore specified files (or all) from a specified generation.""" - - name = "restore" - - def hardlink_key(self, st): - """Compute key into hardlink lookup table from stat result""" - return "%d/%d" % (st.st_ino, st.st_dev) - - def create_filesystem_object(self, hardlinks, full_pathname, inode): - context = self.get_application().get_context() - logging.debug("Creating filesystem object %s" % full_pathname) - stat_component = inode.first_by_kind(obnam.cmp.STAT) - st = obnam.cmp.parse_stat_component(stat_component) - mode = st.st_mode - - if st.st_nlink > 1 and not stat.S_ISDIR(mode): - key = self.hardlink_key(st) - if key in hardlinks: - existing_link = hardlinks[key] - os.link(existing_link, full_pathname) - return - else: - hardlinks[key] = full_pathname - - if stat.S_ISDIR(mode): - if not os.path.exists(full_pathname): - os.makedirs(full_pathname, 0700) - elif stat.S_ISREG(mode): - basedir = os.path.dirname(full_pathname) - if not os.path.exists(basedir): - os.makedirs(basedir, 0700) - fd = os.open(full_pathname, os.O_WRONLY | os.O_CREAT, 0) - cont_id = inode.first_string_by_kind(obnam.cmp.CONTREF) - if cont_id: - obnam.io.copy_file_contents(context, fd, cont_id) - else: - delta_id = inode.first_string_by_kind(obnam.cmp.DELTAREF) - obnam.io.reconstruct_file_contents(context, fd, delta_id) - os.close(fd) - - def restore_requested(self, files, pathname): - """Return True, if pathname should be restored""" - - # If there is no explicit file list, restore everything. - if not files: - return True - - # If the pathname is specified explicitly, restore it. - if pathname in files: - return True - - # Otherwise, if there's an explicitly specified filename that is a - # prefix of directory parts in the pathname, restore it. That is, - # if files is ["foo/bar"], then restore "foo/bar/baz", but not - # "foo/barbell". - for x in files: - if pathname.startswith(x) and x.endswith(os.sep): - return True - if pathname.startswith(x + os.sep): - return True - - # Nope, don't restore it. - return False - - def restore_single_item(self, hardlinks, target, pathname, inode): - logging.debug("Restoring %s" % pathname) - - if pathname.startswith(os.sep): - pathname = "." + pathname - full_pathname = os.path.join(target, pathname) - - self.create_filesystem_object(hardlinks, full_pathname, inode) - return full_pathname - - def fix_permissions(self, list): - logging.debug("Fixing permissions") - list.sort() - for full_pathname, inode in list: - obnam.io.set_inode(full_pathname, inode) - - def restore_from_filelist(self, target, fl, files): - logging.debug("Restoring files from FILELIST") - list = [] - hardlinks = {} - - for c in fl.find_by_kind(obnam.cmp.FILE): - pathname = c.first_string_by_kind(obnam.cmp.FILENAME) - - if not self.restore_requested(files, pathname): - logging.debug("Restore of %s not requested" % pathname) - continue - - full_pathname = self.restore_single_item(hardlinks, target, - pathname, c) - list.append((full_pathname, c)) - - self.fix_permissions(list) - - def restore_from_filegroups(self, target, hardlinks, list, parent, - filegrouprefs, files): - for ref in filegrouprefs: - fg = obnam.io.get_object(self.app.get_context(), ref) - if not fg: - logging.warning("Cannot find FILEGROUP object %s" % ref) - else: - for name in fg.get_names(): - if parent: - name2 = os.path.join(parent, name) - if self.restore_requested(files, name2): - file = fg.get_file(name) - full_pathname = self.restore_single_item(hardlinks, - target, name2, file) - list.append((full_pathname, file)) - else: - logging.debug("Restore of %s not requested" % name2) - - def restore_from_dirs(self, target, hardlinks, list, parent, dirrefs, - files): - for ref in dirrefs: - dir = obnam.io.get_object(self.app.get_context(), ref) - if not dir: - logging.warning("Cannot find DIR object %s" % ref) - else: - name = dir.get_name() - if parent: - name = os.path.join(parent, name) - if self.restore_requested(files, name): - st = dir.first_by_kind(obnam.cmp.STAT) - st = obnam.cmp.parse_stat_component(st) - file = \ - obnam.filelist.create_file_component_from_stat( - dir.get_name(), st, None, None, None) - full_pathname = self.restore_single_item(hardlinks, - target, name, - file) - list.append((full_pathname, file)) - self.restore_from_filegroups(target, hardlinks, list, - name, - dir.get_filegrouprefs(), - files) - self.restore_from_dirs(target, hardlinks, list, name, - dir.get_dirrefs(), files) - else: - logging.debug("Restore of %s not requested" % name) - - def restore_from_dirs_and_filegroups(self, target, gen, files): - hardlinks = {} - list = [] - self.restore_from_filegroups(target, hardlinks, list, None, - gen.get_filegrouprefs(), files) - self.restore_from_dirs(target, hardlinks, list, - None, gen.get_dirrefs(), files) - self.fix_permissions(list) - - def do_it(self, args): - gen_id = args[0] - files = args[1:] - logging.debug("Restoring generation %s" % gen_id) - logging.debug("Restoring files: %s" % ", ".join(files)) - - self.app = app = self.get_application() - context = app.get_context() - host = app.load_host() - - app.get_store().load_maps() - app.get_store().load_content_maps() - - logging.debug("Getting generation object") - gen = obnam.io.get_object(context, gen_id) - if gen is None: - raise UnknownGeneration(gen_id) - - target = context.config.get("backup", "target-dir") - logging.debug("Restoring files under %s" % target) - - fl_id = gen.get_filelistref() - if fl_id: - logging.debug("Getting list of files in generation") - fl = obnam.io.get_object(context, fl_id) - if not fl: - logging.warning("Cannot find file list object %s" % fl_id) - else: - self.restore_from_filelist(target, fl, files) - else: - self.restore_from_dirs_and_filegroups(target, gen, files) diff --git a/obnam/oper_show_generations.py b/obnam/oper_show_generations.py deleted file mode 100644 index e22d9510..00000000 --- a/obnam/oper_show_generations.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Operation to show contents of generations in a backup store.""" - - -import logging -import sys -import time - -import obnam - - -class ShowGenerations(obnam.Operation): - - """Show contents of generations specified by user.""" - - name = "show-generations" - - def format_period(self, start, end): - """Format time period in a format that is easy to read for humans""" - start = time.localtime(start) - end = time.localtime(end) - if start[0:3] == end[0:3]: - return "%s %s - %s" % \ - (time.strftime("%Y-%m-%d", start), - time.strftime("%H:%M", start), - time.strftime("%H:%M", end)) - else: - return "%s %s - %s %s" % \ - (time.strftime("%Y-%m-%d", start), - time.strftime("%H:%M", start), - time.strftime("%Y-%m-%d", end), - time.strftime("%H:%M", end)) - - def format_generation_period(self, gen): - """Return human readable string to show the period of a generation""" - start_time = gen.get_start_time() - end_time = gen.get_end_time() - return self.format_period(start_time, end_time) - - def show_filelist(self, fl): - pretty = True - list = [] - for c in fl.find_by_kind(obnam.cmp.FILE): - filename = c.first_string_by_kind(obnam.cmp.FILENAME) - if pretty: - list.append((obnam.format.inode_fields(c), filename)) - else: - print " ".join(obnam.format.inode_fields(c)), filename - - if pretty: - widths = [] - for fields, _ in list: - for i in range(len(fields)): - if i >= len(widths): - widths.append(0) - widths[i] = max(widths[i], len(fields[i])) - - for fields, filename in list: - cols = [] - for i in range(len(widths)): - if i < len(fields): - x = fields[i] - else: - x = "" - cols.append("%*s" % (widths[i], x)) - print " ", " ".join(cols), filename - - def show_dirs_and_filegroups(self, context, gen): - listing = obnam.format.Listing(context, sys.stdout) - listing.walk(listing.get_objects(gen.get_dirrefs()), - listing.get_objects(gen.get_filegrouprefs())) - - def do_it(self, gen_ids): - app = self.get_application() - context = app.get_context() - host = app.load_host() - app.get_store().load_maps() - - for gen_id in gen_ids: - gen = obnam.io.get_object(context, gen_id) - if not gen: - logging.warning("Can't find generation %s" % gen_id) - continue - print "Generation: %s %s" % (gen_id, - self.format_generation_period(gen)) - - fl_id = gen.get_filelistref() - if fl_id: - fl = obnam.io.get_object(context, fl_id) - if fl: - self.show_filelist(fl) - else: - logging.warning("Can't find file list %s" % fl_id) - else: - self.show_dirs_and_filegroups(context, gen) diff --git a/obnam/progress.py b/obnam/progress.py deleted file mode 100644 index e8cd7ad2..00000000 --- a/obnam/progress.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright (C) 2007 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Progress reporting for Obnam""" - - -import sys -import time - - -class ProgressReporter: - - initial_values = (("total_files", 0), ("uploaded", 0), ("downloaded", 0), - ("current_action", None)) - - def __init__(self, config): - self.config = config - self.dict = dict(self.initial_values) - self.prev_output = "" - self.timestamp = 0 - self.min_time = 1.0 # seconds - - def reporting_is_allowed(self): - return self.config.getboolean("backup", "report-progress") - - def clear(self): - if self.reporting_is_allowed(): - sys.stdout.write("\r" + " " * len(self.prev_output) + "\r") - sys.stdout.flush() - - def update(self, key, value): - self.dict[key] = value - if self.reporting_is_allowed(): - now = time.time() - if now - self.timestamp >= self.min_time: - self.clear() - parts = [] - parts.append("Files: %(total_files)d" % self.dict) - parts.append("up: %d MB" % - (self.dict["uploaded"] / 1024 / 1024)) - parts.append("down: %d MB" % - (self.dict["downloaded"] / 1024 / 1024)) - current = self.dict["current_action"] - if current: - parts.append("now:") - part_one = ", ".join(parts) - progress = "%s%s" % (part_one, - current[-(79-len(part_one)):]) - else: - progress = ", ".join(parts) - sys.stdout.write(progress) - sys.stdout.flush() - self.prev_output = progress - self.timestamp = now - - def update_total_files(self, total_files): - self.update("total_files", total_files) - - def update_uploaded(self, uploaded): - self.update("uploaded", uploaded) - - def update_downloaded(self, downloaded): - self.update("downloaded", downloaded) - - def update_current_action(self, current_action): - self.update("current_action", current_action) - - def final_report(self): - self.timestamp = 0 - self.update_current_action(None) - if self.reporting_is_allowed(): - sys.stdout.write("\n") - sys.stdout.flush() diff --git a/obnam/rsync.py b/obnam/rsync.py deleted file mode 100644 index cd083677..00000000 --- a/obnam/rsync.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (C) 2006, 2007, 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Rsync stuff for making backups""" - - -import logging -import os -import subprocess -import tempfile - - -import obnam - - -class UnknownCommand(obnam.ObnamException): - - def __init__(self, argv, errno): - self._msg = "Unknown command (error %d): %s" % (errno, " ".join(argv)) - - -class CommandFailure(obnam.ObnamException): - - def __init__(self, argv, returncode, stderr): - self._msg = "Command failed: %s\nError code: %d\n%s" % \ - (" ".join(argv), - returncode, - obnam.gpg.indent_string(stderr)) - - -def run_command(argv, stdin=None, stdout=None, stderr=None): - # We let stdin be None unless explicitly specified. - if stdout is None: - stdout = subprocess.PIPE - if stderr is None: - stderr = subprocess.PIPE - - try: - p = subprocess.Popen(argv, stdin=stdin, stdout=stdout, stderr=stderr) - except os.error, e: - raise UnknownCommand(argv, e.errno) - return p - - -def compute_signature(context, filename): - """Compute an rsync signature for 'filename'""" - - argv = [context.config.get("backup", "odirect-pipe"), - context.config.get("backup", "odirect-read"), - filename, - "rdiff", "--", "signature", "-", "-"] - p = run_command(argv) - stdout_data, stderr_data = p.communicate() - - if p.returncode == 0: - return stdout_data - else: - raise CommandFailure(argv, p.returncode, stderr_data) - - -def compute_delta(context, signature, filename): - """Compute an rsync delta for a file, given signature of old version - - Return list of ids of DELTAPART objects. - - """ - - (fd, tempname) = tempfile.mkstemp() - os.write(fd, signature) - os.close(fd) - - argv = [context.config.get("backup", "odirect-pipe"), - context.config.get("backup", "odirect-read"), - filename, - "rdiff", "--", "delta", tempname, "-", "-"] - p = run_command(argv) - - list = [] - block_size = context.config.getint("backup", "block-size") - while True: - data = p.stdout.read(block_size) - if not data: - break - id = obnam.obj.object_id_new() - o = obnam.obj.DeltaPartObject(id=id) - o.add(obnam.cmp.Component(obnam.cmp.DELTADATA, data)) - o = o.encode() - obnam.io.enqueue_object(context, context.content_oq, - context.contmap, id, o, False) - list.append(id) - exit = p.wait() - os.remove(tempname) - if exit == 0: - return list - else: - raise CommandFailure(argv, exit, "") - - -def apply_delta(context, basis, deltaparts, new, open=os.open, cmd="rdiff"): - """Apply an rsync delta for a file, to get a new version of it""" - - devnull = open("/dev/null", os.O_WRONLY) - - argv = [cmd, "--", "patch", basis, "-", new] - - p = run_command(argv, stdin=subprocess.PIPE, stdout=devnull) - - ret = True - for id in deltaparts: - deltapart = obnam.io.get_object(context, id) - deltadata = deltapart.first_string_by_kind(obnam.cmp.DELTADATA) - p.stdin.write(deltadata) - - stdout_data, stderr_data = p.communicate(input="") - os.close(devnull) - if p.returncode != 0: - raise CommandFailure(argv, p.returncode, stderr_data) - else: - return ret diff --git a/obnam/rsyncTests.py b/obnam/rsyncTests.py deleted file mode 100644 index 8209a833..00000000 --- a/obnam/rsyncTests.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.rsync.""" - - -import os -import shutil -import tempfile -import unittest - - -import obnam - - -class RsyncTests(unittest.TestCase): - - def testSignature(self): - (fd, empty_file) = tempfile.mkstemp() - os.close(fd) - - context = obnam.context.Context() - sig = obnam.rsync.compute_signature(context, empty_file) - os.system("rdiff signature %s empty_file.sig.temp" % empty_file) - f = file("empty_file.sig.temp") - data = f.read() - f.close() - self.failUnlessEqual(sig, data) - os.remove("empty_file.sig.temp") - os.remove(empty_file) - - def testSignatureRaisesExceptionIfCommandIsUnknown(self): - (fd, empty_file) = tempfile.mkstemp() - os.close(fd) - - context = obnam.context.Context() - context.config.set("backup", "odirect-pipe", "/notexist") - self.failUnlessRaises(obnam.rsync.UnknownCommand, - obnam.rsync.compute_signature, - context, empty_file) - - os.remove(empty_file) - - def testSignatureRaisesExceptionIfCommandFails(self): - (fd, empty_file) = tempfile.mkstemp() - os.close(fd) - - context = obnam.context.Context() - context.config.set("backup", "odirect-pipe", "false") - self.failUnlessRaises(obnam.rsync.CommandFailure, - obnam.rsync.compute_signature, - context, empty_file) - - os.remove(empty_file) - - def testDeltaRaisesExceptionIfCommandFails(self): - (fd, empty_file) = tempfile.mkstemp() - os.close(fd) - - context = obnam.context.Context() - context.config.set("backup", "odirect-pipe", "false") - self.failUnlessRaises(obnam.rsync.CommandFailure, - obnam.rsync.compute_delta, - context, "pink", empty_file) - - os.remove(empty_file) - - def testEmptyDelta(self): - (fd, empty_file) = tempfile.mkstemp() - os.close(fd) - - context = obnam.context.Context() - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - - sig = obnam.rsync.compute_signature(context, empty_file) - deltapart_ids = obnam.rsync.compute_delta(context, sig, empty_file) - - os.remove(empty_file) - self.failUnlessEqual(len(deltapart_ids), 1) - - obnam.io.flush_all_object_queues(context) - delta = obnam.io.get_object(context, deltapart_ids[0]) - self.failIfEqual(delta, None) - delta = delta.first_string_by_kind(obnam.cmp.DELTADATA) - - # The hex string below is what rdiff outputs. I've no idea what - # the format is, and the empty delta is expressed differently - # in different situations. Eventually we'll move away from rdiff, - # and then this should become clearer. --liw, 2006-09-24 - self.failUnlessEqual(delta, "rs\x026\x00") - - shutil.rmtree(context.config.get("backup", "store")) - - def create_file(self, contents): - (fd, filename) = tempfile.mkstemp() - os.write(fd, contents) - os.close(fd) - return filename - - def testApplyDelta(self): - context = obnam.context.Context() - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - - first = self.create_file("pink") - second = self.create_file("pretty") - sig = obnam.rsync.compute_signature(context, first) - deltapart_ids = obnam.rsync.compute_delta(context, sig, second) - obnam.io.flush_all_object_queues(context) - - (fd, third) = tempfile.mkstemp() - os.close(fd) - obnam.rsync.apply_delta(context, first, deltapart_ids, third) - - f = file(third, "r") - third_data = f.read() - f.close() - - self.failUnlessEqual(third_data, "pretty") - - shutil.rmtree(context.config.get("backup", "store")) - - def raise_os_error(self, *args): - raise os.error("foo") - - def testApplyDeltaWithoutDevNull(self): - self.failUnlessRaises(os.error, - obnam.rsync.apply_delta, - None, None, None, None, - open=self.raise_os_error) - - def testApplyDeltaRaisesExceptionWhenCommandFails(self): - context = obnam.context.Context() - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - - first = self.create_file("pink") - second = self.create_file("pretty") - sig = obnam.rsync.compute_signature(context, first) - deltapart_ids = obnam.rsync.compute_delta(context, sig, second) - obnam.io.flush_all_object_queues(context) - - self.failUnlessRaises(obnam.rsync.CommandFailure, - obnam.rsync.apply_delta, - context, first, deltapart_ids, "/dev/null", - cmd="./badcat") - - shutil.rmtree(context.config.get("backup", "store")) diff --git a/obnam/store.py b/obnam/store.py deleted file mode 100644 index 3ca6ba55..00000000 --- a/obnam/store.py +++ /dev/null @@ -1,254 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Abstraction for storing backup data, for Obnam.""" - - -import logging -import os - -import obnam - - -class ObjectNotFoundInStore(obnam.exception.ObnamException): - - def __init__(self, id): - self._msg = "Object %s not found in store" % id - - -class Store: - - def __init__(self, context): - self._context = context - self._host = None - - def close(self): - """Close connection to the store. - - You must not use this store instance for anything after - closing it. - - """ - - self._context.be.close() - - def get_host_block(self): - """Return current host block, or None if one is not known. - - You must call fetch_host_block to fetch the host block first. - - """ - - return self._host - - def fetch_host_block(self): - """Fetch host block from store, if one exists. - - If a host block does not exist, it is not an error. A new - host block is then created. - - """ - - if not self._host: - host_block = obnam.io.get_host_block(self._context) - if host_block: - self._host = obnam.obj.create_host_from_block(host_block) - else: - id = self._context.config.get("backup", "host-id") - self._host = obnam.obj.HostBlockObject(host_id=id) - return self._host - - - def load_maps(self): - """Load non-content map blocks.""" - ids = self._host.get_map_block_ids() - logging.info("Decoding %d mapping blocks" % len(ids)) - obnam.io.load_maps(self._context, self._context.map, ids) - - def load_content_maps(self): - """Load content map blocks.""" - ids = self._host.get_contmap_block_ids() - logging.info("Decoding %d content mapping blocks" % len(ids)) - obnam.io.load_maps(self._context, self._context.contmap, ids) - - def _update_map_helper(self, map): - """Create new mapping blocks of a given kind, and upload them. - - Return list of block ids for the new blocks. - - """ - - if obnam.map.get_new(map): - id = self._context.be.generate_block_id() - logging.debug("Creating mapping block %s" % id) - block = obnam.map.encode_new_to_block(map, id) - self._context.be.upload_block(id, block, True) - return [id] - else: - logging.debug("No new mappings, no new mapping block") - return [] - - def update_maps(self): - """Create new object mapping blocks and upload them.""" - logging.debug("Creating new mapping block for normal mappings") - return self._update_map_helper(self._context.map) - - def update_content_maps(self): - """Create new content object mapping blocks and upload them.""" - logging.debug("Creating new mapping block for content mappings") - return self._update_map_helper(self._context.contmap) - - def commit_host_block(self, new_generations): - """Commit the current host block to the store. - - If no host block exists, create one. If one already exists, - update it with new info. - - NOTE that after this operation the host block has changed, - and you need to call get_host_block again. - - """ - - obnam.io.flush_all_object_queues(self._context) - - logging.info("Creating new mapping blocks") - host = self.get_host_block() - map_ids = host.get_map_block_ids() + self.update_maps() - contmap_ids = (host.get_contmap_block_ids() + - self.update_content_maps()) - - logging.info("Creating new host block") - gen_ids = (host.get_generation_ids() + - [gen.get_id() for gen in new_generations]) - host2 = obnam.obj.HostBlockObject(host_id=host.get_id(), - gen_ids=gen_ids, - map_block_ids=map_ids, - contmap_block_ids=contmap_ids) - obnam.io.upload_host_block(self._context, host2.encode()) - - self._host = host2 - - def queue_object(self, object): - """Queue an object for upload to the store. - - It won't necessarily be committed (i.e., uploaded, etc) until - you call commit_host_block. Until it is committed, you may not - call get_object on it. - - """ - - obnam.io.enqueue_object(self._context, self._context.oq, - self._context.map, object.get_id(), - object.encode(), True) - - def queue_objects(self, objects): - """Queue a list of objects for upload to the store. - - See queue_object for information about what queuing means. - - """ - - for object in objects: - self.queue_object(object) - - def get_object(self, id): - """Get an object from the store. - - If the object cannot be found, raise an exception. - - """ - - object = obnam.io.get_object(self._context, id) - if object: - return object - raise ObjectNotFoundInStore(id) - - def parse_pathname(self, pathname): - """Return list of components in pathname.""" - - list = [] - while pathname: - dirname = os.path.dirname(pathname) - basename = os.path.basename(pathname) - if basename: - list.insert(0, basename) - elif dirname == os.sep: - list.insert(0, "/") - dirname = "" - pathname = dirname - - return list - - def _lookup_dir_from_refs(self, dirrefs, parts): - for ref in dirrefs: - dir = self.get_object(ref) - if dir.get_name() == parts[0]: - parts = parts[1:] - if parts: - dirrefs = dir.get_dirrefs() - return self._lookup_dir_from_refs(dirrefs, parts) - else: - return dir - return None - - def lookup_dir(self, generation, pathname): - """Return a DirObject that corresponds to pathname in a generation. - - Look up the directory in the generation. If it does not exist, - return None. - - """ - - dirrefs = generation.get_dirrefs() - parts = self.parse_pathname(pathname) - - for dirref in dirrefs: - dir = self.get_object(dirref) - name = dir.get_name() - if name == pathname: - return dir - else: - if not name.endswith(os.sep): - name += os.sep - if pathname.startswith(name): - subpath = pathname[len(name):] - subparts = self.parse_pathname(subpath) - return self._lookup_dir_from_refs(dir.get_dirrefs(), - subparts) - - return self._lookup_dir_from_refs(dirrefs, parts) - - def lookup_file(self, generation, pathname): - """Find a non-directory thingy in a generation. - - Return a FILE component that corresponds to the filesystem entity - in question. If not found, return None. - - """ - - dirname = os.path.dirname(pathname) - if dirname: - dir = self.lookup_dir(generation, dirname) - if dir: - basename = os.path.basename(pathname) - for id in dir.get_filegrouprefs(): - fg = self.get_object(id) - file = fg.get_file(basename) - if file: - return file - - return None diff --git a/obnam/storeTests.py b/obnam/storeTests.py deleted file mode 100644 index 4a2c732f..00000000 --- a/obnam/storeTests.py +++ /dev/null @@ -1,300 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for abstraction for storing backup data, for Obnam.""" - - -import os -import shutil -import socket -import tempfile -import unittest - -import obnam - - -class StoreTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - self.store = obnam.Store(context) - - def tearDown(self): - shutil.rmtree(self.store._context.config.get("backup", "store"), - ignore_errors=True) - shutil.rmtree(self.store._context.config.get("backup", "cache"), - ignore_errors=True) - - def testReturnsNoneWhenNoHostBlockExists(self): - self.failUnlessEqual(self.store.get_host_block(), None) - - def testReturnsAnActualHostBlockAfterFetch(self): - self.store.fetch_host_block() - host = self.store.get_host_block() - self.failUnless(isinstance(host, obnam.obj.HostBlockObject)) - - def testReturnsActualHostBlockWhenOneExists(self): - self.store.fetch_host_block() - self.store.commit_host_block([]) - - context = obnam.context.Context() - context.be = obnam.backend.init(context.config, context.cache) - store = obnam.Store(context) - store.fetch_host_block() - host = store.get_host_block() - self.failUnless(isinstance(host, obnam.obj.HostBlockObject)) - - def testReplacesHostObjectInMemory(self): - self.store.fetch_host_block() - host = self.store.get_host_block() - self.store.commit_host_block([]) - self.failIfEqual(self.store.get_host_block(), host) - - def testCreatesNewHostBlockWhenNoneExists(self): - self.store.fetch_host_block() - host = self.store.get_host_block() - self.failUnlessEqual(host.get_id(), socket.gethostname()) - self.failUnlessEqual(host.get_generation_ids(), []) - self.failUnlessEqual(host.get_map_block_ids(), []) - self.failUnlessEqual(host.get_contmap_block_ids(), []) - - def testLoadsActualHostBlockWhenOneExists(self): - context = obnam.context.Context() - cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - host_id = context.config.get("backup", "host-id") - temp = obnam.obj.HostBlockObject(host_id=host_id, - gen_ids=["pink", "pretty"]) - obnam.io.upload_host_block(context, temp.encode()) - - self.store.fetch_host_block() - host = self.store.get_host_block() - self.failUnlessEqual(host.get_generation_ids(), ["pink", "pretty"]) - - def testGettingNonExistentObjectRaisesException(self): - self.failUnlessRaises(obnam.exception.ObnamException, - self.store.get_object, "pink") - - def testAddsObjectToStore(self): - o = obnam.obj.GenerationObject(id="pink") - self.store.fetch_host_block() - self.store.queue_object(o) - self.store.commit_host_block([]) - - context2 = obnam.context.Context() - context2.cache = obnam.cache.Cache(context2.config) - context2.be = obnam.backend.init(context2.config, context2.cache) - store2 = obnam.Store(context2) - store2.fetch_host_block() - store2.load_maps() - self.failUnless(store2.get_object(o.get_id())) - - def mock_queue_object(self, object): - self.queued_objects.append(object) - - def testAddsSeveralObjectsToStore(self): - objs = [None, True, False] - self.queued_objects = [] - self.store.queue_object = self.mock_queue_object - self.store.queue_objects(objs) - self.failUnlessEqual(objs, self.queued_objects) - - -class StoreMapTests(unittest.TestCase): - - def setUp(self): - # First, set up two mappings. - - context = obnam.context.Context() - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - - obnam.map.add(context.map, "pink", "pretty") - obnam.map.add(context.contmap, "black", "beautiful") - - map_id = context.be.generate_block_id() - map_block = obnam.map.encode_new_to_block(context.map, map_id) - context.be.upload_block(map_id, map_block, True) - - contmap_id = context.be.generate_block_id() - contmap_block = obnam.map.encode_new_to_block(context.contmap, - contmap_id) - context.be.upload_block(contmap_id, contmap_block, True) - - host_id = context.config.get("backup", "host-id") - host = obnam.obj.HostBlockObject(host_id=host_id, - map_block_ids=[map_id], - contmap_block_ids=[contmap_id]) - obnam.io.upload_host_block(context, host.encode()) - - # Then set up the real context and app. - - self.context = obnam.context.Context() - self.context.cache = obnam.cache.Cache(self.context.config) - self.context.be = obnam.backend.init(self.context.config, - self.context.cache) - self.store = obnam.Store(self.context) - self.store.fetch_host_block() - - def tearDown(self): - shutil.rmtree(self.store._context.config.get("backup", "store"), - ignore_errors=True) - shutil.rmtree(self.store._context.config.get("backup", "cache"), - ignore_errors=True) - - def testHasNoMapsLoadedByDefault(self): - self.failUnlessEqual(obnam.map.count(self.context.map), 0) - - def testHasNoContentMapsLoadedByDefault(self): - self.failUnlessEqual(obnam.map.count(self.context.contmap), 0) - - def testLoadsMapsWhenRequested(self): - self.store.load_maps() - self.failUnlessEqual(obnam.map.count(self.context.map), 1) - - def testLoadsContentMapsWhenRequested(self): - self.store.load_content_maps() - self.failUnlessEqual(obnam.map.count(self.context.contmap), 1) - - def testAddsNoNewMapsWhenNothingHasChanged(self): - self.store.update_maps() - self.failUnlessEqual(obnam.map.count(self.context.map), 0) - - def testAddsANewMapsWhenSomethingHasChanged(self): - obnam.map.add(self.context.map, "pink", "pretty") - self.store.update_maps() - self.failUnlessEqual(obnam.map.count(self.context.map), 1) - - def testAddsNoNewContentMapsWhenNothingHasChanged(self): - self.store.update_content_maps() - self.failUnlessEqual(obnam.map.count(self.context.contmap), 0) - - def testAddsANewContentMapsWhenSomethingHasChanged(self): - obnam.map.add(self.context.contmap, "pink", "pretty") - self.store.update_content_maps() - self.failUnlessEqual(obnam.map.count(self.context.contmap), 1) - - -class StorePathnameParserTests(unittest.TestCase): - - def setUp(self): - context = obnam.context.Context() - self.store = obnam.Store(context) - - def testReturnsRootForRoot(self): - self.failUnlessEqual(self.store.parse_pathname("/"), ["/"]) - - def testReturnsDotForDot(self): - self.failUnlessEqual(self.store.parse_pathname("."), ["."]) - - def testReturnsItselfForSingleElement(self): - self.failUnlessEqual(self.store.parse_pathname("foo"), ["foo"]) - - def testReturnsListOfPartsForMultipleElements(self): - self.failUnlessEqual(self.store.parse_pathname("foo/bar"), - ["foo", "bar"]) - - def testReturnsListOfPartsFromRootForAbsolutePathname(self): - self.failUnlessEqual(self.store.parse_pathname("/foo/bar"), - ["/", "foo", "bar"]) - - def testIgnoredTrailingSlashIfNotRoot(self): - self.failUnlessEqual(self.store.parse_pathname("foo/bar/"), - ["foo", "bar"]) - - -class StoreLookupTests(unittest.TestCase): - - def create_data_dir(self): - dirname = tempfile.mkdtemp() - file(os.path.join(dirname, "file1"), "w").close() - os.mkdir(os.path.join(dirname, "dir1")) - os.mkdir(os.path.join(dirname, "dir1", "dir2")) - file(os.path.join(dirname, "dir1", "dir2", "file2"), "w").close() - return dirname - - def create_context(self): - context = obnam.context.Context() - context.cache = obnam.cache.Cache(context.config) - context.be = obnam.backend.init(context.config, context.cache) - return context - - def setUp(self): - self.datadir = self.create_data_dir() - - app = obnam.Application(self.create_context()) - app.load_host() - gen = app.backup([self.datadir]) - app.get_store().commit_host_block([gen]) - - self.store = obnam.Store(self.create_context()) - self.store.fetch_host_block() - self.store.load_maps() - gen_ids = self.store.get_host_block().get_generation_ids() - self.gen = self.store.get_object(gen_ids[0]) - - def tearDown(self): - shutil.rmtree(self.datadir) - shutil.rmtree(self.store._context.config.get("backup", "store")) - - def testFindsBackupRoot(self): - dir = self.store.lookup_dir(self.gen, self.datadir) - self.failUnless(dir.get_name(), self.datadir) - - def testFindsFirstSubdir(self): - pathname = os.path.join(self.datadir, "dir1") - dir = self.store.lookup_dir(self.gen, pathname) - self.failUnless(dir.get_name(), "dir1") - - def testFindsSecondSubdir(self): - pathname = os.path.join(self.datadir, "dir1", "dir2") - dir = self.store.lookup_dir(self.gen, pathname) - self.failUnless(dir.get_name(), "dir2") - - def testDoesNotFindNonExistentDir(self): - self.failUnlessEqual(self.store.lookup_dir(self.gen, "notexist"), - None) - - def testDoesNotFindNonExistentFileInSubDirectory(self): - pathname = os.path.join(self.datadir, "dir1", "notexist") - file = self.store.lookup_file(self.gen, pathname) - self.failUnlessEqual(file, None) - - def testDoesNotFindNonExistentFileInSubSubDirectory(self): - pathname = os.path.join(self.datadir, "dir1", "dir2", "notexist") - file = self.store.lookup_file(self.gen, pathname) - self.failUnlessEqual(file, None) - - def testDoesNotFindNonExistentFileInRoot(self): - pathname = os.path.join(self.datadir, "notexist") - file = self.store.lookup_file(self.gen, pathname) - self.failUnlessEqual(file, None) - - def filename(self, file): - return file.first_string_by_kind(obnam.cmp.FILENAME) - - def testFindsFileInRootDirectory(self): - pathname = os.path.join(self.datadir, "file1") - file = self.store.lookup_file(self.gen, pathname) - self.failUnlessEqual(self.filename(file), "file1") - - def testFindsFileInSubDirectory(self): - pathname = os.path.join(self.datadir, "dir1", "dir2", "file2") - file = self.store.lookup_file(self.gen, pathname) - self.failUnlessEqual(self.filename(file), "file2") diff --git a/obnam/utils.py b/obnam/utils.py deleted file mode 100644 index f6747867..00000000 --- a/obnam/utils.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (C) 2007 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Misc. utility functions for Obnam""" - - -import os - - -def make_stat_result(st_mode=0, st_ino=0, st_dev=0, st_nlink=0, st_uid=0, - st_gid=0, st_size=0, st_atime=0, st_mtime=0, st_ctime=0, - st_blocks=0, st_blksize=0, st_rdev=0): - - dict = { - "st_mode": st_mode, - "st_ino": st_ino, - "st_dev": st_dev, - "st_nlink": st_nlink, - "st_uid": st_uid, - "st_gid": st_gid, - "st_size": st_size, - "st_atime": st_atime, - "st_mtime": st_mtime, - "st_ctime": st_ctime, - "st_blocks": st_blocks, - "st_blksize": st_blksize, - "st_rdev": st_rdev, - } - - tup = (st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid, st_size, - st_atime, st_mtime, st_ctime) - - return os.stat_result(tup, dict) diff --git a/obnam/varint.py b/obnam/varint.py deleted file mode 100644 index 3174b1da..00000000 --- a/obnam/varint.py +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Variable length integers""" - - -def encode(i): - """Encode an integer as a varint""" - return "%d\n" % i - - -def decode(encoded, pos): - """Decode a varint from a string, return value and pos after it""" - i = encoded.find("\n", pos) - if i == -1: - return -1, pos - else: - return int(encoded[pos:i]), i+1 diff --git a/obnam/varintTests.py b/obnam/varintTests.py deleted file mode 100644 index 79d68f9c..00000000 --- a/obnam/varintTests.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.varint.""" - - -import unittest - - -import obnam.varint - - -class VarintEncodeDecodeTests(unittest.TestCase): - - def test(self): - numbers = (0, 1, 127, 128, 0xff00) - for i in numbers: - str = obnam.varint.encode(i) - (i2, pos) = obnam.varint.decode(str, 0) - self.failUnlessEqual(i, i2) - self.failUnlessEqual(pos, len(str)) - - def testError(self): - str = "asdf" - n, pos = obnam.varint.decode(str, 0) - self.failUnlessEqual(n, -1) - self.failUnlessEqual(pos, 0) diff --git a/obnam/walk.py b/obnam/walk.py deleted file mode 100644 index 4a2d0e23..00000000 --- a/obnam/walk.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Walk a directory tree.""" - - -import os - - -def depth_first(top, prune=None): - """Walk a directory tree depth-first, except for unwanted subdirs. - - This is, essentially, 'os.walk(top, topdown=False)', except that - if the prune argument is set, we call it before descending to - sub-directories to allow it to remove any directories and files - the caller does not want to know about. - - If set, prune must be a function that gets three arguments (current - directory, list of sub-directory names, list of files in directory), - and must modify the two lists _in_place_. For example: - - def prune(dirname, dirnames, filenames): - if ".bzr" in dirnames: - dirnames.remove(".bzr") - - The dirnames and filenames lists contain basenames, relative to - dirname. - - """ - - # We walk topdown, since that's the only way os.walk allows us to - # do any pruning. We use os.walk to get the exact same error handling - # and other logic it uses. - for dirname, dirnames, filenames in os.walk(top): - - # Prune. This modifies dirnames and filenames in place. - if prune: - prune(dirname, dirnames, filenames) - - # Make a duplicate of the dirnames, then empty the existing list. - # This way, os.walk won't try to walk to subdirectories. We'll - # do that manually. - real_dirnames = dirnames[:] - del dirnames[:] - - # Process subdirectories, recursively. - for subdirname in real_dirnames: - subdirpath = os.path.join(dirname, subdirname) - for x in depth_first(subdirpath, prune=prune): - yield x - - # Return current directory last. - yield dirname, real_dirnames, filenames diff --git a/obnam/walk_tests.py b/obnam/walk_tests.py deleted file mode 100644 index 747c1b97..00000000 --- a/obnam/walk_tests.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2008 Lars Wirzenius <liw@iki.fi> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - -"""Unit tests for obnam.walk.""" - - -import os -import shutil -import tempfile -import unittest - -import obnam - - -class DepthFirstTests(unittest.TestCase): - - def setUp(self): - self.root = tempfile.mkdtemp() - self.dirs = ["foo", "foo/bar", "foobar"] - self.dirs = [os.path.join(self.root, x) for x in self.dirs] - for dir in self.dirs: - os.mkdir(dir) - self.dirs.insert(0, self.root) - - def tearDown(self): - shutil.rmtree(self.root) - - def testFindsAllDirs(self): - dirs = [x[0] for x in obnam.walk.depth_first(self.root)] - self.failUnlessEqual(sorted(dirs), sorted(self.dirs)) - - def prune(self, dirname, dirnames, filenames): - if "foo" in dirnames: - dirnames.remove("foo") - - def testFindsAllDirsExceptThePrunedOne(self): - correct = [x - for x in self.dirs - if not x.endswith("/foo") and not "/foo/" in x] - dirs = [x[0] - for x in obnam.walk.depth_first(self.root, prune=self.prune)] - self.failUnlessEqual(sorted(dirs), sorted(correct)) |