btv/btv

628 lines
14 KiB
Python
Executable file

#! /usr/bin/env python3
# encoding: utf-8
import configparser
from contextlib import contextmanager
from dataclasses import dataclass
import datetime
import json
import os
import shlex
import sys
import time
import socket
import urllib.request
CONFIG = "/etc/btv/config.ini"
LOCKFILE = "/run/lock/btv/serialization.lock"
# ------------------------------------------------------------------------------
# Global
cfg = configparser.ConfigParser()
cfg.read(CONFIG)
class UsageError(Exception):
pass
# ------------------------------------------------------------------------------
# Aux
@contextmanager
def chdir(new_dir):
previous_dir = os.getcwd()
os.chdir(new_dir)
try:
yield
finally:
os.chdir(previous_dir)
@contextmanager
def lockfile(path):
d = os.path.dirname(path)
if not os.path.exists(d):
os.makedirs(d)
f = open(path, "w")
try:
yield
finally:
os.remove(path)
# ------------------------------------------------------------------------------
# Functions
def verify_keyfile():
"""
Throws a RuntimeError if the keyfile does not exist, is empty, or has the
wrong set of permissions.
"""
f = cfg.get("crypto", "keyfile")
if not os.path.exists(f):
raise RuntimeError("keyfile does not exist")
s = os.stat(f)
if s.st_size < 5:
raise RuntimeError("password in the keyfile is too short")
if s.st_uid != 0:
raise RuntimeError("keyfile is not owned by root")
if s.st_mode & 0b100111111 != 256:
raise RuntimeError("keyfile is readable by others")
def verify_user():
"""
Throws a RuntimeError if the user running the program is not root.
"""
if os.getuid() != 0:
raise RuntimeError("this can only be used by root")
@dataclass
class Snapshot:
name: str
rank: int
def __post_init__(self):
self.path = os.path.join(cfg.get("snap", "dir"), self.name)
self.subvolumes = set()
self.notes = set()
@classmethod
def load(cls, path):
with open(os.path.join(path, ".meta")) as f:
meta = json.load(f)
snap = cls(meta["name"], meta["rank"])
snap.subvolumes.update(meta["subvolumes"])
snap.notes.update(meta["notes"])
snap.path = path
snap.check()
return snap
def dump(self):
if not os.path.exists(self.path):
os.makedirs(self.path)
with open(os.path.join(self.path, ".meta"), "w") as f:
meta = {
"name": self.name,
"rank": self.rank,
"subvolumes": sorted(self.subvolumes),
"notes": sorted(self.notes)
}
json.dump(meta, f)
def check(self):
## verify all declared subvolumes exist
contents = set(os.listdir(self.path))
if self.subvolumes - contents:
self.notes.add("missing subvolumes")
def drop(self):
for subvolume in self.subvolumes:
btrfs_snap = os.path.join(self.path, subvolume)
if not os.path.exists(btrfs_snap):
print(" !! subvolume not found: %s" %(btrfs_snap))
return 1
error = os.system(shlex.join(("btrfs", "subvolume", "delete", btrfs_snap)))
if error:
print(" !! error %d" %(error))
return error
os.remove(os.path.join(self.path, ".meta"))
os.rmdir(self.path)
return 0
def get_snapshot_by_name(name):
"""
Returns a Snapshot with the requested name iff such exists. Otherwise,
returns None.
"""
rootdir = cfg.get("snap", "dir")
try:
return Snapshot.load(os.path.join(rootdir, name))
except FileNotFoundError:
pass
def list_snapshots(min_rank=0):
"""
Returns a list of all existing Snapshots sorted oldest to newest.
"""
snaps = []
rootdir = cfg.get("snap", "dir")
for name in os.listdir(rootdir):
s = get_snapshot_by_name(name)
if s and s.rank >= min_rank:
snaps.append(s)
return snaps
def delete_snapshot(snap):
"""
Drops a snapshot referenced by the object.
"""
def serialize(snap, outdir, key, snap_from=None):
"""
Serializes a snapshot (differentially if snap_from is passed, fully
otherwise) into a new directory under outdir.
Key is the path to an openssl-style passphrase file.
Snap and snap_from are Snapshot objects.
A lockfile is kept for the duration of the process.
Returns 0 on success.
"""
## prepare directories
##
if snap_from:
name = "%s diff from %s" %(snap.name, snap_from.name)
else:
name = "%s full" %(snap.name)
directory = os.path.join(outdir, name)
os.makedirs(directory)
## preflight checks
##
if snap_from:
# I don't want to handle (dis)appearing subvolumes
if not snap.subvolumes == snap_from.subvolumes:
print(" !! snapshots have different subvolumes; please do a full stream")
print(" %s: %s" %(snap.name, " ".join(snap.subvolumes)))
print(" %s: %s" %(snap_from.name, " ".join(snap_from.subvolumes)))
return 1
## serialization (most expensive)
##
with lockfile(LOCKFILE):
for subvolume in snap.subvolumes:
if snap_from:
btrfs_send = 'btrfs send -p "%s" "%s"' %(
os.path.join(snap_from.path, subvolume),
os.path.join(snap.path, subvolume)
)
else:
btrfs_send = 'btrfs send "%s"' %(
os.path.join(snap.path, subvolume)
)
error = os.system('%s | zstd | openssl enc -e -aes-256-cbc -pbkdf2 -salt -pass "file:%s" | hash-pipe sha512 "%s.btrfs.zst.aes" "%s/manifest.sha512" > "%s.btrfs.zst.aes"' %(
btrfs_send,
cfg.get("crypto", "keyfile"),
subvolume,
directory,
os.path.join(directory, subvolume)
))
if error:
print(" !! failed to serialize %s" %(subvolume))
return error
## final touches
##
## add a self-check executable
with open(os.path.join(directory, "check-integrity.sh"), "w") as f:
f.write("#! /bin/sh\n\nsha512sum --check manifest.sha512\n")
os.chmod(f.name, 0o555)
## fix permissions and ownership of created objects
outdir_stat = os.stat(outdir)
os.chown(directory, outdir_stat.st_uid, outdir_stat.st_gid)
for file in os.listdir(directory):
path = os.path.join(directory, file)
os.chown(path, outdir_stat.st_uid, outdir_stat.st_gid)
if path.endswith(".aes") or path.endswith(".sha512"):
os.chmod(path, 0o100)
return 0
def ping(url):
try:
urllib.request.urlopen(url, timeout=10)
except socket.error as e:
print("Ping failed: %s" %(e))
# ------------------------------------------------------------------------------
# Verbs
def do_create(args):
"""
Bulk of the work happens here.
- makes snapshots
- calls serialization if needed
- calls gc if needed
"""
snapshot = Snapshot(
datetime.datetime.now().strftime("%Y-%m-%d.%H%M%S"),
0
)
## determine the rank of the new snapshot
##
if "--rank2" in args:
snapshot.rank = 2
else:
snaps_since_rank_1 = 1
snaps_since_rank_2 = 1
for snap in list_snapshots():
if snap.rank == 1:
snaps_since_rank_1 = 1
else:
snaps_since_rank_1 += 1
if snap.rank == 2:
snaps_since_rank_1 = 1
snaps_since_rank_2 = 1
else:
snaps_since_rank_2 += 1
## promote the snapshot
if snaps_since_rank_2 >= cfg.getint("snap", "rank_2_interval"):
if "--process" in args:
snapshot.rank = 2
else:
print("!!! Rank 2 snapshot is due, please enable --process")
snapshot.rank = 1
elif snaps_since_rank_1 >= cfg.getint("snap", "rank_1_interval"):
snapshot.rank = 1
ping_url = cfg.get("monitoring", "rank2_start_url")
if snapshot.rank == 2 and ping_url:
ping(ping_url)
## create the snapshot itself
##
os.makedirs(snapshot.path)
ignore_prefix = cfg.get("subvol", "ignore_prefix")
with chdir(cfg.get("subvol", "dir")):
for subvolume in os.listdir():
if subvolume.startswith(ignore_prefix):
continue
os.system(shlex.join((
"btrfs", "subvolume", "snapshot", "-r",
subvolume,
os.path.join(snapshot.path, subvolume)
)))
snapshot.subvolumes.add(subvolume)
## do optional processing
##
if snapshot.rank == 2:
## snapshot serialization
# if there's a previous Rank 2 snapshot, compute a diff against it
# if not or if the process fails, demote this snap to Rank 1
prev_r2_snaps = list_snapshots(2)
if prev_r2_snaps:
prev_r2_snap = prev_r2_snaps[-1]
print(">>> doing a diff against snapshot %s" %(prev_r2_snap.name))
error = serialize(
snapshot,
cfg.get("storage", "dir"),
cfg.get("crypto", "keyfile"),
prev_r2_snap
)
if error:
print("!!! serialization failed, demoting snapshot to Rank 1")
snapshot.rank = 1
else:
print("!!! no previous Rank 2 snapshot, please create one using btv stream")
snapshot.rank = 1
## save all snapshot metadata
snapshot.dump()
print(">>> Snapshot created: rank %d %s" %(snapshot.rank, snapshot.name))
## garbage collection
do_gc()
ping_url = cfg.get("monitoring", "rank2_end_url")
if snapshot.rank == 2 and ping_url:
ping(ping_url)
def do_list(args):
"""
Print a list of existing snapshots.
"""
snaps = list_snapshots()
counts = [
len([s for s in snaps if s.rank == 0]),
len([s for s in snaps if s.rank == 1]),
len([s for s in snaps if s.rank == 2])
]
print(" > %d/%d Rank 0 snapshots" %(counts[0], cfg.getint("gc", "rank_0_count")))
print(" > %d/%d Rank 1 snapshots" %(counts[1], cfg.getint("gc", "rank_1_count")))
print(" > %d/%d Rank 2 snapshots" %(counts[2], cfg.getint("gc", "rank_2_count")))
print()
print("Rank Name")
for snap in snaps:
print(" %d %s %s" %(snap.rank, snap.name, ", ".join(snap.notes)))
def do_drop(args):
"""
Drop a snapshot in args[0].
"""
snap = get_snapshot_by_name(args[0])
if not snap:
print("!!! %s is not a snapshot" %(args[0]))
sys.exit(2)
error = snap.drop()
if error:
print("!!! failed to drop snapshot %s" %(snap.name))
def do_stream(args):
"""
args are either
"diff" SNAPSHOT_FROM SNAPSHOT_TO OUTPUT_DIR
or
"full" SNAPSHOT OUTPUT_DIR
SNAPSHOT_FROM must be Rank 2.
Streams the full or diff snapshot into OUTPUT_DIR.
The SNAPSHOT or SNAPSHOT_TO is then promoted to Rank 2.
"""
## args interpretation and validation
##
if not args:
raise UsageError("no args")
if args[0] == "diff":
if len(args) != 4:
raise UsageError("'stream diff' requires exactly 3 arguments")
snap_from_name, snap_name, output_dir = args[1:]
snap_from = get_snapshot_by_name(snap_from_name)
if not snap_from:
print("!!! %s is not a snapshot" %(snap_from_name))
sys.exit(2)
if snap_from.rank != 2:
print("!!! source snapshot must be Rank 2, %s is %d" %(snap_from.name, snap_from.rank))
print(' > Hint: btv stream full %s "%s"' %(snap_from.name, output_dir))
sys.exit(3)
elif args[0] == "full":
if len(args) != 3:
raise UsageError("'stream full' requires exactly 2 arguments")
snap_name, output_dir = args[1:]
snap_from = None
else:
raise UsageError("'stream' type is either 'full' or 'diff'")
snap = get_snapshot_by_name(snap_name)
if not snap:
print("!!! %s is not a snapshot" %(snap_name))
sys.exit(2)
if not os.path.isdir(output_dir):
print("!!! %s is not a directory" %(output_dir))
sys.exit(2)
if snap_from and snap_from.name >= snap.name:
print("!!! source snapshot is younger than target snapshot")
sys.exit(3)
## serialization work
##
if snap_from:
comment = "diff %s -> %s" %(snap_from.name, snap.name)
else:
comment = "full %s" %(snap.name)
print(">>> Serializing %s into %s" %(comment, output_dir))
error = serialize(
snap,
output_dir,
cfg.get("crypto", "keyfile"),
snap_from
)
if error:
print("!!! serialization failed")
else:
snap.rank = 2
snap.notes.add("manually streamed")
if not snap_from:
snap.notes.add("fully streamed")
snap.dump()
print("<<< %s serialized and promoted to Rank 2" %(snap.name))
def do_gc(args=None):
"""
Drops old snapshots.
If the only arg is "greedy", drops ALL snapshots except the youngest
Rank 2. If it's a number, drops that many oldest snapshots.
"""
if args:
if args[0] == "greedy":
newest = list_snapshots(2)[-1]
if newest:
print(">>> Dropping all snapshots except the newest Rank 2 in 5 s...")
time.sleep(5)
for snap in list_snapshots():
if snap.name != newest.name:
snap.drop()
else:
print("!!! no Rank 2 snapshot exists")
sys.exit(1)
else:
try:
count = int(args[0])
except ValueError:
print("!!! %s is not a count of snapshots to delete" %(args[0]))
sys.exit(1)
snaps = list_snapshots()[:count]
for snap in snaps:
print(" %d %s %s" %(snap.rank, snap.name, ", ".join(snap.notes)))
print(">>> These snapshots will be dropped in 5 s...")
time.sleep(5)
for snap in snaps:
snap.drop()
else:
counts = [0, 0, 0] # Rank 0, 1, 2
limits = [
cfg.getint("gc", "rank_0_count"),
cfg.getint("gc", "rank_1_count"),
cfg.getint("gc", "rank_2_count")
]
for snap in reversed(list_snapshots()):
counts[snap.rank] += 1
if counts[snap.rank] > limits[snap.rank]:
print(" >> delete Rank %d %s" %(snap.rank, snap.name))
snap.drop()
verb_router = {
"snapshot": do_create,
"list": do_list,
"drop": do_drop,
"gc": do_gc,
"stream": do_stream
}
# ------------------------------------------------------------------------------
# Entry point
def print_help():
print("""Usage:
{cmd} VERB [args]
Verbs:
snapshot [--process] [--rank2]
Create a snapshot. If --process is passed, do all optional work. If
--rank2 is passed, the new snapshot is automatically promoted to Rank 2
and --process is implied.
stream full SNAPSHOT OUTPUT_DIR
Streams the SNAPSHOT into OUTPUT_DIR as a full (milestone) bin.
stream diff SNAPSHOT_FROM SNAPSHOT_TO OUTPUT_DIR
Streams the difference between SNAPSHOTs into OUTPUT_DIR as a diff bin.
list
List snapshots and their rank.
drop SNAPSHOT
Drops (removes) SNAPSHOT. Use the full name as provided by {cmd} list.
gc
Drops old local snapshots based on garbage collector settings.
gc greedy
Drop ALL snapshots except the newest Rank 2.
gc COUNT
Drop COUNT oldest snapshots.
""".format(cmd=sys.argv[0]))
if __name__ == "__main__":
try:
verb = sys.argv[1].lower()
assert verb in verb_router
except:
print_help()
sys.exit(1)
verify_user()
verify_keyfile()
try:
verb_router[verb](sys.argv[2:])
except UsageError as e:
print("!!! %s" %(e))
print()
print_help()
sys.exit(1)