feat: replace liblzma with lzma-rust2 for multi-threaded XZ decompression

- Replace liblzma C bindings with native Rust lzma-rust2 library
- Use XzReaderMt for parallel decompression across all platforms
- Remove dependency on system xz command
- Use get_recommended_threads() to utilize half of CPU cores
- Clean up unused find_binary and get_binary_search_paths functions
- Update module documentation to reflect native Rust approach

This change provides consistent multi-threaded decompression performance
across macOS, Linux, and Windows without requiring external tools.
This commit is contained in:
SuperKali
2025-12-26 00:58:29 +01:00
parent ab0a9c3959
commit 07879be87c
4 changed files with 44 additions and 174 deletions

View File

@@ -20,9 +20,10 @@ serde_json = "1"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "stream", "rustls-tls"] }
futures-util = "0.3"
liblzma = { version = "0.4", features = ["static"] }
flate2 = "1.0"
# Multi-threaded decompression libraries
lzma-rust2 = { version = "0.15", features = ["xz", "std"] }
bzip2 = "0.4"
flate2 = "1.0"
zstd = "0.13"
sha2 = "0.10"
hex = "0.4"

View File

@@ -1,24 +1,23 @@
//! Decompression module
//!
//! Handles decompressing compressed image files (XZ, GZ, BZ2, ZST)
//! using system tools or fallback Rust libraries.
//! using Rust native libraries with multi-threading support.
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use bzip2::read::BzDecoder;
use flate2::read::GzDecoder;
use liblzma::read::XzDecoder;
use lzma_rust2::XzReaderMt;
use zstd::stream::read::Decoder as ZstdDecoder;
use crate::config;
use crate::download::DownloadState;
use crate::utils::{find_binary, get_recommended_threads};
use crate::{log_error, log_info, log_warn};
use crate::log_info;
use crate::utils::get_recommended_threads;
const MODULE: &str = "decompress";
@@ -28,86 +27,7 @@ pub fn needs_decompression(path: &Path) -> bool {
matches!(ext.to_lowercase().as_str(), "xz" | "gz" | "bz2" | "zst")
}
/// Decompress using system xz command (much faster, uses multiple threads)
pub fn decompress_with_system_xz(
input_path: &Path,
output_path: &Path,
state: &Arc<DownloadState>,
) -> Result<(), String> {
use std::io::Read as IoRead;
use std::process::Stdio;
let xz_path = find_binary("xz").ok_or("xz command not found")?;
let threads = get_recommended_threads();
log_info!(
MODULE,
"Using system xz at: {} with {} threads",
xz_path.display(),
threads
);
let mut child = Command::new(&xz_path)
.args(["-d", "-k", "-c"]) // decompress, keep original, output to stdout
.arg(format!("-T{}", threads))
.arg(input_path)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| {
log_error!(MODULE, "Failed to spawn xz: {}", e);
format!("Failed to spawn xz: {}", e)
})?;
let mut stdout = child.stdout.take().ok_or("Failed to capture xz stdout")?;
let mut output_file =
File::create(output_path).map_err(|e| format!("Failed to create output file: {}", e))?;
// Read in chunks to allow cancellation checks
let mut buffer = vec![0u8; config::download::CHUNK_SIZE];
loop {
// Check for cancellation
if state.is_cancelled.load(Ordering::SeqCst) {
log_info!(
MODULE,
"Decompression cancelled by user, killing xz process"
);
let _ = child.kill();
let _ = child.wait();
drop(output_file);
let _ = std::fs::remove_file(output_path);
return Err("Decompression cancelled".to_string());
}
let bytes_read = stdout
.read(&mut buffer)
.map_err(|e| format!("Failed to read from xz: {}", e))?;
if bytes_read == 0 {
break;
}
output_file
.write_all(&buffer[..bytes_read])
.map_err(|e| format!("Failed to write decompressed data: {}", e))?;
}
let status = child
.wait()
.map_err(|e| format!("Failed to wait for xz: {}", e))?;
if status.success() {
log_info!(MODULE, "System xz decompression complete");
Ok(())
} else {
let _ = std::fs::remove_file(output_path);
log_error!(MODULE, "xz decompression failed");
Err("xz decompression failed".to_string())
}
}
/// Decompress using Rust xz2 library (slower, single-threaded fallback)
/// Decompress using Rust lzma-rust2 library (multi-threaded)
pub fn decompress_with_rust_xz(
input_path: &Path,
output_path: &Path,
@@ -115,12 +35,22 @@ pub fn decompress_with_rust_xz(
) -> Result<(), String> {
let input_file =
File::open(input_path).map_err(|e| format!("Failed to open input file: {}", e))?;
let buf_reader = BufReader::with_capacity(config::download::DECOMPRESS_BUFFER_SIZE, input_file);
let decoder = XzDecoder::new(buf_reader);
decompress_with_reader(decoder, output_path, state, "xz")
let threads = get_recommended_threads();
log_info!(
MODULE,
"Using Rust lzma-rust2 with {} threads for XZ decompression",
threads
);
// XzReaderMt requires Seek + Read, so we pass the file directly
let decoder = XzReaderMt::new(input_file, false, threads as u32)
.map_err(|e| format!("Failed to create XZ decoder: {}", e))?;
decompress_with_reader_mt(decoder, output_path, state, "xz")
}
/// Decompress gzip files using flate2
/// Decompress gzip files using flate2 (single-threaded - TODO: add pigz system tool support)
pub fn decompress_with_gz(
input_path: &Path,
output_path: &Path,
@@ -130,10 +60,10 @@ pub fn decompress_with_gz(
File::open(input_path).map_err(|e| format!("Failed to open input file: {}", e))?;
let buf_reader = BufReader::with_capacity(config::download::DECOMPRESS_BUFFER_SIZE, input_file);
let decoder = GzDecoder::new(buf_reader);
decompress_with_reader(decoder, output_path, state, "gz")
decompress_with_reader_mt(decoder, output_path, state, "gz")
}
/// Decompress bzip2 files
/// Decompress bzip2 files using bzip2 (single-threaded - TODO: add parallel support)
pub fn decompress_with_bz2(
input_path: &Path,
output_path: &Path,
@@ -143,10 +73,10 @@ pub fn decompress_with_bz2(
File::open(input_path).map_err(|e| format!("Failed to open input file: {}", e))?;
let buf_reader = BufReader::with_capacity(config::download::DECOMPRESS_BUFFER_SIZE, input_file);
let decoder = BzDecoder::new(buf_reader);
decompress_with_reader(decoder, output_path, state, "bz2")
decompress_with_reader_mt(decoder, output_path, state, "bz2")
}
/// Decompress zstd files
/// Decompress zstd files (single-threaded - zstd doesn't have good multithreaded Rust support yet)
pub fn decompress_with_zstd(
input_path: &Path,
output_path: &Path,
@@ -157,11 +87,11 @@ pub fn decompress_with_zstd(
let buf_reader = BufReader::with_capacity(config::download::DECOMPRESS_BUFFER_SIZE, input_file);
let decoder = ZstdDecoder::new(buf_reader)
.map_err(|e| format!("Failed to create zstd decoder: {}", e))?;
decompress_with_reader(decoder, output_path, state, "zstd")
decompress_with_reader_mt(decoder, output_path, state, "zstd")
}
/// Generic decompression using any Read implementation
fn decompress_with_reader<R: Read>(
/// Generic decompression using any Read implementation (mut reference for multithreaded decoders)
fn decompress_with_reader_mt<R: Read>(
mut decoder: R,
output_path: &Path,
state: &Arc<DownloadState>,
@@ -251,21 +181,12 @@ pub fn decompress_local_file(
// Handle different compression formats
let result = if filename.ends_with(".xz") {
// Try system xz first (faster, multi-threaded), fall back to Rust library
log_info!(MODULE, "Decompressing XZ format");
if let Err(e) = decompress_with_system_xz(input_path, &output_path, state) {
if state.is_cancelled.load(Ordering::SeqCst) {
return Err("Decompression cancelled".to_string());
}
log_warn!(
MODULE,
"System xz failed: {}, falling back to Rust library (slower)",
e
);
decompress_with_rust_xz(input_path, &output_path, state)
} else {
Ok(())
}
// Use Rust lzma-rust2 library (multi-threaded) on all platforms
log_info!(
MODULE,
"Decompressing XZ format with Rust lzma-rust2 (multi-threaded)"
);
decompress_with_rust_xz(input_path, &output_path, state)
} else if filename.ends_with(".gz") {
log_info!(MODULE, "Decompressing GZ format");
decompress_with_gz(input_path, &output_path, state)

View File

@@ -13,7 +13,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use crate::config;
use crate::decompress::{decompress_with_rust_xz, decompress_with_system_xz};
use crate::decompress::decompress_with_rust_xz;
use crate::{log_error, log_info, log_warn};
const MODULE: &str = "download";
@@ -290,23 +290,14 @@ pub async fn download_image(
// Decompress if needed
if filename.ends_with(".xz") {
state.is_decompressing.store(true, Ordering::SeqCst);
log_info!(MODULE, "Starting decompression...");
log_info!(
MODULE,
"Starting decompression with Rust lzma-rust2 (multi-threaded)..."
);
// Try system xz first, fall back to Rust library
if let Err(e) = decompress_with_system_xz(&temp_path, &output_path, &state) {
// Check if it was cancelled
if state.is_cancelled.load(Ordering::SeqCst) {
let _ = std::fs::remove_file(&temp_path);
return Err("Decompression cancelled".to_string());
}
log_warn!(
MODULE,
"System xz failed: {}, falling back to Rust library (slower)",
e
);
decompress_with_rust_xz(&temp_path, &output_path, &state)?;
log_info!(MODULE, "Rust fallback decompression complete");
}
// Use Rust lzma-rust2 library (multi-threaded) on all platforms
decompress_with_rust_xz(&temp_path, &output_path, &state)?;
log_info!(MODULE, "Decompression complete");
// Clean up temp file
let _ = std::fs::remove_file(&temp_path);

View File

@@ -5,7 +5,7 @@
use std::path::PathBuf;
/// Get the number of CPU cores available on the system
fn get_cpu_cores() -> usize {
pub fn get_cpu_cores() -> usize {
std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(2)
@@ -17,49 +17,6 @@ pub fn get_recommended_threads() -> usize {
std::cmp::max(1, get_cpu_cores() / 2)
}
/// Find a binary in common system locations
/// Returns the first path that exists
pub fn find_binary(name: &str) -> Option<PathBuf> {
let paths = get_binary_search_paths(name);
paths.into_iter().find(|path| path.exists())
}
/// Get platform-specific search paths for a binary
fn get_binary_search_paths(name: &str) -> Vec<PathBuf> {
#[cfg(target_os = "macos")]
{
vec![
PathBuf::from(format!("/opt/homebrew/bin/{}", name)), // macOS ARM
PathBuf::from(format!("/usr/local/bin/{}", name)), // macOS Intel
PathBuf::from(format!("/usr/bin/{}", name)),
]
}
#[cfg(target_os = "linux")]
{
vec![
PathBuf::from(format!("/usr/bin/{}", name)),
PathBuf::from(format!("/bin/{}", name)),
PathBuf::from(format!("/usr/local/bin/{}", name)),
]
}
#[cfg(target_os = "windows")]
{
// On Windows, rely on PATH or specific install locations
vec![
PathBuf::from(format!("C:\\Program Files\\{0}\\{0}.exe", name)),
PathBuf::from(format!("C:\\Program Files (x86)\\{0}\\{0}.exe", name)),
]
}
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
{
vec![PathBuf::from(format!("/usr/bin/{}", name))]
}
}
/// Get the cache directory for the application
/// On Linux, when running as root via pkexec/sudo, uses the original user's cache directory
pub fn get_cache_dir(app_name: &str) -> PathBuf {