Files
snapd/tests/lib/tools/tests.device-cgroup
2021-09-16 09:45:47 +02:00

227 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
A tool to convert device cgroup entries to a uniform v1-like format, which is:
<type> <major>:<minor> rwm
"""
import argparse
import ctypes
import json
import logging
import os
import os.path
import subprocess
import sys
def parse_arguments():
    """Parse the command line of the tool.

    The command line is made of an optional --verbose flag, a positional
    <snap>.<app> name and one of the sub-commands dump, allow or deny. The
    name of the selected sub-command is stored in the "func" attribute of the
    returned namespace. When no sub-command was given, the help text is
    printed and SystemExit(1) is raised.
    """
    parser = argparse.ArgumentParser(
        description="tool to control device cgroup settings"
    )
    parser.add_argument(
        "--verbose", help="verbose logging", action="store_true", default=False
    )
    parser.add_argument(
        "snap_app",
        help="snap and application in the format <snap>.<app>",
        metavar="snap.app",
    )
    commands = parser.add_subparsers()

    dump_cmd = commands.add_parser("dump", description="dump cgroup settings")
    dump_cmd.set_defaults(func="dump")

    # allow and deny take exactly the same arguments, only their name differs
    for name in ("allow", "deny"):
        change_cmd = commands.add_parser(
            name, description="{} device access".format(name)
        )
        change_cmd.add_argument(
            "kind", choices=("c", "b"), help="device type (c)har, (b)lock"
        )
        change_cmd.add_argument(
            "major_minor", metavar="major:minor", help="major:minor"
        )
        change_cmd.set_defaults(func=name)

    parsed = parser.parse_args()
    if "func" not in vars(parsed):
        # no sub-command was selected
        parser.print_help()
        raise SystemExit(1)
    return parsed
def is_cgroup_v2():
    """Tell whether /sys/fs/cgroup is reported as a cgroup v2 filesystem.

    The filesystem type printed by stat for /sys/fs/cgroup is compared with
    the cgroup2 superblock magic number.
    """
    # kept as a string, in the same "0x..." form that stat is asked to print
    cgroup2_magic = "0x63677270"
    stat_cmd = ["stat", "-f", "-c", "0x%t", "/sys/fs/cgroup"]
    fs_type = subprocess.check_output(stat_cmd).decode().strip()
    logging.debug("/sys/fs/cgroup FS type %s", fs_type)
    return cgroup2_magic == fs_type
def dump_v1(snap_app):
    """Print the device entries of the v1 device cgroup of snap_app.

    The entries are already in the expected format, so the content of
    devices.list of the snap.<snap_app> group is copied to stdout verbatim.
    """
    devices_list = os.path.join(
        "/sys/fs/cgroup/devices/", "snap.{}".format(snap_app), "devices.list"
    )
    with open(devices_list) as listing:
        content = listing.read()
        sys.stdout.write(content)
def device_change_v1(snap_app, allow, kind, major_minor):
    """Allow or deny a device in the v1 device cgroup of snap_app.

    An entry "<kind> <major_minor> rwm" is written to devices.allow when allow
    is true, or to devices.deny otherwise, of the snap.<snap_app> group.
    """
    control_file = "devices.deny"
    if allow:
        control_file = "devices.allow"
    control_path = os.path.join(
        "/sys/fs/cgroup/devices/", "snap.{}".format(snap_app), control_file
    )
    entry = "{} {} rwm".format(kind, major_minor)
    with open(control_path, "w+") as control:
        control.write(entry)
class DeviceCgroupV2Key(ctypes.Structure):
    """Key of an entry in the v2 device cgroup map.

    The structure is packed and 9 bytes long. Its fields must match
    sc_cgroup_v2_device_key defined in
    cmd/snap-confine/device-cgroup-support.c
    """

    # UINT32_MAX
    DEVICE_MINOR_ANY = (1 << 32) - 1

    # no padding between the fields, must be set before _fields_
    _pack_ = 1
    _fields_ = [
        # the raw type is already a correct v1-like char, 'b' or 'c'
        ("_typ", ctypes.c_char),
        ("major", ctypes.c_uint32),
        ("minor", ctypes.c_uint32),
    ]

    @property
    def typ(self):
        """Device type decoded to a str."""
        raw_type = self._typ
        return raw_type.decode()
def dump_v2(snap_app):
    """Print the device entries of the v2 device cgroup of snap_app.

    The entries live in a BPF map created by snap-confine. The map is dumped
    as JSON with bpftool and the key of each entry is printed in a format
    that looks like v1. Exits with status 1 when snap_app is not of the form
    <snap>.<app>, raises RuntimeError when an entry has no key or a key of an
    unexpected size.
    """
    try:
        snap, app = snap_app.split(".", maxsplit=1)
    except ValueError:
        print("error: malformed snap.app name", file=sys.stderr)
        raise SystemExit(1)

    pinned_map = "/sys/fs/bpf/snap/{}".format("snap_{}_{}".format(snap, app))
    bpftool_cmd = ["bpftool", "map", "dump", "pinned", pinned_map, "-j"]
    output = subprocess.check_output(bpftool_cmd)
    logging.debug("got bpftool output %s", output)

    key_size = ctypes.sizeof(DeviceCgroupV2Key)
    for entry in json.loads(output):
        raw_key = entry.get("key")
        if not raw_key:
            raise RuntimeError("unexpected object content in {}".format(output))
        if len(raw_key) != key_size:
            raise RuntimeError(
                "unexpected size of raw key {} (expected {})".format(
                    len(raw_key), key_size
                )
            )
        # each element of the key is a single byte written in hex
        key_bytes = bytearray(int(byte, 16) for byte in raw_key)
        key = DeviceCgroupV2Key.from_buffer(key_bytes)
        # v1 presents the minor number either as the actual value or as '*'
        # which stands for any minor number, present the v2 minor number in
        # the same way
        if key.minor == DeviceCgroupV2Key.DEVICE_MINOR_ANY:
            minor = "*"
        else:
            minor = key.minor
        print("{} {}:{} rwm".format(key.typ, key.major, minor))
def device_change_v2(snap_app, allow, kind, major_minor):
    """Allow or deny a given combination of device type and major:minor in a device
    cgroup v2 map.

    The map is the BPF map pinned as /sys/fs/bpf/snap/snap_<snap>_<app> and is
    modified with bpftool: the entry is updated with value 01 when allow is
    true, or deleted otherwise. A minor of '*' is stored as
    DeviceCgroupV2Key.DEVICE_MINOR_ANY.

    Exits with status 1 when snap_app is not of the form <snap>.<app> or when
    major_minor is not of the form <major>:<minor> with numeric parts (the
    minor may also be '*'). Raises subprocess.CalledProcessError when bpftool
    fails.
    """
    try:
        snap, app = snap_app.split(".", maxsplit=1)
    except ValueError:
        print("error: malformed snap.app name", file=sys.stderr)
        raise SystemExit(1)

    try:
        major, minor = major_minor.split(":", maxsplit=1)
        major_num = int(major)
        if minor == "*":
            minor_num = DeviceCgroupV2Key.DEVICE_MINOR_ANY
        else:
            minor_num = int(minor)
    except ValueError:
        # covers both a missing ':' and non-numeric major or minor
        print("error: malformed major:minor", file=sys.stderr)
        raise SystemExit(1)

    key = DeviceCgroupV2Key()
    key.major = major_num
    key.minor = minor_num
    key._typ = kind.encode()[0]

    pinned_map = "/sys/fs/bpf/snap/snap_{}_{}".format(snap, app)
    # allowing a device adds its key to the map, denying removes the key
    action = "update" if allow else "delete"
    cmd = ["bpftool", "map", action, "pinned", pinned_map, "key"]
    # the key is passed byte by byte, each one written in hex
    cmd.extend("0x%02x" % b for b in bytes(key))
    if allow:
        cmd.extend(["value", "01", "any"])

    logging.debug("running command %s", cmd)
    # check_call never reads from the child, so with a pipe bpftool could
    # block once the pipe buffer fills up; discard the output instead
    subprocess.check_call(
        cmd,
        stdout=subprocess.DEVNULL,
    )
def main(opts):
    """Run the sub-command selected on the command line.

    opts is the namespace returned by parse_arguments(). The v1 or v2 variant
    of the dump or allow/deny operation is picked depending on the result of
    is_cgroup_v2().
    """
    command = opts.func
    cgroup_v2 = is_cgroup_v2()
    logging.debug("cgroup v2? %s", cgroup_v2)

    if command == "dump":
        dump = dump_v2 if cgroup_v2 else dump_v1
        dump(opts.snap_app)
    elif command in ("allow", "deny"):
        change = device_change_v2 if cgroup_v2 else device_change_v1
        change(opts.snap_app, command == "allow", opts.kind, opts.major_minor)
if __name__ == "__main__":
    options = parse_arguments()
    # debug messages are logged only when --verbose was given
    lvl = logging.DEBUG if options.verbose else logging.INFO
    logging.basicConfig(level=lvl)
    main(options)