summaryrefslogtreecommitdiff
path: root/obnam/ioTests.py
diff options
context:
space:
mode:
Diffstat (limited to 'obnam/ioTests.py')
-rw-r--r--obnam/ioTests.py534
1 files changed, 0 insertions, 534 deletions
diff --git a/obnam/ioTests.py b/obnam/ioTests.py
deleted file mode 100644
index 4e878566..00000000
--- a/obnam/ioTests.py
+++ /dev/null
@@ -1,534 +0,0 @@
-# Copyright (C) 2006 Lars Wirzenius <liw@iki.fi>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-
-"""Unit tests for obnam.io"""
-
-
-import os
-import shutil
-import tempfile
-import unittest
-
-
-import obnam
-
-
-class ExceptionTests(unittest.TestCase):
-
- def testMissingBlock(self):
- e = obnam.io.MissingBlock("pink", "pretty")
- self.failUnless("pink" in str(e))
- self.failUnless("pretty" in str(e))
-
- def testFileContentsObjectMissing(self):
- e = obnam.io.FileContentsObjectMissing("pink")
- self.failUnless("pink" in str(e))
-
-
-class ResolveTests(unittest.TestCase):
-
- def test(self):
- context = obnam.context.Context()
- # We don't need the fields that are usually initialized manually.
-
- facit = (
- (".", "/", "/"),
- (".", "/pink", "/pink"),
- (".", "pink", "./pink"),
- ("pink", "/", "/"),
- ("pink", "/pretty", "/pretty"),
- ("pink", "pretty", "pink/pretty"),
- ("/pink", "/", "/"),
- ("/pink", "/pretty", "/pretty"),
- ("/pink", "pretty", "/pink/pretty"),
- ("/", "/", "/"),
- )
-
- for target, pathname, resolved in facit:
- context.config.set("backup", "target-dir", target)
- x = obnam.io.resolve(context, pathname)
- self.failUnlessEqual(x, resolved)
- self.failUnlessEqual(obnam.io.unsolve(context, x), pathname)
-
- self.failUnlessEqual(obnam.io.unsolve(context, "/pink"), "pink")
-
-
-class IoBase(unittest.TestCase):
-
- def setUp(self):
- self.cachedir = "tmp.cachedir"
- self.rootdir = "tmp.rootdir"
-
- os.mkdir(self.cachedir)
- os.mkdir(self.rootdir)
-
- config_list = (
- ("backup", "cache", self.cachedir),
- ("backup", "store", self.rootdir)
- )
-
- self.context = obnam.context.Context()
-
- for section, item, value in config_list:
- self.context.config.set(section, item, value)
-
- self.context.cache = obnam.cache.Cache(self.context.config)
- self.context.be = obnam.backend.init(self.context.config,
- self.context.cache)
-
- def tearDown(self):
- shutil.rmtree(self.cachedir)
- shutil.rmtree(self.rootdir)
- del self.cachedir
- del self.rootdir
- del self.context
-
-
-class ObjectQueueFlushing(IoBase):
-
- def testEmptyQueue(self):
- obnam.io.flush_object_queue(self.context, self.context.oq,
- self.context.map, False)
- list = self.context.be.list()
- self.failUnlessEqual(list, [])
-
- def testFlushing(self):
- self.context.oq.add("pink", "pretty")
-
- self.failUnlessEqual(self.context.be.list(), [])
-
- obnam.io.flush_object_queue(self.context, self.context.oq,
- self.context.map, False)
-
- list = self.context.be.list()
- self.failUnlessEqual(len(list), 1)
-
- b1 = os.path.basename(obnam.map.get(self.context.map, "pink"))
- b2 = os.path.basename(list[0])
- self.failUnlessEqual(b1, b2)
-
- def testFlushAll(self):
- self.context.oq.add("pink", "pretty")
- self.context.content_oq.add("x", "y")
- obnam.io.flush_all_object_queues(self.context)
- self.failUnlessEqual(len(self.context.be.list()), 2)
- self.failUnless(self.context.oq.is_empty())
- self.failUnless(self.context.content_oq.is_empty())
-
-
-class GetBlockTests(IoBase):
-
- def setup_pink_block(self, to_cache):
- self.context.be.upload_block("pink", "pretty", to_cache)
-
- def testRaisesIoErrorForNonExistentBlock(self):
- self.failUnlessRaises(IOError, obnam.io.get_block, self.context, "pink")
-
- def testFindsBlockWhenNotInCache(self):
- self.setup_pink_block(to_cache=False)
- self.failUnless(obnam.io.get_block(self.context, "pink"))
-
- def testFindsBlockWhenInCache(self):
- self.setup_pink_block(to_cache=True)
- obnam.io.get_block(self.context, "pink")
- self.failUnless(obnam.io.get_block(self.context, "pink"))
-
-
-class GetObjectTests(IoBase):
-
- def upload_object(self, object_id, object):
- self.context.oq.add(object_id, object)
- obnam.io.flush_object_queue(self.context, self.context.oq,
- self.context.map, False)
-
- def testGetObject(self):
- id = "pink"
- component = obnam.cmp.Component(42, "pretty")
- object = obnam.obj.FilePartObject(id=id)
- object.add(component)
- object = object.encode()
- self.upload_object(id, object)
- o = obnam.io.get_object(self.context, id)
-
- self.failUnlessEqual(o.get_id(), id)
- self.failUnlessEqual(o.get_kind(), obnam.obj.FILEPART)
- list = o.get_components()
- list = [c for c in list if c.get_kind() not in [obnam.cmp.OBJID,
- obnam.cmp.OBJKIND]]
- self.failUnlessEqual(len(list), 1)
- self.failUnlessEqual(list[0].get_kind(), 42)
- self.failUnlessEqual(list[0].get_string_value(), "pretty")
-
- def testGetObjectTwice(self):
- id = "pink"
- component = obnam.cmp.Component(42, "pretty")
- object = obnam.obj.FileContentsObject(id=id)
- object.add(component)
- object = object.encode()
- self.upload_object(id, object)
- o = obnam.io.get_object(self.context, id)
- o2 = obnam.io.get_object(self.context, id)
- self.failUnlessEqual(o, o2)
-
- def testReturnsNoneForNonexistentObject(self):
- self.failUnlessEqual(obnam.io.get_object(self.context, "pink"), None)
-
-
-class HostBlock(IoBase):
-
- def testFetchHostBlock(self):
- host_id = self.context.config.get("backup", "host-id")
- host = obnam.obj.HostBlockObject(host_id=host_id,
- gen_ids=["gen1", "gen2"],
- map_block_ids=["map1", "map2"],
- contmap_block_ids=["contmap1",
- "contmap2"])
- host = host.encode()
- be = obnam.backend.init(self.context.config, self.context.cache)
-
- obnam.io.upload_host_block(self.context, host)
- host2 = obnam.io.get_host_block(self.context)
- self.failUnlessEqual(host, host2)
-
- def testFetchNonexistingHostBlockReturnsNone(self):
- self.failUnlessEqual(obnam.io.get_host_block(self.context), None)
-
-
-class ObjectQueuingTests(unittest.TestCase):
-
- def find_block_files(self, config):
- files = []
- root = config.get("backup", "store")
- for dirpath, _, filenames in os.walk(root):
- files += [os.path.join(dirpath, x) for x in filenames]
- files.sort()
- return files
-
- def testEnqueue(self):
- context = obnam.context.Context()
- object_id = "pink"
- object = "pretty"
- context.config.set("backup", "block-size", "%d" % 128)
- context.cache = obnam.cache.Cache(context.config)
- context.be = obnam.backend.init(context.config, context.cache)
-
- self.failUnlessEqual(self.find_block_files(context.config), [])
-
- obnam.io.enqueue_object(context, context.oq, context.map,
- object_id, object, False)
-
- self.failUnlessEqual(self.find_block_files(context.config), [])
- self.failUnlessEqual(context.oq.combined_size(), len(object))
-
- object_id2 = "pink2"
- object2 = "x" * 1024
-
- obnam.io.enqueue_object(context, context.oq, context.map,
- object_id2, object2, False)
-
- self.failUnlessEqual(len(self.find_block_files(context.config)), 1)
- self.failUnlessEqual(context.oq.combined_size(), len(object2))
-
- shutil.rmtree(context.config.get("backup", "cache"), True)
- shutil.rmtree(context.config.get("backup", "store"), True)
-
-
-class FileContentsTests(unittest.TestCase):
-
- def setUp(self):
- self.context = obnam.context.Context()
- self.context.cache = obnam.cache.Cache(self.context.config)
- self.context.be = obnam.backend.init(self.context.config,
- self.context.cache)
-
- def tearDown(self):
- for x in ["cache", "store"]:
- if os.path.exists(self.context.config.get("backup", x)):
- shutil.rmtree(self.context.config.get("backup", x))
-
- def testEmptyFile(self):
- (fd, filename) = tempfile.mkstemp()
- os.close(fd)
-
- id = obnam.io.create_file_contents_object(self.context, filename)
-
- self.failIfEqual(id, None)
- self.failUnlessEqual(self.context.oq.ids(), [id])
- self.failUnlessEqual(obnam.map.count(self.context.map), 0)
- # there's no mapping yet, because the queue is small enough
- # that there has been no need to flush it
-
- os.remove(filename)
-
- def testNonEmptyFile(self):
- block_size = 4096
- self.context.config.set("backup", "block-size", "%d" % block_size)
- filename = "Makefile"
-
- id = obnam.io.create_file_contents_object(self.context, filename)
-
- self.failIfEqual(id, None)
- self.failUnlessEqual(self.context.oq.ids(), [id])
-
- def testRestore(self):
- block_size = 4096
- self.context.config.set("backup", "block-size", "%d" % block_size)
- filename = "Makefile"
-
- id = obnam.io.create_file_contents_object(self.context, filename)
- obnam.io.flush_object_queue(self.context, self.context.oq,
- self.context.map, False)
- obnam.io.flush_object_queue(self.context, self.context.content_oq,
- self.context.contmap, False)
-
- (fd, name) = tempfile.mkstemp()
- obnam.io.copy_file_contents(self.context, fd, id)
- os.close(fd)
-
- f = file(name, "r")
- data1 = f.read()
- f.close()
- os.remove(name)
-
- f = file(filename, "r")
- data2 = f.read()
- f.close()
-
- self.failUnlessEqual(data1, data2)
-
- def testRestoreNonexistingFile(self):
- self.failUnlessRaises(obnam.io.FileContentsObjectMissing,
- obnam.io.copy_file_contents, self.context, None, "pink")
-
-
-class MetaDataTests(unittest.TestCase):
-
- def testSet(self):
- (fd, name) = tempfile.mkstemp()
- os.close(fd)
-
- st1 = os.stat(name)
- inode = obnam.filelist.create_file_component_from_stat(name, st1,
- None, None,
- None)
-
- os.chmod(name, 0)
-
- obnam.io.set_inode(name, inode)
-
- st2 = os.stat(name)
-
- self.failUnlessEqual(st1.st_mode, st2.st_mode)
- self.failUnlessEqual(st1.st_atime, st2.st_atime)
- self.failUnlessEqual(st1.st_mtime, st2.st_mtime)
-
-
-class ObjectCacheTests(unittest.TestCase):
-
- def setUp(self):
- self.object = obnam.obj.FilePartObject(id="pink")
- self.object2 = obnam.obj.FilePartObject(id="pretty")
- self.object3 = obnam.obj.FilePartObject(id="beautiful")
-
- def testCreate(self):
- context = obnam.context.Context()
- oc = obnam.io.ObjectCache(context)
- self.failUnlessEqual(oc.size(), 0)
- self.failUnless(oc.MAX > 0)
-
- def testPut(self):
- context = obnam.context.Context()
- oc = obnam.io.ObjectCache(context)
- self.failUnlessEqual(oc.get("pink"), None)
- oc.put(self.object)
- self.failUnlessEqual(oc.get("pink"), self.object)
-
- def testPutWithOverflow(self):
- context = obnam.context.Context()
- oc = obnam.io.ObjectCache(context)
- oc.MAX = 1
- oc.put(self.object)
- self.failUnlessEqual(oc.size(), 1)
- self.failUnlessEqual(oc.get("pink"), self.object)
- oc.put(self.object2)
- self.failUnlessEqual(oc.size(), 1)
- self.failUnlessEqual(oc.get("pink"), None)
- self.failUnlessEqual(oc.get("pretty"), self.object2)
-
- def testPutWithOverflowPart2(self):
- context = obnam.context.Context()
- oc = obnam.io.ObjectCache(context)
- oc.MAX = 2
-
- oc.put(self.object)
- oc.put(self.object2)
- self.failUnlessEqual(oc.size(), 2)
- self.failUnlessEqual(oc.get("pink"), self.object)
- self.failUnlessEqual(oc.get("pretty"), self.object2)
-
- oc.get("pink")
- oc.put(self.object3)
- self.failUnlessEqual(oc.size(), 2)
- self.failUnlessEqual(oc.get("pink"), self.object)
- self.failUnlessEqual(oc.get("pretty"), None)
- self.failUnlessEqual(oc.get("beautiful"), self.object3)
-
-
-class ReachabilityTests(IoBase):
-
- def testNoDataNoMaps(self):
- host_id = self.context.config.get("backup", "host-id")
- host = obnam.obj.HostBlockObject(host_id=host_id).encode()
- obnam.io.upload_host_block(self.context, host)
-
- list = obnam.io.find_reachable_data_blocks(self.context, host)
- self.failUnlessEqual(list, [])
-
- list2 = obnam.io.find_map_blocks_in_use(self.context, host, list)
- self.failUnlessEqual(list2, [])
-
- def testNoDataExtraMaps(self):
- obnam.map.add(self.context.map, "pink", "pretty")
- map_block_id = "box"
- map_block = obnam.map.encode_new_to_block(self.context.map,
- map_block_id)
- self.context.be.upload_block(map_block_id, map_block, False)
-
- obnam.map.add(self.context.contmap, "black", "beautiful")
- contmap_block_id = "fiddly"
- contmap_block = obnam.map.encode_new_to_block(
- self.context.contmap, contmap_block_id)
- self.context.be.upload_block(contmap_block_id, contmap_block, False)
-
- host_id = self.context.config.get("backup", "host-id")
- host = obnam.obj.HostBlockObject(host_id=host_id,
- map_block_ids=[map_block_id],
- contmap_block_ids=[contmap_block_id])
- host = host.encode()
- obnam.io.upload_host_block(self.context, host)
-
- list = obnam.io.find_map_blocks_in_use(self.context, host, [])
- self.failUnlessEqual(list, [])
-
- def testDataAndMap(self):
- o = obnam.obj.FilePartObject(id="rouge")
- c = obnam.cmp.Component(obnam.cmp.FILECHUNK, "moulin")
- o.add(c)
- encoded_o = o.encode()
-
- block_id = "pink"
- oq = obnam.obj.ObjectQueue()
- oq.add("rouge", encoded_o)
- block = oq.as_block(block_id)
- self.context.be.upload_block(block_id, block, False)
-
- obnam.map.add(self.context.contmap, "rouge", block_id)
- map_block_id = "pretty"
- map_block = obnam.map.encode_new_to_block(self.context.contmap,
- map_block_id)
- self.context.be.upload_block(map_block_id, map_block, False)
-
- host_id = self.context.config.get("backup", "host-id")
- host = obnam.obj.HostBlockObject(host_id=host_id,
- map_block_ids=[map_block_id])
- host = host.encode()
- obnam.io.upload_host_block(self.context, host)
-
- list = obnam.io.find_map_blocks_in_use(self.context, host,
- [block_id])
- self.failUnlessEqual(list, [map_block_id])
-
-
-class GarbageCollectionTests(IoBase):
-
- def testFindUnreachableFiles(self):
- host_id = self.context.config.get("backup", "host-id")
- host = obnam.obj.HostBlockObject(host_id=host_id).encode()
- obnam.io.upload_host_block(self.context, host)
-
- block_id = self.context.be.generate_block_id()
- self.context.be.upload_block(block_id, "pink", False)
-
- files = self.context.be.list()
- self.failUnlessEqual(files, [host_id, block_id])
-
- obnam.io.collect_garbage(self.context, host)
- files = self.context.be.list()
- self.failUnlessEqual(files, [host_id])
-
-
-class ObjectCacheRegressionTest(unittest.TestCase):
-
- # This test case is for a bug in obnam.io.ObjectCache: with the
- # right sequence of operations, the cache can end up in a state where
- # the MRU list is too long, but contains two instances of the same
- # object ID. When the list is shortened, the first instance of the
- # ID is removed, and the object is also removed from the dictionary.
- # If the list is still too long, it is shortened again, by removing
- # the last item in the list, but that no longer is in the dictionary,
- # resulting in the shortening not happening. Voila, an endless loop.
- #
- # As an example, if the object queue maximum size is 3, the following
- # sequence exhibits the problem:
- #
- # put('a') mru = ['a']
- # put('b') mru = ['b', 'a']
- # put('c') mru = ['c', 'b', 'a']
- # put('a') mru = ['a', 'c', 'b', 'a'], shortened into
- # ['c', 'b', 'a'], and now dict no longer
- # has 'a'
- # put('d') mru = ['d', 'c', 'b', 'a'], which needs to be
- # shortened by removing the last element, but
- # since 'a' is no longer in dict, the list
- # doesn't actually become shorter, and
- # the shortening loop becomes infinite
- #
- # (The fix to the bug is, of course, to forget the object to be
- # inserted before inserting it, thus removing duplicates in the MRU
- # list.)
-
- def test(self):
- context = obnam.context.Context()
- context.config.set("backup", "object-cache-size", "3")
- oc = obnam.io.ObjectCache(context)
- a = obnam.obj.FilePartObject(id="a")
- b = obnam.obj.FilePartObject(id="b")
- c = obnam.obj.FilePartObject(id="c")
- d = obnam.obj.FilePartObject(id="d")
- oc.put(a)
- oc.put(b)
- oc.put(c)
- oc.put(a)
- # If the bug is there, the next method call doesn't return.
- # Beware the operator.
- oc.put(b)
-
-
-class LoadMapTests(IoBase):
-
- def test(self):
- map = obnam.map.create()
- obnam.map.add(map, "pink", "pretty")
- block_id = self.context.be.generate_block_id()
- block = obnam.map.encode_new_to_block(map, block_id)
- self.context.be.upload_block(block_id, block, False)
-
- obnam.io.load_maps(self.context, self.context.map, [block_id])
- self.failUnlessEqual(obnam.map.get(self.context.map, "pink"),
- "pretty")
- self.failUnlessEqual(obnam.map.get(self.context.map, "black"),
- None)