mirror of
https://github.com/token2/snapd.git
synced 2026-03-13 11:15:47 -07:00
227 lines
6.8 KiB
Python
Executable File
227 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
A tool to convert device cgroup entries to a uniform v1-like format, which is:
|
|
<type> <major>:<minor> rwm
|
|
"""
|
|
|
|
import argparse
|
|
import ctypes
|
|
import json
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import subprocess
|
|
import sys
|
|
|
|
|
|
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    The expected command line is:

        [--verbose] <snap>.<app> {dump,allow,deny} ...

    The name of the selected subcommand is stored in the 'func' attribute of
    the returned namespace. When no subcommand was given, the help text is
    printed and the process exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description="tool to control device cgroup settings"
    )
    cli.add_argument(
        "--verbose", action="store_true", default=False, help="verbose logging"
    )
    cli.add_argument(
        "snap_app",
        metavar="snap.app",
        help="snap and application in the format <snap>.<app>",
    )
    commands = cli.add_subparsers()

    dump_cmd = commands.add_parser("dump", description="dump cgroup settings")
    dump_cmd.set_defaults(func="dump")

    # allow and deny take exactly the same positional arguments, only the
    # command name and its description differ
    for name, description in (
        ("allow", "allow device access"),
        ("deny", "deny device access"),
    ):
        change_cmd = commands.add_parser(name, description=description)
        change_cmd.add_argument(
            "kind", choices=("c", "b"), help="device type (c)har, (b)lock"
        )
        change_cmd.add_argument(
            "major_minor", metavar="major:minor", help="major:minor"
        )
        change_cmd.set_defaults(func=name)

    parsed = cli.parse_args()
    if hasattr(parsed, "func"):
        return parsed
    # subcommands are optional for argparse, so a missing one is caught here
    cli.print_help()
    raise SystemExit(1)
|
|
|
|
|
|
def is_cgroup_v2():
    """Return True when /sys/fs/cgroup is mounted as a cgroup v2 filesystem.

    The filesystem type is obtained by running 'stat -f' on the mount point
    and comparing the reported hex value with the cgroup2 magic number.
    Raises subprocess.CalledProcessError when stat fails.
    """
    CGROUP2_SUPER_MAGIC = "0x63677270"
    # %t makes stat print the filesystem type in hex, without the 0x prefix
    stat_cmd = ["stat", "-f", "-c", "0x%t", "/sys/fs/cgroup"]
    reported = subprocess.check_output(stat_cmd).decode().strip()
    logging.debug("/sys/fs/cgroup FS type %s", reported)
    return reported == CGROUP2_SUPER_MAGIC
|
|
|
|
|
|
def dump_v1(snap_app):
    """Print the device entries of the v1 device cgroup of snap.<snap_app>.

    The content of the group's devices.list file is copied to stdout as is,
    since it already has the expected format.
    """
    devices_list = os.path.join(
        "/sys/fs/cgroup/devices/", "snap.{}".format(snap_app), "devices.list"
    )
    with open(devices_list) as listing:
        content = listing.read()
    sys.stdout.write(content)
|
|
|
|
|
|
def device_change_v1(snap_app, allow, kind, major_minor):
    """Allow or deny a device in the v1 device cgroup of snap.<snap_app>.

    A '<kind> <major_minor> rwm' entry is written to the group's
    devices.allow file when allow is true, and to devices.deny otherwise.
    """
    if allow:
        control_file = "devices.allow"
    else:
        control_file = "devices.deny"
    control_path = os.path.join(
        "/sys/fs/cgroup/devices/", "snap.{}".format(snap_app), control_file
    )
    entry = "{} {} rwm".format(kind, major_minor)
    with open(control_path, "w+") as control:
        control.write(entry)
|
|
|
|
|
|
class DeviceCgroupV2Key(ctypes.Structure):
    """Key of an entry in the v2 device cgroup map.

    The layout is packed: a 1 byte device type followed by two 32 bit
    unsigned integers, 9 bytes in total. The fields must match
    sc_cgroup_v2_device_key defined in
    cmd/snap-confine/device-cgroup-support.c
    """

    # no padding between the 1 byte type and the integers that follow
    _pack_ = 1
    _fields_ = [
        # device type, already a v1-like character: 'b' or 'c'
        ("_typ", ctypes.c_char),
        ("major", ctypes.c_uint32),
        ("minor", ctypes.c_uint32),
    ]

    # minor value standing for any minor number (UINT32_MAX)
    DEVICE_MINOR_ANY = 0xFFFFFFFF

    @property
    def typ(self):
        """The device type as a string rather than bytes."""
        return self._typ.decode("utf-8")
|
|
|
|
|
|
def dump_v2(snap_app):
    """Print the device entries of the v2 device cgroup of <snap>.<app>.

    The entries live in a BPF map pinned under /sys/fs/bpf/snap/. The map is
    dumped as JSON with bpftool and every key is printed in a v1-like
    '<type> <major>:<minor> rwm' format.

    Exits with status 1 when snap_app has no '.' separator and raises
    RuntimeError when an entry has no key or a key of unexpected size.
    """
    name_parts = snap_app.split(".", maxsplit=1)
    if len(name_parts) != 2:
        print("error: malformed snap.app name", file=sys.stderr)
        raise SystemExit(1)
    pin_path = "/sys/fs/bpf/snap/{}".format("snap_{}_{}".format(*name_parts))

    output = subprocess.check_output(
        ["bpftool", "map", "dump", "pinned", pin_path, "-j"]
    )
    logging.debug("got bpftool output %s", output)

    expected_size = ctypes.sizeof(DeviceCgroupV2Key)
    for entry in json.loads(output):
        raw_key = entry.get("key")
        if not raw_key:
            raise RuntimeError("unexpected object content in {}".format(output))
        if len(raw_key) != expected_size:
            raise RuntimeError(
                "unexpected size of raw key {} (expected {})".format(
                    len(raw_key), expected_size
                )
            )
        # each element of the key is parsed as a hex string, one per byte
        key_bytes = bytearray(int(value, 16) for value in raw_key)
        key = DeviceCgroupV2Key.from_buffer(key_bytes)
        # v1 shows either the actual minor number or '*' for any minor
        # number, present the v2 minor number the same way
        if key.minor == DeviceCgroupV2Key.DEVICE_MINOR_ANY:
            shown_minor = "*"
        else:
            shown_minor = key.minor
        line = "{typ} {major}:{minor} rwm".format(
            typ=key.typ, major=key.major, minor=shown_minor
        )
        print(line)
|
|
|
|
|
|
def device_change_v2(snap_app, allow, kind, major_minor):
    """Allow or deny a given combination of device type and major:minor in a
    device cgroup v2 map.

    The map pinned at /sys/fs/bpf/snap/snap_<snap>_<app> is modified with
    bpftool: allowing a device updates the map with the device key, denying
    a device deletes the key from the map.

    snap_app is the <snap>.<app> name, allow selects between allowing (true)
    and denying (false), kind is the device type 'c' or 'b' and major_minor
    is the device number as <major>:<minor>, where minor can be '*' to stand
    for any minor number.

    Exits with status 1 when snap_app or major_minor is malformed, which
    includes numbers that are not integers or do not fit the 32 bit unsigned
    fields of the key. Raises subprocess.CalledProcessError when bpftool
    fails.
    """
    try:
        snap, app = snap_app.split(".", maxsplit=1)
    except ValueError:
        print("error: malformed snap.app name", file=sys.stderr)
        raise SystemExit(1)

    # validate the device number before touching anything, the key fields
    # are 32 bit unsigned and ctypes would silently truncate other values
    uint32_max = 0xFFFFFFFF
    try:
        major_str, minor_str = major_minor.split(":", maxsplit=1)
        any_minor = minor_str == "*"
        major = int(major_str)
        minor = 0 if any_minor else int(minor_str)
        if not (0 <= major <= uint32_max and 0 <= minor <= uint32_max):
            raise ValueError("device number out of range")
    except ValueError:
        print("error: malformed major:minor", file=sys.stderr)
        raise SystemExit(1)

    key = DeviceCgroupV2Key()
    key._typ = kind.encode()[0]
    key.major = major
    key.minor = DeviceCgroupV2Key.DEVICE_MINOR_ANY if any_minor else minor
    # bpftool takes the key as a sequence of bytes, one argument per byte
    key_arg = ["0x%02x" % b for b in bytes(key)]

    map_pin_name = "snap_{}_{}".format(snap, app)
    cmd = [
        "bpftool",
        "map",
        "update" if allow else "delete",
        "pinned",
        "/sys/fs/bpf/snap/{}".format(map_pin_name),
        "key",
    ]
    cmd.extend(key_arg)
    if allow:
        # only an update carries a value
        cmd.extend(["value", "01", "any"])
    logging.debug("running command %s", cmd)
    # the output is of no interest, discard it rather than use a pipe that
    # nothing reads from, as the child could block once the pipe fills up
    subprocess.check_call(cmd, stdout=subprocess.DEVNULL)
|
|
|
|
|
|
def main(opts):
    """Run the subcommand selected on the command line.

    opts is the namespace returned by parse_arguments(). The cgroup version
    is detected first and then the v1 or v2 variant of the dump or device
    change operation is called. Any other value of opts.func is ignored.
    """
    func = opts.func
    is_v2 = is_cgroup_v2()
    logging.debug("cgroup v2? %s", is_v2)

    if func == "dump":
        dump = dump_v2 if is_v2 else dump_v1
        dump(opts.snap_app)
        return

    if func in ("allow", "deny"):
        change = device_change_v2 if is_v2 else device_change_v1
        # both operations share one function, told apart by a boolean
        change(opts.snap_app, func == "allow", opts.kind, opts.major_minor)
|
|
|
|
|
|
if __name__ == "__main__":
    options = parse_arguments()
    # debug messages are shown only when --verbose was passed
    lvl = logging.DEBUG if options.verbose else logging.INFO
    logging.basicConfig(level=lvl)
    main(options)
|