mirror of
https://github.com/token2/snapd.git
synced 2026-03-13 11:15:47 -07:00
Python 3 should be available everywhere at this point. Try dropping the wrapper. Signed-off-by: Maciej Borzecki <maciej.zenon.borzecki@canonical.com>
1326 lines
45 KiB
Python
Executable File
1326 lines
45 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from __future__ import print_function, absolute_import, unicode_literals
|
|
|
|
from argparse import Action, ArgumentParser, FileType, RawTextHelpFormatter, SUPPRESS
|
|
import re
|
|
import sys
|
|
import unittest
|
|
|
|
# PY2 is true when we're running under Python 2.x It is used for appropriate
|
|
# return value selection of __str__ and __repr_ methods, which must both
|
|
# return str, not unicode (in Python 2) and str (in Python 3). In both cases
|
|
# the return type annotation is exactly the same, but due to unicode_literals
|
|
# being in effect, and the fact we often use a format string (which is an
|
|
# unicode string in Python 2), we must encode the it to byte string when
|
|
# running under Python 2.
|
|
PY2 = sys.version_info[0] == 2
|
|
|
|
# Define MYPY as False and use it as a conditional for typing import. Despite
|
|
# this declaration mypy will really treat MYPY as True when type-checking.
|
|
# This is required so that we can import typing on Python 2.x without the
|
|
# typing module installed. For more details see:
|
|
# https://mypy.readthedocs.io/en/latest/common_issues.html#import-cycles
|
|
MYPY = False
|
|
if MYPY:
|
|
from typing import Any, Dict, List, Text, Tuple, Match, Optional, Union, Sequence
|
|
from argparse import Namespace
|
|
|
|
|
|
class sint(int):
|
|
"""sint is an integer that always renders with sign."""
|
|
|
|
def __str__(self):
|
|
# type: () -> str
|
|
result = "{:+}".format(int(self))
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
def __repr__(self):
|
|
# type: () -> str
|
|
result = "sint({:+})".format(int(self))
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
|
|
class SintTests(unittest.TestCase):
|
|
def test_smoke(self):
|
|
# type: () -> None
|
|
self.assertEqual(sint(1), 1)
|
|
self.assertEqual(sint(-1), -1)
|
|
self.assertEqual(hash(sint(1)), hash(1))
|
|
|
|
def test_str(self):
|
|
# type: () -> None
|
|
self.assertEqual("{}".format(sint(1)), "+1")
|
|
self.assertEqual("{}".format(sint(-1)), "-1")
|
|
|
|
def test_repor(self):
|
|
# type: () -> None
|
|
self.assertEqual("{!r}".format(sint(1)), "sint(+1)")
|
|
self.assertEqual("{!r}".format(sint(-1)), "sint(-1)")
|
|
|
|
|
|
class Device(object):
|
|
"""Device is a device number with major and minor components."""
|
|
|
|
def __init__(self, major, minor):
|
|
# type: (int, int) -> None
|
|
self.major = major
|
|
self.minor = minor
|
|
|
|
def __hash__(self):
|
|
# type: () -> int
|
|
return hash((self.major, self.minor))
|
|
|
|
def __eq__(self, other):
|
|
# type: (object) -> Union[NotImplemented, bool]
|
|
if not isinstance(other, Device):
|
|
return NotImplemented
|
|
return self.major == other.major and self.minor == other.minor
|
|
|
|
def __str__(self):
|
|
# type: () -> str
|
|
result = "{}:{}".format(self.major, self.minor)
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
def __repr__(self):
|
|
# type: () -> str
|
|
result = "Device({}, {})".format(self.major, self.minor)
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
@classmethod
|
|
def differentiate(cls, a, b):
|
|
# type: (Device, Device) -> Device
|
|
return cls(sint(a.major - b.major), sint(a.minor - b.minor))
|
|
|
|
|
|
def _format_opt_fields(fields):
|
|
# type: (List[Text]) -> str
|
|
"""_format_opt_fields returns the special formatting of optional fields."""
|
|
if len(fields):
|
|
result = " ".join(fields) + " -"
|
|
else:
|
|
result = "-"
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
|
|
def _diff_opt_fields(a, b):
|
|
# type: (List[Text], List[Text]) -> List[Text]
|
|
if len(a) != len(b):
|
|
return a
|
|
diff = [] # type: List[Text]
|
|
for elem_a, elem_b in zip(a, b):
|
|
key_a, value_a = elem_a.split(":", 1)
|
|
key_b, value_b = elem_b.split(":", 1)
|
|
if key_a != key_b:
|
|
return a
|
|
try:
|
|
value_a_int = int(value_a)
|
|
value_b_int = int(value_b)
|
|
except ValueError:
|
|
return a
|
|
diff.append("{}:{}".format(key_a, sint(value_a_int - value_b_int)))
|
|
return diff
|
|
|
|
|
|
class DifferntiateOptionalFieldsTest(unittest.TestCase):
|
|
def test_ok(self):
|
|
# type: () -> None
|
|
a = OptionalFields(["master:6"])
|
|
b = OptionalFields(["master:5"])
|
|
aDelta = OptionalFields.differentiate(a, b)
|
|
self.assertEqual(aDelta, OptionalFields(["master:+1"]))
|
|
|
|
def test_ok_many(self):
|
|
# type: () -> None
|
|
a = OptionalFields(["master:6", "shared:10"])
|
|
b = OptionalFields(["master:5", "shared:7"])
|
|
aDelta = OptionalFields.differentiate(a, b)
|
|
self.assertEqual(aDelta, OptionalFields(["master:+1", "shared:+3"]))
|
|
|
|
def test_different_lengths(self):
|
|
# type: () -> None
|
|
a = OptionalFields(["master:6", "shared:2"])
|
|
b = OptionalFields(["shared:7"])
|
|
aDelta = OptionalFields.differentiate(a, b)
|
|
self.assertEqual(aDelta, OptionalFields(["master:6", "shared:2"]))
|
|
|
|
def test_different_keys(self):
|
|
# type: () -> None
|
|
a = OptionalFields(["master:6"])
|
|
b = OptionalFields(["shared:7"])
|
|
aDelta = OptionalFields.differentiate(a, b)
|
|
self.assertEqual(aDelta, OptionalFields(["master:6"]))
|
|
|
|
def test_non_numeric_value(self):
|
|
# type: () -> None
|
|
a = OptionalFields(["master:9"])
|
|
b = OptionalFields(["master:nan"])
|
|
aDelta = OptionalFields.differentiate(a, b)
|
|
self.assertEqual(aDelta, OptionalFields(["master:9"]))
|
|
|
|
|
|
# OptionalFields is a specialization of List[Text] but because of some Python 2
|
|
# distributions lacking the typing module, we must define a variant for mypy
|
|
# and one for without mypy. In both cases the customized formatting logic is
|
|
# implemented by _format_opt_fields().
|
|
if MYPY:
|
|
|
|
class OptionalFields(List[Text]):
|
|
def __str__(self):
|
|
# type: () -> str
|
|
return _format_opt_fields(self)
|
|
|
|
@classmethod
|
|
def differentiate(cls, a, b):
|
|
# type: (OptionalFields, OptionalFields) -> OptionalFields
|
|
return cls(_diff_opt_fields(a, b))
|
|
|
|
|
|
else:
|
|
|
|
class OptionalFields(list):
|
|
def __str__(self):
|
|
# type: () -> str
|
|
return _format_opt_fields(self)
|
|
|
|
@classmethod
|
|
def differentiate(cls, a, b):
|
|
# type: (OptionalFields, OptionalFields) -> OptionalFields
|
|
return cls(_diff_opt_fields(a, b))
|
|
|
|
|
|
class OptionalFieldsTests(unittest.TestCase):
|
|
def test_str(self):
|
|
# type: () -> None
|
|
fields = OptionalFields()
|
|
self.assertEqual("{}".format(fields), "-")
|
|
fields.append("master:123")
|
|
self.assertEqual("{}".format(fields), "master:123 -")
|
|
|
|
|
|
class MountInfoEntry(object):
|
|
"""Single entry in /proc/pid/mointinfo, see proc(5)"""
|
|
|
|
known_attrs = {
|
|
"mount_id": int,
|
|
"parent_id": int,
|
|
"dev": Device,
|
|
"root_dir": str,
|
|
"mount_point": str,
|
|
"mount_opts": str,
|
|
"opt_fields": list,
|
|
"fs_type": str,
|
|
"mount_source": str,
|
|
"sb_opts": str,
|
|
}
|
|
|
|
def __init__(self):
|
|
# type: () -> None
|
|
self.mount_id = 0
|
|
self.parent_id = 0
|
|
self.dev = Device(0, 0)
|
|
self.root_dir = ""
|
|
self.mount_point = ""
|
|
self.mount_opts = ""
|
|
self.opt_fields = OptionalFields() # type: OptionalFields
|
|
self.fs_type = ""
|
|
self.mount_source = ""
|
|
self.sb_opts = ""
|
|
# Attributes beyond this line do not represent kernel state.
|
|
# Marker for an entry being matched by a filter.
|
|
self.matched = False
|
|
|
|
@classmethod
|
|
def differentiate(cls, a, b):
|
|
# type: (MountInfoEntry, MountInfoEntry) -> MountInfoEntry
|
|
diff = MountInfoEntry()
|
|
diff.matched = a.matched
|
|
# Diff all the attributes that it makes sense to diff
|
|
diff.mount_id = sint(a.mount_id - b.mount_id)
|
|
diff.parent_id = sint(a.parent_id - b.parent_id)
|
|
diff.dev = Device.differentiate(a.dev, b.dev)
|
|
diff.root_dir = a.root_dir
|
|
diff.mount_point = a.mount_point
|
|
diff.mount_opts = a.mount_opts
|
|
diff.opt_fields = OptionalFields.differentiate(a.opt_fields, b.opt_fields)
|
|
diff.fs_type = a.fs_type
|
|
diff.mount_source = a.mount_source
|
|
diff.sb_opts = a.sb_opts
|
|
return diff
|
|
|
|
def __eq__(self, other):
|
|
# type: (object) -> Union[NotImplemented, bool]
|
|
if not isinstance(other, MountInfoEntry):
|
|
return NotImplemented
|
|
return (
|
|
self.mount_id == other.mount_id
|
|
and self.parent_id == other.parent_id
|
|
and self.dev == other.dev
|
|
and self.root_dir == other.root_dir
|
|
and self.mount_point == other.mount_point
|
|
and self.mount_opts == other.mount_opts
|
|
and self.opt_fields == other.opt_fields
|
|
and self.fs_type == other.fs_type
|
|
and self.mount_source == other.mount_source
|
|
and self.sb_opts == other.sb_opts
|
|
)
|
|
|
|
@classmethod
|
|
def parse(cls, line):
|
|
# type: (Text) -> MountInfoEntry
|
|
it = iter(line.split())
|
|
self = cls()
|
|
self.mount_id = int(next(it))
|
|
self.parent_id = int(next(it))
|
|
dev_maj, dev_min = map(int, next(it).split(":"))
|
|
self.dev = Device(dev_maj, dev_min)
|
|
self.root_dir = next(it)
|
|
self.mount_point = next(it)
|
|
self.mount_opts = next(it)
|
|
self.opt_fields = OptionalFields()
|
|
for opt_field in it:
|
|
if opt_field == "-":
|
|
break
|
|
self.opt_fields.append(opt_field)
|
|
self.fs_type = next(it)
|
|
self.mount_source = next(it)
|
|
self.sb_opts = next(it)
|
|
try:
|
|
next(it)
|
|
except StopIteration:
|
|
pass
|
|
else:
|
|
raise ValueError("leftovers after parsing {!r}".format(line))
|
|
return self
|
|
|
|
def __str__(self):
|
|
# type: () -> str
|
|
result = (
|
|
"{0.mount_id} {0.parent_id} {0.dev} {0.root_dir}"
|
|
" {0.mount_point} {0.mount_opts} {0.opt_fields} {0.fs_type}"
|
|
" {0.mount_source} {0.sb_opts}"
|
|
).format(self)
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
def __repr__(self):
|
|
# type: () -> str
|
|
result = "MountInfoEntry.parse({!r})".format(str(self))
|
|
if PY2:
|
|
return result.encode()
|
|
return result
|
|
|
|
@property
|
|
def dev_maj(self):
|
|
# type: () -> int
|
|
return self.dev.major
|
|
|
|
@property
|
|
def dev_min(self):
|
|
# type: () -> int
|
|
return self.dev.minor
|
|
|
|
|
|
class FilterExpr(object):
|
|
"""FilterExpr is the interface for filtering mount entries."""
|
|
|
|
def __contains__(self, entry):
|
|
# type: (MountInfoEntry) -> bool
|
|
"""__contains__ returns true if a mount entry matches the filter."""
|
|
|
|
|
|
class AttrFilter(FilterExpr):
|
|
"""AttrFilter performs equality test against a given attribute."""
|
|
|
|
def __init__(self, attr, value):
|
|
# type: (Text, Any) -> None
|
|
self.attr = attr
|
|
self.value = value
|
|
|
|
def __contains__(self, entry):
|
|
# type: (MountInfoEntry) -> bool
|
|
value = getattr(entry, self.attr)
|
|
return bool(value == self.value)
|
|
|
|
|
|
class AttrPrefixFilter(FilterExpr):
|
|
"""AttrPrefixFilter performs prefix test against a given attribute."""
|
|
|
|
def __init__(self, attr, value):
|
|
# type: (Text, Text) -> None
|
|
self.attr = attr
|
|
self.value = value
|
|
|
|
def __contains__(self, entry):
|
|
# type: (MountInfoEntry) -> bool
|
|
value = str(getattr(entry, self.attr))
|
|
return value.startswith(self.value)
|
|
|
|
|
|
def parse_filter(expr):
|
|
# type: (Text) -> FilterExpr
|
|
"""parse_filter parses one of the known filter expressions."""
|
|
if "=" in expr:
|
|
# Accept both .attr=value and attr=value as exact attribute match.
|
|
if expr.startswith("."):
|
|
expr = expr.lstrip(".")
|
|
attr, value = expr.split("=", 1)
|
|
try:
|
|
typ = MountInfoEntry.known_attrs[attr]
|
|
except KeyError:
|
|
raise ValueError("invalid filter expression {!r}".format(expr))
|
|
else:
|
|
return AttrFilter(attr, typ(value))
|
|
elif expr.endswith("..."):
|
|
# Treat /path/... as prefix match on mount_point.
|
|
return AttrPrefixFilter("mount_point", expr.rstrip("..."))
|
|
else:
|
|
# Treat /path as exact match on mount_point.
|
|
return AttrFilter("mount_point", expr)
|
|
|
|
|
|
def parse_attr(expr):
|
|
# type: (Text) -> Text
|
|
"""parse_attr parses attribute references (for display)."""
|
|
known = sorted(MountInfoEntry.known_attrs)
|
|
if expr.lstrip(".") in known:
|
|
return expr.lstrip(".")
|
|
raise ValueError(
|
|
"invalid attribute selector {!r}" " (known: {})".format(expr, known)
|
|
)
|
|
|
|
|
|
def parse_exprs(exprs):
|
|
# type: (List[Text]) -> Tuple[List[FilterExpr], List[Text]]
|
|
"""parse_exprs parses filter expressions and attribute references."""
|
|
# Filters are either .attr=value, /path, /path...
|
|
filters = [
|
|
parse_filter(expr) for expr in exprs if "=" in expr or not expr.startswith(".")
|
|
]
|
|
# Attributes are always .attr
|
|
attrs = [
|
|
parse_attr(expr) for expr in exprs if expr.startswith(".") and "=" not in expr
|
|
]
|
|
return filters, attrs
|
|
|
|
|
|
def matches(entry, filters):
|
|
# type: (MountInfoEntry, List[FilterExpr]) -> bool
|
|
"""
|
|
matches checks if a mount entry matches a list of filter expressions.
|
|
Filter expressions are ANDed together.
|
|
"""
|
|
for f in filters:
|
|
if entry not in f:
|
|
return False
|
|
return True
|
|
|
|
|
|
def renumber_snap_revision(entry, seen):
|
|
# type: (MountInfoEntry, Dict[Tuple[Text, Text], int]) -> None
|
|
"""renumber_snap_revisions re-numbers snap revision numbers in paths."""
|
|
|
|
def compose_preferred(parts, n):
|
|
# type: (List[Text], int) -> List[Text]
|
|
return parts[:3] + ["{}".format(n)] + parts[4:]
|
|
|
|
def compose_alternate(parts, n):
|
|
# type: (List[Text], int) -> List[Text]
|
|
return parts[:6] + ["{}".format(n)] + parts[7:]
|
|
|
|
def compose_hostfs_preferred(parts, n):
|
|
# type: (List[Text], int) -> List[Text]
|
|
return parts[:7] + ["{}".format(n)] + parts[8:]
|
|
|
|
def compose_hostfs_alternate(parts, n):
|
|
# type: (List[Text], int) -> List[Text]
|
|
return parts[:10] + ["{}".format(n)] + parts[11:]
|
|
|
|
def compose_writable(parts, n):
|
|
# type: (List[Text], int) -> List[Text]
|
|
return parts[:5] + ["{}".format(n)] + parts[6:]
|
|
|
|
def compose_hostfs_writable(parts, n):
|
|
# type: (List[Text], int) -> List[Text]
|
|
return parts[:9] + ["{}".format(n)] + parts[10:]
|
|
|
|
def alloc_n(snap_name, snap_rev):
|
|
# type: (Text, Text) -> int
|
|
key = (snap_name, snap_rev)
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
n = len([name for (name, rev) in seen if name == snap_name]) + 1
|
|
seen[key] = n
|
|
return n
|
|
|
|
parts = entry.mount_point.split("/")
|
|
if len(parts) >= 4 and parts[:2] == ["", "snap"]:
|
|
snap_name = parts[2]
|
|
snap_rev = parts[3]
|
|
compose = compose_preferred
|
|
elif len(parts) >= 7 and parts[:5] == ["", "var", "lib", "snapd", "snap"]:
|
|
snap_name = parts[5]
|
|
snap_rev = parts[6]
|
|
compose = compose_alternate
|
|
elif len(parts) >= 6 and parts[:4] == ["", "writable", "system-data", "snap"]:
|
|
snap_name = parts[4]
|
|
snap_rev = parts[5]
|
|
compose = compose_writable
|
|
elif len(parts) >= 8 and parts[:6] == ["", "var", "lib", "snapd", "hostfs", "snap"]:
|
|
snap_name = parts[6]
|
|
snap_rev = parts[7]
|
|
compose = compose_hostfs_preferred
|
|
elif len(parts) >= 11 and parts[:9] == [
|
|
"",
|
|
"var",
|
|
"lib",
|
|
"snapd",
|
|
"hostfs",
|
|
"var",
|
|
"lib",
|
|
"snapd",
|
|
"snap",
|
|
]:
|
|
snap_name = parts[9]
|
|
snap_rev = parts[10]
|
|
compose = compose_hostfs_alternate
|
|
elif len(parts) >= 10 and parts[:8] == [
|
|
"", # 0
|
|
"var", # 1
|
|
"lib", # 2
|
|
"snapd", # 3
|
|
"hostfs", # 4
|
|
"writable", # 5
|
|
"system-data", # 6
|
|
"snap", # 7
|
|
]:
|
|
snap_name = parts[8]
|
|
snap_rev = parts[9]
|
|
compose = compose_hostfs_writable
|
|
else:
|
|
return
|
|
n = alloc_n(snap_name, snap_rev)
|
|
entry.mount_point = "/".join(compose(parts, n))
|
|
|
|
|
|
def renumber_opt_fields(entry, seen, base_n):
|
|
# type: (MountInfoEntry, Dict[int, int], int) -> None
|
|
"""renumber_opt_fields re-numbers peer group in optional fields."""
|
|
|
|
def alloc_n(peer_group):
|
|
# type: (int) -> int
|
|
key = peer_group
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
n = (
|
|
len({orig for orig, renumbered in seen.items() if renumbered >= base_n})
|
|
+ base_n
|
|
+ 1
|
|
)
|
|
seen[key] = n
|
|
return n
|
|
|
|
def fn(m):
|
|
# type: (Match[Text]) -> Text
|
|
return "{}".format(alloc_n(int(m.group(1))))
|
|
|
|
entry.opt_fields = OptionalFields(
|
|
[re.sub("(\\d+)", fn, opt) for opt in entry.opt_fields]
|
|
)
|
|
|
|
|
|
def renumber_loop_devices(entry, seen):
|
|
# type: (MountInfoEntry, Dict[int, int]) -> None
|
|
"""renumber_loop_devices re-numbers loop device numbers."""
|
|
|
|
def alloc_n(loop_nr):
|
|
# type: (int) -> int
|
|
key = loop_nr
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
n = len(seen)
|
|
seen[key] = n
|
|
return n
|
|
|
|
def fn(m):
|
|
# type: (Match[Text]) -> Text
|
|
return "loop{}".format(alloc_n(int(m.group(1))))
|
|
|
|
entry.mount_source = re.sub("loop(\\d+)", fn, entry.mount_source)
|
|
|
|
|
|
def renumber_mount_ids(entry, seen, base_n):
|
|
# type: (MountInfoEntry, Dict[int, int], int) -> None
|
|
"""renumber_mount_ids re-numbers mount and parent mount IDs."""
|
|
|
|
def alloc_n(mount_id):
|
|
# type: (int) -> int
|
|
key = mount_id
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
n = (
|
|
len({orig for orig, renumbered in seen.items() if renumbered >= base_n})
|
|
+ base_n
|
|
)
|
|
seen[key] = n
|
|
return n
|
|
|
|
# NOTE: renumber the parent ahead of the mount to get more
|
|
# expected relationship between them.
|
|
entry.parent_id = alloc_n(entry.parent_id)
|
|
entry.mount_id = alloc_n(entry.mount_id)
|
|
|
|
|
|
def renumber_devices(entry, seen):
|
|
# type: (MountInfoEntry, Dict[Device, Device]) -> None
|
|
"""renumber_devices re-numbers major:minor device numbers."""
|
|
|
|
def alloc_n(dev):
|
|
# type: (Device) -> Device
|
|
key = dev
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
# We haven't seen the major:minor pair precisely but perhaps we've
|
|
# seen the major number already? Check if this major is already
|
|
# remapped, if so reuse that value. If not just allocate the next
|
|
# one based on cardinality of the set of major numbers we've seen.
|
|
major = 0
|
|
for orig, remapped in seen.items():
|
|
if orig.major == dev.major:
|
|
major = remapped.major
|
|
break
|
|
else:
|
|
major = len({orig.major for orig in seen})
|
|
# Allocate the next minor number based on the cardinality of the
|
|
# set of minor numbers matching the major number.
|
|
minor = len({orig.minor for orig in seen if orig.major == dev.major})
|
|
n = Device(major, minor)
|
|
seen[key] = n
|
|
return n
|
|
|
|
entry.dev = alloc_n(entry.dev)
|
|
|
|
|
|
def renumber_ns(entry, seen):
|
|
# type: (MountInfoEntry, Dict[Tuple[Text, int], int]) -> None
|
|
"""renumber_mount_ns re-numbers mount namespace ID from .root_dir property."""
|
|
|
|
def alloc_n(ns_type, ns_id):
|
|
# type: (Text, int) -> int
|
|
key = (ns_type, ns_id)
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
n = len(seen)
|
|
seen[key] = n
|
|
return n
|
|
|
|
if entry.fs_type != "nsfs":
|
|
return
|
|
match = re.match(r"^([a-z_]+):\[(\d+)\]$", entry.root_dir)
|
|
if match:
|
|
ns_type = match.group(1)
|
|
ns_id = int(match.group(2))
|
|
entry.root_dir = "{}:[{}]".format(ns_type, alloc_n(ns_type, ns_id))
|
|
|
|
|
|
def renumber_mount_option(opt, seen):
|
|
# type: (Text, Dict[Tuple[Text, Text], int]) -> Text
|
|
"""renumber_mount_option re-numbers various numbers in mount options."""
|
|
|
|
def alloc_n(mount_opt_key, mount_opt_value):
|
|
# type: (Text, Text) -> int
|
|
key = (mount_opt_key, mount_opt_value)
|
|
try:
|
|
return seen[key]
|
|
except KeyError:
|
|
n = len(
|
|
{
|
|
opt_value
|
|
for opt_key, opt_value in seen.keys()
|
|
if opt_key == mount_opt_key
|
|
}
|
|
)
|
|
seen[key] = n
|
|
return n
|
|
|
|
if "=" in opt:
|
|
mount_opt_key, mount_opt_value = opt.split("=", 1)
|
|
# size, nr_inode: used by tmpfs
|
|
# fd, pipe_ino: used by binfmtmisc
|
|
if mount_opt_key == "size":
|
|
return "size=VARIABLE"
|
|
if mount_opt_key in {"nr_inodes", "fd", "pipe_ino"}:
|
|
return "{}={}".format(
|
|
mount_opt_key, alloc_n(mount_opt_key, mount_opt_value)
|
|
)
|
|
return opt
|
|
|
|
|
|
def renumber_mount_opts(entry, seen):
|
|
# type: (MountInfoEntry, Dict[Tuple[Text, Text], int]) -> None
|
|
"""renumber_mount_opts alters numbers in mount options."""
|
|
entry.mount_opts = ",".join(
|
|
renumber_mount_option(opt, seen) for opt in entry.mount_opts.split(",")
|
|
)
|
|
entry.sb_opts = ",".join(
|
|
renumber_mount_option(opt, seen) for opt in entry.sb_opts.split(",")
|
|
)
|
|
|
|
|
|
class RewriteState(object):
|
|
"""RewriteState holds state used in rewriting mount entries."""
|
|
|
|
def __init__(self):
|
|
# type: () -> None
|
|
self.seen_opt_fields = {} # type: Dict[int, int]
|
|
self.seen_loops = {} # type: Dict[int, int]
|
|
self.seen_snap_revs = {} # type: Dict[Tuple[Text, Text], int]
|
|
self.seen_mount_ids = {} # type: Dict[int, int]
|
|
self.seen_devices = {} # type: Dict[Device, Device]
|
|
self.seen_ns = {} # type: Dict[Tuple[Text, int], int]
|
|
# NOTE: The type of the dictionary key is Tuple[Text, Text] because
|
|
# while generally "numeric" the values may include suffixes like
|
|
# "1024k" and it is just easier to handle this way.
|
|
self.seen_mount_opts = {} # type: Dict[Tuple[Text, Text], int]
|
|
|
|
|
|
def rewrite_renumber(entries, order, rs, base_n=0):
|
|
# type: (List[MountInfoEntry], List[int], RewriteState, int) -> None
|
|
"""rewrite_renumber applies all re-numbering helpers to a single entry."""
|
|
for i in range(len(entries)):
|
|
entry = entries[order[i]]
|
|
renumber_mount_ids(entry, rs.seen_mount_ids, base_n)
|
|
renumber_devices(entry, rs.seen_devices)
|
|
renumber_snap_revision(entry, rs.seen_snap_revs)
|
|
renumber_opt_fields(entry, rs.seen_opt_fields, base_n)
|
|
renumber_loop_devices(entry, rs.seen_loops)
|
|
renumber_ns(entry, rs.seen_ns)
|
|
renumber_mount_opts(entry, rs.seen_mount_opts)
|
|
|
|
|
|
def rewrite_rename(entries, order, rs):
|
|
# type: (List[MountInfoEntry], List[int], RewriteState) -> None
|
|
"""rewrite_rename applies all re-naming helpers to a single entry."""
|
|
# TODO: allocate devices like everything else above.
|
|
for i in range(len(entries)):
|
|
entry = entries[order[i]]
|
|
entry.mount_source = re.sub(
|
|
"/dev/[sv]d([a-z])", "/dev/sd\\1", entry.mount_source
|
|
)
|
|
|
|
|
|
class _UnitTestAction(Action):
|
|
def __init__(
|
|
self,
|
|
option_strings,
|
|
dest=SUPPRESS,
|
|
default=SUPPRESS,
|
|
help="run program's unit test suite and exit",
|
|
):
|
|
# type: (Text, Text, Text, Text) -> None
|
|
super(_UnitTestAction, self).__init__(
|
|
option_strings=option_strings,
|
|
dest=dest,
|
|
default=default,
|
|
nargs="...",
|
|
help=help,
|
|
)
|
|
|
|
def __call__(self, parser, ns, values, option_string=None):
|
|
# type: (ArgumentParser, Namespace, Union[str, Sequence[Any], None], Optional[Text]) -> None
|
|
# We allow the caller to provide the test to invoke by giving
|
|
# --run-unit-tests a set of arguments.
|
|
argv = [sys.argv[0]]
|
|
if isinstance(values, list):
|
|
argv += values
|
|
unittest.main(argv=argv)
|
|
parser.exit()
|
|
|
|
|
|
def main():
|
|
# type: () -> None
|
|
parser = ArgumentParser(
|
|
epilog="""
|
|
Expressions are ANDed together and have one of the following forms:
|
|
|
|
.ATTR=VALUE mount entry attribute ATTR is equal to VALUE
|
|
PATH mount point is equal to PATH
|
|
PATH... mount point starts with PATH
|
|
|
|
In addition .ATTR syntax can be used to limit display to only certain
|
|
attributes. By default the output is identical to raw mountinfo.
|
|
Known attributes, applicable for both filtering and display.
|
|
|
|
mount_point: path where mount is attached in the file system
|
|
mount_source: path of the mounted device or bind-mount origin
|
|
fs_type: filesystem type
|
|
mount_opts: options applying to the mount point only
|
|
sb_opts: options applying to the mounted filesystem
|
|
opt_fields: optional fields, used for propagation information
|
|
mount_id: mount point identifier
|
|
parent_id: identifier of parent mount point
|
|
dev: major:minor numbers of the mounted device
|
|
root_dir: subtree of the mounted filesystem exposed at mount_point
|
|
|
|
The exit code indicates if any mount point matched the query.
|
|
""",
|
|
formatter_class=RawTextHelpFormatter,
|
|
)
|
|
parser.register("action", "unit-test", _UnitTestAction)
|
|
parser.add_argument("-v", "--version", action="version", version="1.0")
|
|
parser.add_argument("--run-unit-tests", action="unit-test")
|
|
parser.add_argument(
|
|
"-f",
|
|
metavar="MOUNTINFO",
|
|
dest="file",
|
|
type=FileType(),
|
|
default="/proc/self/mountinfo",
|
|
help="parse specified mountinfo file",
|
|
)
|
|
parser.add_argument(
|
|
"--ref",
|
|
dest="refs",
|
|
metavar="MOUNTINFO",
|
|
type=FileType(),
|
|
action="append",
|
|
default=[],
|
|
help="refer to another table while rewriting, makes output comparable across namespaces",
|
|
)
|
|
parser.add_argument(
|
|
"--ref-x1000",
|
|
dest="ref_x1000",
|
|
action="store_true",
|
|
default=False,
|
|
help="start mount point IDs and peer groups at multiple of 1000, once for each ref",
|
|
)
|
|
parser.add_argument(
|
|
"--one", default=False, action="store_true", help="expect exactly one match"
|
|
)
|
|
parser.add_argument(
|
|
"--rewrite-order",
|
|
metavar="FIELD",
|
|
action="append",
|
|
default=[],
|
|
choices=MountInfoEntry.known_attrs.keys(),
|
|
help="rewrite entries in the order determined by given fields",
|
|
)
|
|
parser.add_argument(
|
|
"--display-order",
|
|
metavar="FIELD",
|
|
action="append",
|
|
default=[],
|
|
choices=MountInfoEntry.known_attrs.keys(),
|
|
help="display entries in the order determined by given fields",
|
|
)
|
|
parser.add_argument(
|
|
"exprs",
|
|
metavar="EXPRESSION",
|
|
nargs="*",
|
|
help="filter or display expression (see below)",
|
|
)
|
|
group = parser.add_argument_group("Rewriting rules")
|
|
group.add_argument(
|
|
"--renumber",
|
|
action="store_true",
|
|
help="Reassign mount IDs, device numbers, snap revisions"
|
|
" and loopback devices",
|
|
)
|
|
group.add_argument(
|
|
"--rename", action="store_true", help="Reassign block device names"
|
|
)
|
|
group.add_argument(
|
|
"--differential",
|
|
action="store_true",
|
|
help="Display numeric values as delta from previous item",
|
|
)
|
|
opts = parser.parse_args()
|
|
try:
|
|
filters, attrs = parse_exprs(opts.exprs)
|
|
except ValueError as exc:
|
|
raise SystemExit(exc)
|
|
entries = [MountInfoEntry.parse(line) for line in opts.file]
|
|
|
|
# Apply entry filtering ahead of any renumbering.
|
|
num_matched = 0
|
|
for e in entries:
|
|
if matches(e, filters):
|
|
e.matched = True
|
|
num_matched += 1
|
|
|
|
# Build rewrite state based on reference tables. This way the entries
|
|
# we will display can be correlated to other tables.
|
|
rs = RewriteState()
|
|
for i, ref in enumerate(opts.refs):
|
|
ref_entries = [MountInfoEntry.parse(line) for line in ref]
|
|
ref_rewrite_order = list(range(len(ref_entries)))
|
|
if opts.rewrite_order:
|
|
|
|
def ref_rewrite_key_fn(i):
|
|
# type: (int) -> Tuple[Any, ...]
|
|
return tuple(
|
|
getattr(ref_entries[i], field) for field in opts.rewrite_order
|
|
)
|
|
|
|
ref_rewrite_order.sort(key=ref_rewrite_key_fn)
|
|
if opts.renumber:
|
|
rewrite_renumber(
|
|
ref_entries, ref_rewrite_order, rs, 1000 * i if opts.ref_x1000 else 0
|
|
)
|
|
if opts.rename:
|
|
rewrite_rename(ref_entries, ref_rewrite_order, rs)
|
|
|
|
# Apply entry renumbering and renaming, perhaps using reordering as well.
|
|
rewrite_order = list(range(len(entries)))
|
|
if opts.rewrite_order:
|
|
|
|
def rewrite_key_fn(i):
|
|
# type: (int) -> Tuple[Any, ...]
|
|
return tuple(getattr(entries[i], field) for field in opts.rewrite_order)
|
|
|
|
rewrite_order.sort(key=rewrite_key_fn)
|
|
if opts.renumber:
|
|
rewrite_renumber(
|
|
entries, rewrite_order, rs, 1000 * len(opts.refs) if opts.ref_x1000 else 0
|
|
)
|
|
if opts.rename:
|
|
rewrite_rename(entries, rewrite_order, rs)
|
|
|
|
# Apply entry reordering for display.
|
|
if opts.display_order:
|
|
|
|
def display_key_fn(entry):
|
|
# type: (MountInfoEntry) -> Tuple[Any, ...]
|
|
return tuple(getattr(entry, field) for field in opts.display_order)
|
|
|
|
entries.sort(key=display_key_fn)
|
|
|
|
# Display and differentiate
|
|
prev = None # type: Optional[MountInfoEntry]
|
|
for e in entries:
|
|
if not e.matched:
|
|
prev = e
|
|
continue
|
|
if prev is not None and opts.differential:
|
|
e_display = MountInfoEntry.differentiate(e, prev)
|
|
else:
|
|
e_display = e
|
|
if attrs:
|
|
print(*[getattr(e_display, attr) for attr in attrs])
|
|
else:
|
|
print(e_display)
|
|
prev = e
|
|
if opts.one and num_matched != 1:
|
|
raise SystemExit(
|
|
"--one requires exactly one match, found {}".format(num_matched)
|
|
)
|
|
# Return with an exit code indicating if anything matched.
|
|
# This allows mountinfo.query to be used in scripts.
|
|
if num_matched == 0:
|
|
raise SystemExit(1)
|
|
|
|
|
|
class MountInfoEntryTests(unittest.TestCase):
|
|
|
|
non_zero_values = {
|
|
"mount_id": 1,
|
|
"parent_id": 2,
|
|
"dev": Device(3, 4),
|
|
"root_dir": "/root-dir",
|
|
"mount_point": "/mount-point",
|
|
"mount_opts": "mount-opts",
|
|
"opt_fields": OptionalFields(["opt:1", "fields:2"]),
|
|
"fs_type": "fs-type",
|
|
"mount_source": "mount-source",
|
|
"sb_opts": "sb-opts",
|
|
} # Dict[Text, Any]
|
|
|
|
def test_init(self):
|
|
# type: () -> None
|
|
e = MountInfoEntry()
|
|
self.assertEqual(e.mount_id, 0)
|
|
self.assertEqual(e.parent_id, 0)
|
|
self.assertEqual(e.dev, Device(0, 0))
|
|
self.assertEqual(e.root_dir, "")
|
|
self.assertEqual(e.mount_point, "")
|
|
self.assertEqual(e.mount_opts, "")
|
|
self.assertEqual(e.opt_fields, [])
|
|
self.assertEqual(e.fs_type, "")
|
|
self.assertEqual(e.mount_source, "")
|
|
self.assertEqual(e.sb_opts, "")
|
|
|
|
def test_parse(self):
|
|
# type: () -> None
|
|
e = MountInfoEntry.parse(
|
|
"2079 2266 0:3 mnt:[4026532791] /run/snapd/ns/test-snapd-mountinfo.mnt rw - nsfs nsfs rw"
|
|
)
|
|
self.assertEqual(e.mount_id, 2079)
|
|
self.assertEqual(e.parent_id, 2266)
|
|
self.assertEqual(e.dev, Device(0, 3))
|
|
self.assertEqual(e.root_dir, "mnt:[4026532791]")
|
|
self.assertEqual(e.mount_point, "/run/snapd/ns/test-snapd-mountinfo.mnt")
|
|
self.assertEqual(e.mount_opts, "rw")
|
|
self.assertEqual(e.opt_fields, [])
|
|
self.assertEqual(e.fs_type, "nsfs")
|
|
self.assertEqual(e.mount_source, "nsfs")
|
|
self.assertEqual(e.sb_opts, "rw")
|
|
|
|
def test_eq(self):
|
|
# type: () -> None
|
|
e0 = MountInfoEntry()
|
|
e1 = MountInfoEntry()
|
|
self.assertEqual(e0, e1)
|
|
for field, value in self.non_zero_values.items():
|
|
self.assertEqual(e0, e1)
|
|
old_value = getattr(e1, field)
|
|
setattr(e1, field, value)
|
|
self.assertNotEqual(e0, e1)
|
|
setattr(e1, field, old_value)
|
|
self.assertEqual(e0, e1)
|
|
|
|
def test_str(self):
|
|
# type: () -> None
|
|
e = MountInfoEntry()
|
|
for field, value in self.non_zero_values.items():
|
|
setattr(e, field, value)
|
|
self.assertEqual(
|
|
str(e),
|
|
"1 2 3:4 /root-dir /mount-point mount-opts opt:1 fields:2 - fs-type mount-source sb-opts",
|
|
)
|
|
|
|
def test_repr(self):
|
|
# type: () -> None
|
|
e = MountInfoEntry()
|
|
for field, value in self.non_zero_values.items():
|
|
setattr(e, field, value)
|
|
self.assertEqual(
|
|
repr(e),
|
|
"MountInfoEntry.parse('1 2 3:4 /root-dir /mount-point mount-opts opt:1 fields:2 - fs-type mount-source sb-opts')",
|
|
)
|
|
|
|
def test_dev_maj_min(self):
|
|
# type: () -> None
|
|
e = MountInfoEntry()
|
|
e.dev = Device(1, 2)
|
|
self.assertEqual(e.dev_min, 2)
|
|
self.assertEqual(e.dev_maj, 1)
|
|
|
|
|
|
class RenumberSnapRevisionTests(unittest.TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.entry = MountInfoEntry()
|
|
self.seen = {} # type: Dict[Tuple[Text, Text], int]
|
|
|
|
def test_renumbering_allocation(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = "/snap/core/7079"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/snap/core/1")
|
|
|
|
self.entry.mount_point = "/snap/core/7080"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/snap/core/2")
|
|
|
|
self.entry.mount_point = "/snap/snapd/x1"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/snap/snapd/1")
|
|
|
|
self.assertEqual(
|
|
self.seen, {("core", "7079"): 1, ("core", "7080"): 2, ("snapd", "x1"): 1}
|
|
)
|
|
|
|
def test_preferred(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = "/snap/core/7079"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/snap/core/1")
|
|
|
|
self.entry.mount_point = "/snap/core/7079/subdir"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/snap/core/1/subdir")
|
|
|
|
self.assertEqual(self.seen, {("core", "7079"): 1})
|
|
|
|
def test_alternate(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = "/var/lib/snapd/snap/core/7079"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/var/lib/snapd/snap/core/1")
|
|
|
|
self.entry.mount_point = "/var/lib/snapd/snap/core/7079/subdir"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/var/lib/snapd/snap/core/1/subdir")
|
|
|
|
self.assertEqual(self.seen, {("core", "7079"): 1})
|
|
|
|
def test_preferred_via_hostfs(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = "/var/lib/snapd/hostfs/snap/core/7079"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/var/lib/snapd/hostfs/snap/core/1")
|
|
|
|
self.entry.mount_point = "/var/lib/snapd/hostfs/snap/core/7079/subdir"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(
|
|
self.entry.mount_point, "/var/lib/snapd/hostfs/snap/core/1/subdir"
|
|
)
|
|
|
|
self.assertEqual(self.seen, {("core", "7079"): 1})
|
|
|
|
def test_alternate_via_hostfs(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = "/var/lib/snapd/hostfs/var/lib/snapd/snap/core/7079"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(
|
|
self.entry.mount_point, "/var/lib/snapd/hostfs/var/lib/snapd/snap/core/1"
|
|
)
|
|
|
|
self.entry.mount_point = (
|
|
"/var/lib/snapd/hostfs/var/lib/snapd/snap/core/7079/subdir"
|
|
)
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(
|
|
self.entry.mount_point,
|
|
"/var/lib/snapd/hostfs/var/lib/snapd/snap/core/1/subdir",
|
|
)
|
|
|
|
self.assertEqual(self.seen, {("core", "7079"): 1})
|
|
|
|
def test_writable(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = "/writable/system-data/snap/core18/1055"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(self.entry.mount_point, "/writable/system-data/snap/core18/1")
|
|
|
|
self.entry.mount_point = "/writable/system-data/snap/core18/1055/subdir"
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(
|
|
self.entry.mount_point, "/writable/system-data/snap/core18/1/subdir"
|
|
)
|
|
|
|
self.assertEqual(self.seen, {("core18", "1055"): 1})
|
|
|
|
def test_writable_via_hostfs(self):
|
|
# type: () -> None
|
|
self.entry.mount_point = (
|
|
"/var/lib/snapd/hostfs/writable/system-data/snap/core18/1055"
|
|
)
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(
|
|
self.entry.mount_point,
|
|
"/var/lib/snapd/hostfs/writable/system-data/snap/core18/1",
|
|
)
|
|
|
|
self.entry.mount_point = (
|
|
"/var/lib/snapd/hostfs/writable/system-data/snap/core18/1055/subdir"
|
|
)
|
|
renumber_snap_revision(self.entry, self.seen)
|
|
self.assertEqual(
|
|
self.entry.mount_point,
|
|
"/var/lib/snapd/hostfs/writable/system-data/snap/core18/1/subdir",
|
|
)
|
|
|
|
self.assertEqual(self.seen, {("core18", "1055"): 1})
|
|
|
|
|
|
class RenumberMountNsTests(unittest.TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.entry = MountInfoEntry()
|
|
self.entry.fs_type = "nsfs"
|
|
self.seen = {} # type: Dict[Tuple[Text, int], int]
|
|
|
|
def test_renumbering_allocation(self):
|
|
# type: () -> None
|
|
self.entry.root_dir = "mnt:[4026532909]"
|
|
renumber_ns(self.entry, self.seen)
|
|
self.assertEqual(self.entry.root_dir, "mnt:[0]")
|
|
|
|
self.entry.root_dir = "mnt:[4026532791]"
|
|
renumber_ns(self.entry, self.seen)
|
|
self.assertEqual(self.entry.root_dir, "mnt:[1]")
|
|
|
|
self.entry.root_dir = "pid:[4026531836]"
|
|
renumber_ns(self.entry, self.seen)
|
|
self.assertEqual(self.entry.root_dir, "pid:[2]")
|
|
|
|
self.assertEqual(
|
|
self.seen,
|
|
{("mnt", 4026532909): 0, ("mnt", 4026532791): 1, ("pid", 4026531836): 2},
|
|
)
|
|
|
|
|
|
class RenumberMountOptionsTests(unittest.TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.seen = {} # type: Dict[Tuple[Text, Text], int]
|
|
|
|
def test_renumber_allocation(self):
|
|
# type: () -> None
|
|
"""
|
|
numbers are allocated from subsets matching the key, this reduces delta.
|
|
"""
|
|
self.assertEqual(renumber_mount_option("pipe_ino=100", self.seen), "pipe_ino=0")
|
|
self.assertEqual(renumber_mount_option("pipe_ino=100", self.seen), "pipe_ino=0")
|
|
self.assertEqual(renumber_mount_option("pipe_ino=200", self.seen), "pipe_ino=1")
|
|
self.assertEqual(renumber_mount_option("pipe_ino=100", self.seen), "pipe_ino=0")
|
|
self.assertEqual(renumber_mount_option("fd=21", self.seen), "fd=0")
|
|
self.assertEqual(renumber_mount_option("fd=21", self.seen), "fd=0")
|
|
self.assertEqual(renumber_mount_option("fd=45", self.seen), "fd=1")
|
|
self.assertEqual(renumber_mount_option("fd=21", self.seen), "fd=0")
|
|
|
|
def test_renumber_size_variable(self):
|
|
# type: () -> None
|
|
"""
|
|
size is special-cased and always rewritten to the same value because it is prone to fluctuations
|
|
"""
|
|
self.assertEqual(renumber_mount_option("size=100", self.seen), "size=VARIABLE")
|
|
self.assertEqual(renumber_mount_option("size=200", self.seen), "size=VARIABLE")
|
|
|
|
def test_renumber_devtmpfs_opts(self):
|
|
# type: () -> None
|
|
"""
|
|
certain devtmpfs options are renumbered.
|
|
|
|
23 98 0:6 / /dev rw,nosuid shared:21 - devtmpfs devtmpfs rw,size=4057388k,nr_inodes=1014347,mode=755
|
|
|
|
Here the size and nr_inodes options are not deterministic and need to
|
|
be rewritten. The size quantity is very susceptible to free memory
|
|
fluctuations and is treated specially.
|
|
"""
|
|
# Options size= and nr_inodes= are renumbered.
|
|
self.assertEqual(
|
|
renumber_mount_option("size=4057388k", self.seen), "size=VARIABLE"
|
|
)
|
|
self.assertEqual(
|
|
renumber_mount_option("nr_inodes=1014347", self.seen), "nr_inodes=0"
|
|
)
|
|
# Option mode= is not renumbered.
|
|
self.assertEqual(renumber_mount_option("mode=755", self.seen), "mode=755")
|
|
self.assertEqual(self.seen, {("nr_inodes", "1014347"): 0})
|
|
|
|
def test_renumber_binfmt_misc_opts(self):
|
|
# type: () -> None
|
|
"""
|
|
certain binfmt_misc options are renumbered.
|
|
|
|
47 22 0:42 / /proc/sys/fs/binfmt_misc rw,relatime shared:28 - autofs systemd-1 rw,fd=40,pgrp=1,timeout=0,minproto=5,maxproto=5,direct,pipe_ino=16610
|
|
|
|
Here the fd and pipe_ino options are not deterministic and need to be rewritten.
|
|
"""
|
|
self.assertEqual(renumber_mount_option("fd=40", self.seen), "fd=0")
|
|
self.assertEqual(
|
|
renumber_mount_option("pipe_ino=16610", self.seen), "pipe_ino=0"
|
|
)
|
|
self.assertEqual(self.seen, {("fd", "40"): 0, ("pipe_ino", "16610"): 0})
|
|
|
|
|
|
class RenumberMountIdsTests(unittest.TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.seen = {} # type: Dict[int, int]
|
|
|
|
def test_smoke(self):
|
|
# type: () -> None
|
|
# Renumber the first mount entry, from the "initial" mount namespace,
|
|
# with the initial, zero multiple.
|
|
entry = MountInfoEntry()
|
|
entry.mount_id = 12
|
|
entry.parent_id = 5
|
|
renumber_mount_ids(entry, self.seen, 0)
|
|
self.assertEqual(entry.parent_id, 0)
|
|
self.assertEqual(entry.mount_id, 1)
|
|
|
|
# Renumber another entry, now in a separate mount namespace, with a
|
|
# different multiplier. The parent ID stays in the < 1000 range. NOTE:
|
|
# in reality mount namespaces never share mount IDs but that's fine.
|
|
# The test just checks the renumber logic.
|
|
entry = MountInfoEntry()
|
|
entry.mount_id = 13
|
|
entry.parent_id = 5
|
|
renumber_mount_ids(entry, self.seen, 1000)
|
|
self.assertEqual(entry.parent_id, 0)
|
|
self.assertEqual(entry.mount_id, 1000)
|
|
|
|
|
|
class RenumberOptFieldsTests(unittest.TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.seen = {} # type: Dict[int, int]
|
|
|
|
def test_smoke(self):
|
|
# type: () -> None
|
|
# Renumber the first mount entry, from the "initial" mount namespace,
|
|
# with the initial, zero multiple.
|
|
entry = MountInfoEntry()
|
|
entry.opt_fields = OptionalFields(["shared:12"])
|
|
renumber_opt_fields(entry, self.seen, 0)
|
|
self.assertEqual(entry.opt_fields, ["shared:1"])
|
|
|
|
# Renumber the second entry, now in a separate mount namespace, with a
|
|
# different multiplier. Given that the peer group number was not seen
|
|
# before it gets allocated into the 1000-1999 range.
|
|
entry = MountInfoEntry()
|
|
entry.opt_fields = OptionalFields(["shared:13"])
|
|
renumber_opt_fields(entry, self.seen, 1000)
|
|
self.assertEqual(entry.opt_fields, ["shared:1001"])
|
|
|
|
# Renumber the third entry, in the same mount namespace as the second
|
|
# one. Given that the peer group number was seen in the initial mount
|
|
# namespace it is renumbered the same was as the original was, even
|
|
# though the actual sharing mode is different.
|
|
entry = MountInfoEntry()
|
|
entry.opt_fields = OptionalFields(["master:12"])
|
|
renumber_opt_fields(entry, self.seen, 1000)
|
|
self.assertEqual(entry.opt_fields, ["master:1"])
|
|
|
|
|
|
class RewriteTests(unittest.TestCase):
|
|
def setUp(self):
|
|
# type: () -> None
|
|
self.entries = [
|
|
MountInfoEntry.parse(line)
|
|
for line in (
|
|
"2079 2266 0:3 mnt:[4026532791] /run/snapd/ns/test-snapd-mountinfo.mnt rw - nsfs nsfs rw",
|
|
"23 98 0:6 / /dev rw,nosuid shared:21 - devtmpfs devtmpfs rw,size=4057388k,nr_inodes=1014347,mode=755",
|
|
"47 22 0:42 / /proc/sys/fs/binfmt_misc rw,relatime shared:28 - autofs systemd-1 rw,fd=40,pgrp=1,timeout=0,minproto=5,maxproto=5,direct,pipe_ino=16610",
|
|
)
|
|
]
|
|
self.order = list(range(len(self.entries)))
|
|
self.rs = RewriteState()
|
|
|
|
def test_rewrite_renumber(self):
|
|
# type: () -> None
|
|
rewrite_renumber(self.entries, self.order, self.rs)
|
|
self.assertEqual(
|
|
self.entries,
|
|
[
|
|
MountInfoEntry.parse(line)
|
|
for line in (
|
|
"1 0 0:0 mnt:[0] /run/snapd/ns/test-snapd-mountinfo.mnt rw - nsfs nsfs rw",
|
|
"3 2 0:1 / /dev rw,nosuid shared:1 - devtmpfs devtmpfs rw,size=VARIABLE,nr_inodes=0,mode=755",
|
|
"5 4 0:2 / /proc/sys/fs/binfmt_misc rw,relatime shared:2 - autofs systemd-1 rw,fd=0,pgrp=1,timeout=0,minproto=5,maxproto=5,direct,pipe_ino=0",
|
|
)
|
|
],
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|