1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# Copyright 2017 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=
import os
import stat
import cliapp
import vmdb
class PartitionPlugin(cliapp.Plugin):
def enable(self):
self.app.step_runners.add(MklabelStepRunner())
self.app.step_runners.add(MkpartStepRunner())
self.app.step_runners.add(KpartxStepRunner())
class MklabelStepRunner(vmdb.StepRunnerInterface):
def get_required_keys(self):
return ['mklabel', 'device']
def run(self, step, settings, state):
label_type = step['mklabel']
device = step['device']
vmdb.runcmd(['parted', '-s', device, 'mklabel', label_type])
class MkpartStepRunner(vmdb.StepRunnerInterface):
def get_required_keys(self):
return ['mkpart', 'device', 'start', 'end', 'tag']
def run(self, step, settings, state):
part_type = step['mkpart']
device = step['device']
start = step['start']
end = step['end']
tag = step.get('tag')
if tag is None:
tag = step['part-tag']
fs_type = step.get('fs-type', 'ext2')
orig = self.list_partitions(device)
vmdb.runcmd(['parted', '-s', device, 'mkpart', part_type, fs_type, start, end])
state.tags.append(tag)
if self.is_block_dev(device):
new = self.list_partitions(device)
diff = self.diff_partitions(orig, new)
assert len(diff) == 1
vmdb.progress('remembering partition {} as {}'.format(diff[0], tag))
state.tags.set_dev(tag, diff[0])
def is_block_dev(self, filename):
st = os.lstat(filename)
return stat.S_ISBLK(st.st_mode)
def list_partitions(self, device):
output = vmdb.runcmd(['parted', '-m', device, 'print'])
output = output.decode('UTF-8')
partitions = [
line.split(':')[0]
for line in output.splitlines()
if ':' in line
]
return [
word if word.startswith('/') else '{}{}'.format(device, word)
for word in partitions
]
def diff_partitions(self, old, new):
return [
line
for line in new
if line not in old
]
class KpartxStepRunner(vmdb.StepRunnerInterface):
def get_required_keys(self):
return ['kpartx']
def run(self, step, settings, state):
device = step['kpartx']
tags = state.tags.get_tags()
devs = self.kpartx(device)
for tag, dev in zip(tags, devs):
vmdb.progress('remembering {} as {}'.format(dev, tag))
state.tags.set_dev(tag, dev)
def kpartx(self, device):
output = vmdb.runcmd(['kpartx', '-asv', device]).decode('UTF-8')
for line in output.splitlines():
words = line.split()
if words[0] == 'add':
name = words[2]
yield '/dev/mapper/{}'.format(name)
def teardown(self, step, settings, state):
device = step['kpartx']
vmdb.runcmd(['kpartx', '-dsv', device])
|