summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2010-07-05 16:05:45 +1200
committerLars Wirzenius <liw@liw.fi>2010-07-05 16:05:45 +1200
commit46a0b61571dc206ce11d9d782601573610aea0a5 (patch)
tree1c74beb7ee9a1d434bf617171359b980f061418a
parent93599799ce66769409e46a7dd7a2670b587146bf (diff)
downloadobnam-46a0b61571dc206ce11d9d782601573610aea0a5.tar.gz
Add --upload-queue-size for tweaking size of upload queue for B-tree nodes.
-rwxr-xr-xblackboxtest6
-rwxr-xr-xdumpobjs3
-rw-r--r--obnamlib/__init__.py1
-rw-r--r--obnamlib/app.py6
-rw-r--r--obnamlib/plugins/backup_plugin.py3
-rw-r--r--obnamlib/plugins/forget_plugin.py3
-rw-r--r--obnamlib/plugins/fsck_plugin.py3
-rw-r--r--obnamlib/plugins/restore_plugin.py3
-rw-r--r--obnamlib/plugins/show_plugin.py3
-rw-r--r--obnamlib/plugins/verify_plugin.py3
-rw-r--r--obnamlib/store.py48
-rw-r--r--obnamlib/store_tests.py33
12 files changed, 76 insertions, 39 deletions
diff --git a/blackboxtest b/blackboxtest
index 35108bed..3621cb42 100755
--- a/blackboxtest
+++ b/blackboxtest
@@ -390,7 +390,8 @@ class ReusesChunks(BlackBoxTest):
self.verify(data, restored)
fsf = obnamlib.VfsFactory()
fs = fsf.new(store)
- s = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE)
+ s = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
s.open_host(self.hostid)
self.assertEqual('ReusesChunks', len(s.list_chunks()), 1,
'only one chunk when all files are identical')
@@ -411,7 +412,8 @@ class UsesChunkGroups(BlackBoxTest):
self.verify(data, restored)
fsf = obnamlib.VfsFactory()
fs = fsf.new(store)
- s = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE)
+ s = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
s.open_host(self.hostid)
self.assertEqual('UsesChunkGroups', len(s.list_chunk_groups()),
ngroups,
diff --git a/dumpobjs b/dumpobjs
index 4edcd833..b3b5ede3 100755
--- a/dumpobjs
+++ b/dumpobjs
@@ -27,7 +27,8 @@ def find_objids(fs):
fs = obnamlib.LocalFS(sys.argv[1])
-store = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE)
+store = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
for objid in find_objids(fs):
obj = store.get_object(objid)
print 'id %s (%s):' % (obj.id, obj.__class__.__name__)
diff --git a/obnamlib/__init__.py b/obnamlib/__init__.py
index 04872c94..c7cd310b 100644
--- a/obnamlib/__init__.py
+++ b/obnamlib/__init__.py
@@ -29,6 +29,7 @@ class Error(Exception):
DEFAULT_NODE_SIZE = 64 * 1024
DEFAULT_CHUNK_SIZE = 4096
DEFAULT_CHUNK_GROUP_SIZE = 16
+DEFAULT_UPLOAD_QUEUE_SIZE = 1024
from sizeparse import SizeSyntaxError, UnitNameError, ByteSizeParser
diff --git a/obnamlib/app.py b/obnamlib/app.py
index e3f299c6..c0172d47 100644
--- a/obnamlib/app.py
+++ b/obnamlib/app.py
@@ -58,6 +58,12 @@ class App(object):
self.config['chunk-group-size'] = \
'%s' % obnamlib.DEFAULT_CHUNK_GROUP_SIZE
+ self.config.new_bytesize(['upload-queue-size'],
+ 'length of upload queue for B-tree nodes '
+ '(default: %default)')
+ self.config['upload-queue-size'] = \
+ '%s' % obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE
+
self.pm = obnamlib.PluginManager()
self.pm.locations = [self.plugins_dir()]
self.pm.plugin_arguments = (self,)
diff --git a/obnamlib/plugins/backup_plugin.py b/obnamlib/plugins/backup_plugin.py
index 77bf270e..ffe0c710 100644
--- a/obnamlib/plugins/backup_plugin.py
+++ b/obnamlib/plugins/backup_plugin.py
@@ -55,7 +55,8 @@ class BackupPlugin(obnamlib.ObnamPlugin):
storepath = self.app.config['store']
logging.debug('store: %s' % storepath)
storefs = self.app.fsf.new(storepath)
- self.store = obnamlib.Store(storefs, self.app.config['node-size'])
+ self.store = obnamlib.Store(storefs, self.app.config['node-size'],
+ self.app.config['upload-queue-size'])
hostname = self.app.config['hostname']
logging.debug('hostname: %s' % hostname)
diff --git a/obnamlib/plugins/forget_plugin.py b/obnamlib/plugins/forget_plugin.py
index f6a1c634..0ecc8bd0 100644
--- a/obnamlib/plugins/forget_plugin.py
+++ b/obnamlib/plugins/forget_plugin.py
@@ -34,7 +34,8 @@ class ForgetPlugin(obnamlib.ObnamPlugin):
self.app.config.require('hostname')
fs = self.app.fsf.new(self.app.config['store'])
- self.store = obnamlib.Store(fs, self.app.config['node-size'])
+ self.store = obnamlib.Store(fs, self.app.config['node-size'],
+ self.app.config['upload-queue-size'])
self.store.lock_host(self.app.config['hostname'])
if args:
diff --git a/obnamlib/plugins/fsck_plugin.py b/obnamlib/plugins/fsck_plugin.py
index 5ecc3bf1..17afed1a 100644
--- a/obnamlib/plugins/fsck_plugin.py
+++ b/obnamlib/plugins/fsck_plugin.py
@@ -31,7 +31,8 @@ class FsckPlugin(obnamlib.ObnamPlugin):
storefs = self.app.fsf.new(self.app.config['store'])
storefs.connect()
- self.store = obnamlib.Store(storefs, self.app.config['node-size'])
+ self.store = obnamlib.Store(storefs, self.app.config['node-size'],
+ self.app.config['node-size'])
self.check_root()
diff --git a/obnamlib/plugins/restore_plugin.py b/obnamlib/plugins/restore_plugin.py
index 11b2a22f..b51d3977 100644
--- a/obnamlib/plugins/restore_plugin.py
+++ b/obnamlib/plugins/restore_plugin.py
@@ -81,7 +81,8 @@ class RestorePlugin(obnamlib.ObnamPlugin):
args = ['/']
storefs = self.app.fsf.new(self.app.config['store'])
- self.store = obnamlib.Store(storefs, self.app.config['node-size'])
+ self.store = obnamlib.Store(storefs, self.app.config['node-size'],
+ self.app.config['node-size'])
self.store.open_host(self.app.config['hostname'])
self.fs = self.app.fsf.new(self.app.config['to'])
diff --git a/obnamlib/plugins/show_plugin.py b/obnamlib/plugins/show_plugin.py
index 3ed00895..d650a828 100644
--- a/obnamlib/plugins/show_plugin.py
+++ b/obnamlib/plugins/show_plugin.py
@@ -40,7 +40,8 @@ class ShowPlugin(obnamlib.ObnamPlugin):
self.app.config.require('store')
self.app.config.require('hostname')
fs = self.app.fsf.new(self.app.config['store'])
- self.store = obnamlib.Store(fs, self.app.config['node-size'])
+ self.store = obnamlib.Store(fs, self.app.config['node-size'],
+ self.app.config['node-size'])
self.store.open_host(self.app.config['hostname'])
def hosts(self, args):
diff --git a/obnamlib/plugins/verify_plugin.py b/obnamlib/plugins/verify_plugin.py
index c24b8bfd..7ae3f4a3 100644
--- a/obnamlib/plugins/verify_plugin.py
+++ b/obnamlib/plugins/verify_plugin.py
@@ -49,7 +49,8 @@ class VerifyPlugin(obnamlib.ObnamPlugin):
fs = self.app.fsf.new(self.app.config['store'])
fs.connect()
- self.store = obnamlib.Store(fs, self.app.config['node-size'])
+ self.store = obnamlib.Store(fs, self.app.config['node-size'],
+ self.app.config['node-size'])
self.store.open_host(self.app.config['hostname'])
self.fs = self.app.fsf.new(self.app.config['root'][0])
self.fs.connect()
diff --git a/obnamlib/store.py b/obnamlib/store.py
index dc93299e..903c0044 100644
--- a/obnamlib/store.py
+++ b/obnamlib/store.py
@@ -94,8 +94,9 @@ def decode_metadata(encoded):
class NodeStoreVfs(btree.NodeStoreDisk):
- def __init__(self, fs, dirname, node_size, codec):
- btree.NodeStoreDisk.__init__(self, dirname, node_size, codec)
+ def __init__(self, fs, dirname, node_size, codec, upload_queue_size):
+ btree.NodeStoreDisk.__init__(self, dirname, node_size, codec,
+ upload_max=upload_queue_size)
self.fs = fs
def mkdir(self, dirname):
@@ -125,11 +126,12 @@ class StoreTree(object):
'''A B-tree within a Store.'''
- def __init__(self, fs, dirname, key_bytes, node_size):
+ def __init__(self, fs, dirname, key_bytes, node_size, upload_queue_size):
self.fs = fs
self.dirname = dirname
self.key_bytes = key_bytes
self.node_size = node_size
+ self.upload_queue_size = upload_queue_size
self.forest = None
def init_forest(self):
@@ -137,7 +139,8 @@ class StoreTree(object):
if not self.fs.exists(self.dirname):
return False
codec = btree.NodeCodec(self.key_bytes)
- ns = NodeStoreVfs(self.fs, self.dirname, self.node_size, codec)
+ ns = NodeStoreVfs(self.fs, self.dirname, self.node_size, codec,
+ self.upload_queue_size)
self.forest = btree.Forest(ns)
return True
@@ -159,8 +162,9 @@ class HostList(StoreTree):
key_bytes = 8 # 64-bit counter as key
- def __init__(self, fs, node_size):
- StoreTree.__init__(self, fs, 'hostlist', self.key_bytes, node_size)
+ def __init__(self, fs, node_size, upload_queue_size):
+ StoreTree.__init__(self, fs, 'hostlist', self.key_bytes, node_size,
+ upload_queue_size)
self.minkey = self.key(0)
self.maxkey = self.key(2**64-1)
@@ -253,10 +257,11 @@ class GenerationStore(StoreTree):
FILE_NAME = 0
FILE_METADATA = 1
- def __init__(self, fs, hostname, node_size):
+ def __init__(self, fs, hostname, node_size, upload_queue_size):
key_bytes = len(self.key('', 0, 0))
# FIXME: We should handle evil hostnames.
- StoreTree.__init__(self, fs, hostname, key_bytes, node_size)
+ StoreTree.__init__(self, fs, hostname, key_bytes, node_size,
+ upload_queue_size)
self.curgen = None
def hash_name(self, filename):
@@ -473,10 +478,12 @@ class ChecksumTree(StoreTree):
'''
- def __init__(self, fs, name, checksum_length, node_size):
+ def __init__(self, fs, name, checksum_length, node_size,
+ upload_queue_size):
self.sumlen = checksum_length
key_bytes = len(self.key('', 0))
- StoreTree.__init__(self, fs, name, key_bytes, node_size)
+ StoreTree.__init__(self, fs, name, key_bytes, node_size,
+ upload_queue_size)
self.max_id = 2**64 - 1
def key(self, checksum, number):
@@ -522,9 +529,9 @@ class ChunkGroupTree(StoreTree):
# We store things using the chunk group id as tkey key. The ids of
# the chunks are stored as the value, as a blob, using struct.
- def __init__(self, fs, node_size):
+ def __init__(self, fs, node_size, upload_queue_size):
StoreTree.__init__(self, fs, 'chunkgroups',
- len(self.key(0)), node_size)
+ len(self.key(0)), node_size, upload_queue_size)
self.max_id = 2**64 - 1
def key(self, cgid):
@@ -617,11 +624,12 @@ class Store(object):
'''
- def __init__(self, fs, node_size):
+ def __init__(self, fs, node_size, upload_queue_size):
self.fs = fs
self.node_size = node_size
+ self.upload_queue_size = upload_queue_size
self.got_root_lock = False
- self.hostlist = HostList(fs, node_size)
+ self.hostlist = HostList(fs, node_size, upload_queue_size)
self.got_host_lock = False
self.host_lockfile = None
self.current_host = None
@@ -631,10 +639,10 @@ class Store(object):
self.removed_generations = []
self.genstore = None
self.chunksums = ChecksumTree(fs, 'chunksums', len(self.checksum('')),
- node_size)
+ node_size, upload_queue_size)
self.groupsums = ChecksumTree(fs, 'groupsums', len(self.checksum('')),
- node_size)
- self.chunkgroups = ChunkGroupTree(fs, node_size)
+ node_size, upload_queue_size)
+ self.chunkgroups = ChunkGroupTree(fs, node_size, upload_queue_size)
self.prev_chunkid = None
def checksum(self, data):
@@ -742,7 +750,8 @@ class Store(object):
self.current_host = hostname
self.added_generations = []
self.removed_generations = []
- self.genstore = GenerationStore(self.fs, hostname, self.node_size)
+ self.genstore = GenerationStore(self.fs, hostname, self.node_size,
+ self.upload_queue_size)
self.genstore.require_forest()
@require_host_lock
@@ -778,7 +787,8 @@ class Store(object):
if hostname not in self.list_hosts():
raise obnamlib.Error('%s is not an existing host' % hostname)
self.current_host = hostname
- self.genstore = GenerationStore(self.fs, hostname, self.node_size)
+ self.genstore = GenerationStore(self.fs, hostname, self.node_size,
+ self.upload_queue_size)
self.genstore.init_forest()
@require_open_host
diff --git a/obnamlib/store_tests.py b/obnamlib/store_tests.py
index 3c399b3a..38b8daef 100644
--- a/obnamlib/store_tests.py
+++ b/obnamlib/store_tests.py
@@ -31,7 +31,8 @@ class ChecksumTreeTests(unittest.TestCase):
fs = obnamlib.LocalFS(self.tempdir)
self.checksum = hashlib.md5('foo').digest()
self.tree = obnamlib.store.ChecksumTree(fs, 'x', len(self.checksum),
- obnamlib.DEFAULT_NODE_SIZE)
+ obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
def tearDown(self):
self.tree.commit()
@@ -69,7 +70,8 @@ class ChunkGroupTreeTests(unittest.TestCase):
self.tempdir = tempfile.mkdtemp()
fs = obnamlib.LocalFS(self.tempdir)
self.tree = obnamlib.store.ChunkGroupTree(fs,
- obnamlib.DEFAULT_NODE_SIZE)
+ obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
def tearDown(self):
shutil.rmtree(self.tempdir)
@@ -116,10 +118,12 @@ class StoreRootNodeTests(unittest.TestCase):
self.tempdir = tempfile.mkdtemp()
self.fs = obnamlib.LocalFS(self.tempdir)
- self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.otherfs = obnamlib.LocalFS(self.tempdir)
- self.other = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.other = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
def tearDown(self):
shutil.rmtree(self.tempdir)
@@ -183,7 +187,8 @@ class StoreRootNodeTests(unittest.TestCase):
self.store.lock_root()
self.store.add_host('foo')
self.store.commit_root()
- s2 = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ s2 = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.assertEqual(s2.list_hosts(), ['foo'])
def test_adding_existing_host_fails(self):
@@ -248,13 +253,15 @@ class StoreHostTests(unittest.TestCase):
self.tempdir = tempfile.mkdtemp()
self.fs = obnamlib.LocalFS(self.tempdir)
- self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.store.lock_root()
self.store.add_host('hostname')
self.store.commit_root()
self.otherfs = obnamlib.LocalFS(self.tempdir)
- self.other = obnamlib.Store(self.otherfs, obnamlib.DEFAULT_NODE_SIZE)
+ self.other = obnamlib.Store(self.otherfs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.dir_meta = obnamlib.Metadata()
self.dir_meta.st_mode = stat.S_IFDIR | 0777
@@ -502,7 +509,8 @@ class StoreChunkTests(unittest.TestCase):
self.tempdir = tempfile.mkdtemp()
self.fs = obnamlib.LocalFS(self.tempdir)
- self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.store.lock_root()
self.store.add_host('hostname')
self.store.commit_root()
@@ -561,7 +569,8 @@ class StoreChunkGroupTests(unittest.TestCase):
self.tempdir = tempfile.mkdtemp()
self.fs = obnamlib.LocalFS(self.tempdir)
- self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.store.lock_root()
self.store.add_host('hostname')
self.store.commit_root()
@@ -609,7 +618,8 @@ class StoreGetSetChunksAndGroupsTests(unittest.TestCase):
self.tempdir = tempfile.mkdtemp()
self.fs = obnamlib.LocalFS(self.tempdir)
- self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.store = obnamlib.Store(self.fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.store.lock_root()
self.store.add_host('hostname')
self.store.commit_root()
@@ -645,7 +655,8 @@ class StoreGenspecTests(unittest.TestCase):
storedir = os.path.join(self.tempdir, 'store')
fs = obnamlib.VfsFactory().new(storedir)
- self.store = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE)
+ self.store = obnamlib.Store(fs, obnamlib.DEFAULT_NODE_SIZE,
+ obnamlib.DEFAULT_UPLOAD_QUEUE_SIZE)
self.store.lock_host('hostname')
def tearDown(self):