# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import shutil
import tempfile
import unittest
import larch
import larch.fsck
import StringIO
import ttystatus
class FsckTests(unittest.TestCase):
KEY_SIZE = 15
NODE_SIZE = 128
VALUES_SIZE = 7
NB_KEYS = 40
def setUp(self):
self.logged_errors = []
self.logged_warnings = []
self.dirname = tempfile.mkdtemp()
self.forest = self.new_disk_forest()
def tearDown(self):
self.logged_errors = []
self.logged_warnings = []
shutil.rmtree(self.dirname)
def log_warning(self, msg):
self.logged_warnings.append(msg)
def log_error(self, msg):
self.logged_errors.append(msg)
def new_disk_forest(self):
forest = larch.open_forest(
allow_writes=True,
key_size=self.KEY_SIZE, node_size=self.NODE_SIZE,
dirname=self.dirname, node_store=larch.NodeStoreDisk)
t1=forest.new_tree()
for i in xrange(self.NB_KEYS):
value='%0*d' % (self.VALUES_SIZE, 1*i)
key ='%0*d' % (self.KEY_SIZE, 1*i)
t1.insert(key,value)
forest.commit()
return forest
def get_any_node(self, index_node=False):
"""returns either index node or leaf node"""
any_node=None
for x in xrange(10,self.forest.last_id):
node = self.forest.node_store.get_node(x)
if index_node and isinstance(node, larch.IndexNode):
any_node = node
break
elif (not index_node) and isinstance(node, larch.LeafNode):
any_node = node
break
self.failUnless(any_node is not None)
return any_node
def testError(self):
e = larch.fsck.Error('test')
def testWorkItem(self):
wi = larch.fsck.WorkItem()
self.failUnless( 'WorkItem' == str(wi) )
wi.name = "SomeName"
self.failUnless( 'SomeName' == str(wi) )
wi.do() # does nothing :)
wi.fsck = larch.fsck.Fsck(self.forest, warning=self.log_warning, error=self.log_error, fix=False)
self.failIf( self.logged_warnings )
wi.warning('Glip')
self.failUnless( 'Glip' in self.logged_warnings[0] )
self.failIf( self.logged_errors )
wi.error('Plop')
self.failUnless( 'Plop' in self.logged_errors[0] )
node = self.get_any_node()
node_id = node.id
wi.start_modification(node)
wi.put_node(node)
newnode = self.forest.node_store.get_node(node.id)
self.failUnless( node == newnode )
def test_CheckIndexNode(self):
Ffsck = larch.fsck.Fsck(self.forest, warning=self.log_warning, error=self.log_error, fix=False)
cin = larch.fsck.CheckIndexNode(Ffsck, self.get_any_node(index_node=True) )
for work in cin.do():
work.do() # No error expected
self.failIf( self.logged_warnings )
self.failIf( self.logged_errors )
# Give a leaf node instead of an index node:
cin2 = larch.fsck.CheckIndexNode(Ffsck, self.get_any_node(index_node=False) )
for work in cin2.do():
work.do() # error expected
self.failIf( self.logged_warnings )
self.failUnless('Expected to get an index node' in self.logged_errors[0] )
self.failUnless("got instead" in self.logged_errors[0], self.logged_errors[0] )
def test_CheckIndexNodeNoChildren(self):
Ffsck = larch.fsck.Fsck(self.forest, warning=self.log_warning, error=self.log_error, fix=False)
# Create an index node without children:
index_node = self.get_any_node(index_node=True)
empty_node = larch.IndexNode(index_node.id, [], [])
cin = larch.fsck.CheckIndexNode(Ffsck, empty_node )
for work in cin.do():
work.do() # Expect "No children" error
self.failIf( self.logged_warnings )
self.failUnless('index node %d: No children' % empty_node.id in self.logged_errors[0], self.logged_errors[0] )
def test_CheckIndexNodeFixReferences(self):
# Create an index node with a missing reference:
index_node_missing_reference = self.get_any_node(index_node=True)
id_index_node = index_node_missing_reference.id
id_node_to_remove = index_node_missing_reference.values()[0]
mkey = index_node_missing_reference.keys()[0]
# Let's remove the node_to_remove from the FS:
nodepath = self.forest.node_store.idpath.convert(id_node_to_remove)
os.unlink( nodepath )
del self.forest
Fforest = larch.open_forest(allow_writes=False,dirname=self.dirname)
Ffsck = larch.fsck.Fsck(Fforest, warning=self.log_warning, error=self.log_error, fix=True)
index_node_missing_reference = Fforest.node_store.get_node(id_index_node)
cin = larch.fsck.CheckIndexNode(Ffsck, index_node_missing_reference )
for work in cin.do():
work.do() # Expect dropped keys path
self.failUnless(
'index node %s: dropped key %s' %
(id_index_node, mkey.encode('hex')) in self.logged_warnings[0],
self.logged_warnings[0] )
# We have a "node %d is missing" error as well:
self.failUnless( 'node %s is missing' % id_node_to_remove in self.logged_errors[0], self.logged_errors[0] )
def test_CheckRefcounts(self):
Fforest = larch.open_forest(allow_writes=True,dirname=self.dirname)
Ffsck = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = False)
# Populate self.refcounts
Ffsck.run_fsck()
self.failIf( self.logged_warnings )
self.failIf( self.logged_errors )
leafnode = self.get_any_node()
self.failUnless( Ffsck.refcounts[leafnode.id] == 1 , Ffsck.refcounts[leafnode.id] )
# Change a refcount so that it is bad:
Ffsck2 = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = False)
self.failUnless( 1 == Fforest.node_store.rs.get_refcount(leafnode.id) )
Fforest.node_store.rs.set_refcount(leafnode.id,2)
Fforest.node_store.rs.save_refcounts()
Fforest.commit()
# Check the refcounts
Ffsck2.run_fsck()
self.failIf( self.logged_warnings )
self.failUnless( 'node %s: refcount is %s but should be %s' % (leafnode.id, 2,1) in self.logged_errors[0], self.logged_errors[0] )
self.failUnless( 2 == Fforest.node_store.rs.get_refcount(leafnode.id) )
# Fix the refcounts:
self.logged_errors = []
self.logged_warnings = []
Ffsck3 = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = True)
Ffsck3.run_fsck()
self.failUnless( 'node %s: refcount is %s but should be %s' % (leafnode.id, 2,1) in self.logged_errors[0], self.logged_errors[0] )
self.failUnless( 'node %s: refcount was set to %s' % (leafnode.id, 1) in self.logged_warnings[0], self.logged_warnings[0] )
self.failUnless( 1 == Fforest.node_store.rs.get_refcount(leafnode.id) )
# No more errors expected:
self.logged_errors = []
self.logged_warnings = []
Ffsck4 = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = True)
Ffsck4.run_fsck()
self.failIf( self.logged_warnings )
self.failIf( self.logged_errors )
def test_missing_leaf_node(self):
# Let's delete a leaf node
leafnode = self.get_any_node()
self.failUnless(leafnode is not None)
# delete the node:
nodepath = self.forest.node_store.idpath.convert(leafnode.id)
os.unlink( nodepath )
# fsck the forest
Fforest = larch.open_forest(allow_writes=False,dirname=self.dirname)
Ffsck = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = False)
Ffsck.run_fsck()
# Make sure that the missing node is noticed
self.failUnless( True in [ 'node %d is missing' % leafnode.id in x for x in self.logged_errors ] )
def test_missing_index_node(self):
# Let's delete a index node
indexnode = self.get_any_node(index_node=True)
self.failUnless(indexnode is not None)
# delete the node:
nodepath = self.forest.node_store.idpath.convert(indexnode.id)
os.unlink( nodepath )
# fsck the forest
Fforest = larch.open_forest(allow_writes=False,dirname=self.dirname)
Ffsck = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = False)
Ffsck.run_fsck()
# Make sure that the missing node is noticed
self.failUnless( True in [ 'node %d is missing' % indexnode.id in x for x in self.logged_errors ] )
def test_fsck(self):
Fforest = larch.open_forest(allow_writes=False,dirname=self.dirname)
Ffsck = larch.fsck.Fsck(Fforest,
warning=self.log_warning, error=self.log_error,
fix = False)
# Run fsck
ts = ttystatus.TerminalStatus()
Ffsck.run_fsck(ts=ts)
self.failIf( self.logged_warnings )
self.failIf( self.logged_errors )
# Run the whole test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()