From 1face2521bdcf321f43058be3de7fb7e607835ef Mon Sep 17 00:00:00 2001 From: Luke Street Date: Mon, 15 Dec 2025 17:44:02 -0700 Subject: [PATCH] Integrate apalis for background jobs --- Cargo.lock | 196 ++++++++++++++-- Cargo.toml | 4 + crates/jobs/Cargo.toml | 21 ++ crates/jobs/src/handlers.rs | 323 +++++++++++++++++++++++++++ crates/jobs/src/jobs/mod.rs | 5 + crates/jobs/src/jobs/refresh.rs | 13 ++ crates/jobs/src/jobs/workflow_run.rs | 24 ++ crates/jobs/src/lib.rs | 98 ++++++++ crates/web/Cargo.toml | 3 + crates/web/src/cron.rs | 63 ++++-- crates/web/src/handlers/manage.rs | 27 ++- crates/web/src/handlers/mod.rs | 3 +- crates/web/src/handlers/webhook.rs | 175 +++++++++++++++ crates/web/src/main.rs | 65 ++++-- 14 files changed, 948 insertions(+), 72 deletions(-) create mode 100644 crates/jobs/Cargo.toml create mode 100644 crates/jobs/src/handlers.rs create mode 100644 crates/jobs/src/jobs/mod.rs create mode 100644 crates/jobs/src/jobs/refresh.rs create mode 100644 crates/jobs/src/jobs/workflow_run.rs create mode 100644 crates/jobs/src/lib.rs create mode 100644 crates/web/src/handlers/webhook.rs diff --git a/Cargo.lock b/Cargo.lock index c99ed21..332b155 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,87 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "apalis" +version = "1.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6556b89bb5cb40dab6e4d7ee1f2abfef6d372bd3aef300a3faa992bcb13bda8" +dependencies = [ + "apalis-core", + "futures-util", + "pin-project", + "thiserror 2.0.12", + "tower", + "tracing", +] + +[[package]] +name = "apalis-core" +version = "1.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49039d4eb476cc05e196210153dbb34be180de9a0ef8b7a666fde0f80c5c357d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "futures-timer", + "futures-util", + "pin-project", + "serde", + "serde_json", + "thiserror 2.0.12", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "apalis-cron" +version = "1.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53230aa8a09cdbe2e254d1b59999efe9f1cad09b66d48475f51b3fb38be6b840" +dependencies = [ + "apalis-core", + "chrono", + "cron", + "futures-util", + "serde", + "ulid", +] + +[[package]] +name = "apalis-sql" +version = "1.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604ed96ae8ff20d4c6f8533ffc9f8c91c5f99da6bc564de6a37b94829522f9ec" +dependencies = [ + "apalis-core", + "chrono", + "serde", + "serde_json", + "thiserror 2.0.12", +] + +[[package]] +name = "apalis-sqlite" +version = "1.0.0-beta.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be443bddc6ba6e023b8364304def0a6e3428f258b5addf5aa74bcb5688eb3d1" +dependencies = [ + "apalis-core", + "apalis-sql", + "chrono", + "futures", + "log", + "pin-project", + "serde", + "serde_json", + "sqlx", + "thiserror 2.0.12", + "tokio", + "ulid", +] + [[package]] name = "approx" version = "0.5.1" @@ -734,6 +815,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow 0.6.26", +] + [[package]] name = "croner" version = "2.1.0" @@ -959,11 +1051,32 @@ dependencies = [ "webp", ] +[[package]] +name = "decomp-dev-jobs" +version = "0.1.0" +dependencies = [ + "anyhow", + "apalis", + "apalis-cron", + "apalis-sqlite", + "decomp-dev-core", + "decomp-dev-db", + "decomp-dev-github", + "octocrab", + "serde", + "serde_json", + "sqlx", + "time", + "tokio", + "tracing", +] + [[package]] name = "decomp-dev-web" version = "0.1.0" dependencies = [ "anyhow", + "apalis", "axum", "axum_typed_multipart", "bytes", @@ -972,6 +1085,7 @@ dependencies = [ "decomp-dev-db", "decomp-dev-github", "decomp-dev-images", + "decomp-dev-jobs", "hex", "image", "itertools 0.14.0", @@ -980,6 +1094,7 @@ dependencies = [ "maud", "mime", "objdiff-core", + "octocrab", "prost", "regex", "reqwest", @@ -1361,6 +1476,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -4001,9 +4122,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e" +checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" dependencies = [ "sqlx-core", "sqlx-macros", @@ -4014,12 +4135,13 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3" +checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ "base64 0.22.1", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -4035,6 +4157,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", + "rustls", "serde", "serde_json", "sha2", @@ -4045,13 +4168,14 @@ dependencies = [ "tokio-stream", "tracing", "url", + "webpki-roots 0.26.11", ] [[package]] name = "sqlx-macros" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce" +checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" dependencies = [ "proc-macro2", "quote", @@ -4062,9 +4186,9 @@ dependencies = [ [[package]] name = "sqlx-macros-core" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7" +checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" dependencies = [ "dotenvy", "either", @@ -4081,22 +4205,22 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "syn 2.0.100", - "tempfile", "tokio", "url", ] [[package]] name = "sqlx-mysql" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7" +checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64 0.22.1", "bitflags 2.9.0", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -4131,14 +4255,15 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6" +checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64 0.22.1", "bitflags 2.9.0", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -4169,11 +4294,12 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc" +checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", @@ -4594,7 +4720,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.7.6", ] [[package]] @@ -4850,6 +4976,17 @@ version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea" +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.1", + "serde", + "web-time", +] + [[package]] name = "unicase" version = "2.8.1" @@ -5153,6 +5290,24 @@ dependencies = [ "libwebp-sys", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.4", +] + +[[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "weezl" version = "0.1.8" @@ -5546,6 +5701,15 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "0.7.6" diff --git a/Cargo.toml b/Cargo.toml index 68a5766..1464338 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/db", "crates/github", "crates/images", + "crates/jobs", "crates/web", ] @@ -17,6 +18,9 @@ rust-version = "1.88" [workspace.dependencies] anyhow = "1.0" +apalis = "1.0.0-beta.2" +apalis-cron = "1.0.0-beta.2" +apalis-sqlite = "1.0.0-beta.2" axum = { version = "0.8", features = ["macros"] } futures-util = "0.3.31" hex = "0.4" diff --git a/crates/jobs/Cargo.toml b/crates/jobs/Cargo.toml new file mode 100644 index 0000000..2e053ed --- /dev/null +++ b/crates/jobs/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "decomp-dev-jobs" +version.workspace = true +edition.workspace = true +publish = false + +[dependencies] +anyhow.workspace = true +apalis.workspace = true +apalis-cron.workspace = true +apalis-sqlite.workspace = true +decomp-dev-core = { path = "../core" } +decomp-dev-db = { path = "../db" } +decomp-dev-github = { path = "../github" } +octocrab.workspace = true +serde.workspace = true +serde_json.workspace = true +sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite"] } +time.workspace = true +tokio.workspace = true +tracing.workspace = true diff --git a/crates/jobs/src/handlers.rs b/crates/jobs/src/handlers.rs new file mode 100644 index 0000000..d1e31ac --- /dev/null +++ b/crates/jobs/src/handlers.rs @@ -0,0 +1,323 @@ +use anyhow::{Context, Result}; +use apalis::prelude::*; +use octocrab::models::InstallationId; + +use crate::{JobContext, ProcessWorkflowRunJob, RefreshProjectJob}; + +/// Process a completed workflow run job. +/// +/// This handles: +/// - Fetching the project info from the database +/// - Getting the appropriate GitHub client (installation or personal token) +/// - Processing workflow run artifacts +/// - Inserting reports for push events +/// - Generating and posting PR comments for pull request events +pub async fn process_workflow_run_job( + job: ProcessWorkflowRunJob, + ctx: Data, +) -> Result<()> { + tracing::info!( + "Processing workflow run job: repo={} run={} event={}", + job.repository_id, + job.run_id, + job.event + ); + + // Look up the project + let Some(project_info) = ctx + .db + .get_project_info_by_id(job.repository_id, None) + .await + .context("Failed to fetch project info")? + else { + tracing::warn!("No project found for repository ID {}", job.repository_id); + return Ok(()); + }; + + // Get the appropriate GitHub client + let client = if let Some(installation_id) = job.installation_id { + if let Some(installations) = &ctx.github.installations { + let mut installations = installations.lock().await; + installations + .client_for_installation(InstallationId(installation_id)) + .await + .context("Failed to get installation client")? + } else { + ctx.github.client.clone() + } + } else { + ctx.github.client_for(job.repository_id).await.context("Failed to get GitHub client")? + }; + + // Fetch the repository to get the default branch + let repository = + client.repos_by_id(job.repository_id).get().await.context("Failed to fetch repository")?; + + // Process the workflow run to get artifacts + let result = + decomp_dev_github::process_workflow_run(&client, &project_info.project, job.run_id.into()) + .await + .context("Failed to process workflow run")?; + + tracing::debug!( + "Processed workflow run {} ({}) (artifacts {})", + job.run_id, + job.head_sha, + result.artifacts.len() + ); + + if result.artifacts.is_empty() { + return Ok(()); + } + + // Create commit info + let commit = decomp_dev_core::models::Commit { + sha: job.head_sha.clone(), + timestamp: time::UtcDateTime::now(), + message: None, + }; + + // Handle push events - insert reports into database + if job.event == "push" { + let is_default_branch = match &repository.default_branch { + Some(default_branch) => default_branch == &job.head_branch, + None => matches!(job.head_branch.as_str(), "master" | "main"), + }; + + if is_default_branch { + for artifact in result.artifacts { + let start = std::time::Instant::now(); + ctx.db + .insert_report( + &project_info.project, + &commit, + &artifact.version, + *artifact.report, + ) + .await + .context("Failed to insert report")?; + let duration = start.elapsed(); + tracing::info!( + "Inserted report {} ({}) in {}ms", + artifact.version, + commit.sha, + duration.as_millis() + ); + } + } + } else if matches!(job.event.as_str(), "pull_request" | "pull_request_target") { + // Handle pull request events - generate and post comments + if !project_info.project.enable_pr_comments { + return Ok(()); + } + + let Some(base_commit) = project_info.commit else { + tracing::warn!("No base commit found for repository ID {}", job.repository_id); + return Ok(()); + }; + + // Get all versions that exist on the base branch + let base_versions = ctx + .db + .get_versions_for_commit( + &project_info.project.owner, + &project_info.project.repo, + &base_commit.sha, + ) + .await + .context("Failed to get base versions")?; + + let mut version_comments = Vec::new(); + + // Process existing artifacts from PR + for artifact in &result.artifacts { + let cached_report = ctx + .db + .get_report( + &project_info.project.owner, + &project_info.project.repo, + &base_commit.sha, + &artifact.version, + ) + .await + .context("Failed to get cached report")?; + + if let Some(cached_report) = cached_report { + let report_file = ctx + .db + .upgrade_report(&cached_report) + .await + .context("Failed to upgrade report")?; + let report = report_file.report.flatten(); + let changes = + decomp_dev_github::changes::generate_changes(&report, &artifact.report) + .context("Failed to generate changes")?; + version_comments.push(decomp_dev_github::changes::generate_comment( + &report, + &artifact.report, + Some(&report_file.version), + Some(&report_file.commit), + Some(&commit), + changes, + )); + } else { + tracing::warn!( + "No base report found for version {} (base {})", + artifact.version, + base_commit.sha + ); + version_comments.push(decomp_dev_github::changes::generate_missing_report_comment( + &artifact.version, + Some(&base_commit), + Some(&commit), + )); + } + } + + // Check for versions that exist on base but are missing from PR + for base_version in &base_versions { + if !result.artifacts.iter().any(|a| a.version == *base_version) { + version_comments.push(decomp_dev_github::changes::generate_missing_report_comment( + base_version, + Some(&base_commit), + Some(&commit), + )); + } + } + + if !version_comments.is_empty() { + let combined_comment = + decomp_dev_github::changes::generate_combined_comment(version_comments); + + // Post/update comments for each associated PR + for pr_number in &job.pull_request_numbers { + post_pr_comment( + &client, + &project_info.project, + &repository, + *pr_number, + &combined_comment, + ) + .await + .context("Failed to post PR comment")?; + } + } + } + + Ok(()) +} + +/// Post or update a PR comment with the report. +async fn post_pr_comment( + client: &octocrab::Octocrab, + project: &decomp_dev_core::models::Project, + repository: &octocrab::models::Repository, + pr_number: u64, + combined_comment: &str, +) -> Result<()> { + use decomp_dev_core::models::PullReportStyle; + + if project.pr_report_style == PullReportStyle::Description { + let pulls = client.pulls(&project.owner, &project.repo); + let pull = pulls.get(pr_number).await.context("Failed to get pull request")?; + let start_marker = ""; + let end_marker = ""; + let new_section = format!("{start_marker}\n{combined_comment}\n{end_marker}"); + let existing_body = pull.body.unwrap_or_default(); + let new_body = if let Some(start_idx) = existing_body.find(start_marker) { + if let Some(end_rel) = existing_body[start_idx..].find(end_marker) { + let end_idx = start_idx + end_rel + end_marker.len(); + format!( + "{}{}{}", + &existing_body[..start_idx], + new_section, + &existing_body[end_idx..] + ) + } else { + format!("{existing_body}\n\n---\n\n{new_section}") + } + } else if existing_body.trim().is_empty() { + new_section + } else { + format!("{}\n\n---\n\n{}", existing_body.trim(), new_section) + }; + + pulls + .update(pr_number) + .body(new_body) + .send() + .await + .context("Failed to update pull request body")?; + } else { + let repository_id = repository.id.into_inner(); + let issues = client.issues_by_id(repository_id); + // Only fetch first page for now + let existing_comments = issues.list_comments(pr_number).send().await?; + + // Find existing report comments + let existing_report_comments: Vec<_> = existing_comments + .items + .iter() + .filter(|comment| { + comment.body.as_ref().is_some_and(|body| body.contains("### Report for ")) + }) + .collect(); + + if let Some(first_comment) = existing_report_comments.first() { + // Update the first comment + issues + .update_comment(first_comment.id, combined_comment.to_string()) + .await + .context("Failed to update existing comment")?; + + // Delete any additional report comments + for comment in existing_report_comments.iter().skip(1) { + if let Err(e) = issues.delete_comment(comment.id).await { + tracing::warn!("Failed to delete old comment {}: {}", comment.id, e); + } + } + } else { + // Create new comment + issues + .create_comment(pr_number, combined_comment.to_string()) + .await + .context("Failed to create comment")?; + } + } + + Ok(()) +} + +/// Process a project refresh job. +/// +/// This calls the existing refresh_project function to fetch workflow runs, +/// download artifacts, and insert reports. +pub async fn process_refresh_project_job( + job: RefreshProjectJob, + ctx: Data, +) -> Result<()> { + tracing::info!( + "Processing refresh project job: repo={} full_refresh={}", + job.repository_id, + job.full_refresh + ); + + match decomp_dev_github::refresh_project( + &ctx.github, + &ctx.db, + job.repository_id, + None, // No client override for background jobs + job.full_refresh, + ) + .await + { + Ok(count) => { + tracing::info!("Refreshed project {} with {} new reports", job.repository_id, count); + Ok(()) + } + Err(e) => { + tracing::error!("Failed to refresh project {}: {:?}", job.repository_id, e); + Err(e) + } + } +} diff --git a/crates/jobs/src/jobs/mod.rs b/crates/jobs/src/jobs/mod.rs new file mode 100644 index 0000000..e335974 --- /dev/null +++ b/crates/jobs/src/jobs/mod.rs @@ -0,0 +1,5 @@ +mod refresh; +mod workflow_run; + +pub use refresh::RefreshProjectJob; +pub use workflow_run::ProcessWorkflowRunJob; diff --git a/crates/jobs/src/jobs/refresh.rs b/crates/jobs/src/jobs/refresh.rs new file mode 100644 index 0000000..f452c90 --- /dev/null +++ b/crates/jobs/src/jobs/refresh.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +/// Job to refresh a project's reports from GitHub Actions. +/// +/// This job fetches workflow runs from GitHub, downloads artifacts, +/// parses reports, and inserts them into the database. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RefreshProjectJob { + /// The repository ID to refresh. + pub repository_id: u64, + /// Whether to do a full refresh (fetch all runs) or partial (stop at known commit). + pub full_refresh: bool, +} diff --git a/crates/jobs/src/jobs/workflow_run.rs b/crates/jobs/src/jobs/workflow_run.rs new file mode 100644 index 0000000..84ebd15 --- /dev/null +++ b/crates/jobs/src/jobs/workflow_run.rs @@ -0,0 +1,24 @@ +use serde::{Deserialize, Serialize}; + +/// Job to process a completed GitHub Actions workflow run. +/// +/// This job downloads artifacts from the workflow run, parses report files, +/// inserts them into the database (for push events), and posts/updates +/// PR comments (for pull request events). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProcessWorkflowRunJob { + /// The repository ID (used to look up the project). + pub repository_id: u64, + /// The workflow run ID to process. + pub run_id: u64, + /// The SHA of the commit that triggered the workflow. + pub head_sha: String, + /// The branch that triggered the workflow. + pub head_branch: String, + /// The event that triggered the workflow ("push", "pull_request", etc.). + pub event: String, + /// PR numbers associated with this workflow run (empty for push events). + pub pull_request_numbers: Vec, + /// The GitHub App installation ID, if this was triggered via an installation. + pub installation_id: Option, +} diff --git a/crates/jobs/src/lib.rs b/crates/jobs/src/lib.rs new file mode 100644 index 0000000..c72b09d --- /dev/null +++ b/crates/jobs/src/lib.rs @@ -0,0 +1,98 @@ +mod handlers; +mod jobs; + +use std::time::Duration; + +use anyhow::Result; +use apalis::prelude::*; +use apalis_sqlite::{CompactType, SqliteStorage, fetcher::SqliteFetcher}; +use decomp_dev_core::config::Config; +use decomp_dev_db::Database; +use decomp_dev_github::GitHub; +pub use handlers::{process_refresh_project_job, process_workflow_run_job}; +pub use jobs::{ProcessWorkflowRunJob, RefreshProjectJob}; +use sqlx::sqlite::SqlitePool; + +/// Shared context available to all job handlers. +#[derive(Clone)] +pub struct JobContext { + pub config: Config, + pub db: Database, + pub github: GitHub, +} + +/// Type alias for the default codec used by SqliteStorage. +type DefaultCodec = apalis::prelude::json::JsonCodec; + +/// Type alias for workflow run storage. +pub type WorkflowRunStorage = SqliteStorage; + +/// Type alias for refresh project storage. +pub type RefreshProjectStorage = SqliteStorage; + +/// Storage handles for pushing jobs from request handlers. +#[derive(Clone)] +pub struct JobStorage { + workflow_run: WorkflowRunStorage, + refresh_project: RefreshProjectStorage, +} + +impl JobStorage { + /// Set up job storage tables and create storage instances. + pub async fn setup(pool: &SqlitePool) -> Result { + SqliteStorage::setup(pool).await?; + + Ok(Self { + workflow_run: SqliteStorage::new(pool), + refresh_project: SqliteStorage::new(pool), + }) + } + + /// Get a clone of the workflow run storage for pushing jobs. + pub fn workflow_run(&self) -> WorkflowRunStorage { self.workflow_run.clone() } + + /// Get a clone of the refresh project storage for pushing jobs. + pub fn refresh_project(&self) -> RefreshProjectStorage { self.refresh_project.clone() } +} + +/// Configuration for job workers. +#[derive(Debug, Clone)] +pub struct WorkerConfig { + /// Maximum concurrent workflow run jobs. + pub workflow_run_concurrency: usize, + /// Maximum concurrent refresh project jobs. + pub refresh_project_concurrency: usize, + /// Number of retry attempts for failed jobs. + pub retry_attempts: usize, +} + +impl Default for WorkerConfig { + fn default() -> Self { + Self { workflow_run_concurrency: 5, refresh_project_concurrency: 3, retry_attempts: 3 } + } +} + +/// Create the job monitor with all workers. +pub fn create_monitor(storage: JobStorage, context: JobContext, config: WorkerConfig) -> Monitor { + let ctx1 = context.clone(); + let ctx2 = context; + let config1 = config.clone(); + let config2 = config; + + Monitor::new() + .register(move |_| { + WorkerBuilder::new("workflow-run-worker") + .backend(storage.workflow_run.clone()) + .concurrency(config1.workflow_run_concurrency) + .data(ctx1.clone()) + .build(process_workflow_run_job) + }) + .register(move |_| { + WorkerBuilder::new("refresh-project-worker") + .backend(storage.refresh_project.clone()) + .concurrency(config2.refresh_project_concurrency) + .data(ctx2.clone()) + .build(process_refresh_project_job) + }) + .shutdown_timeout(Duration::from_secs(30)) +} diff --git a/crates/web/Cargo.toml b/crates/web/Cargo.toml index 1f5465f..ac14f99 100644 --- a/crates/web/Cargo.toml +++ b/crates/web/Cargo.toml @@ -7,6 +7,7 @@ default-run = "decomp-dev-web" [dependencies] anyhow.workspace = true +apalis.workspace = true axum.workspace = true axum_typed_multipart = "0.16" decomp-dev-auth = { path = "../auth" } @@ -14,6 +15,7 @@ decomp-dev-core = { path = "../core" } decomp-dev-db = { path = "../db" } decomp-dev-github = { path = "../github" } decomp-dev-images = { path = "../images" } +decomp-dev-jobs = { path = "../jobs" } hex.workspace = true image.workspace = true itertools = "0.14" @@ -21,6 +23,7 @@ lexicmp = "0.2" maud.workspace = true mime.workspace = true objdiff-core.workspace = true +octocrab.workspace = true regex.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/crates/web/src/cron.rs b/crates/web/src/cron.rs index a3e2783..4566a87 100644 --- a/crates/web/src/cron.rs +++ b/crates/web/src/cron.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use apalis::prelude::TaskSink; +use decomp_dev_jobs::RefreshProjectJob; use tokio_cron_scheduler::{Job, JobScheduler}; use tower_sessions::ExpiredDeletion; use tracing::log; @@ -12,57 +14,75 @@ pub async fn create( session_store: impl ExpiredDeletion + Clone, ) -> Result { let sched = JobScheduler::new().await?; + + // Every 5 minutes: Queue partial refresh jobs for projects without active installations { let state = state.clone(); sched .add(Job::new_async("every 5 minutes", move |_uuid, _l| { let state = state.clone(); Box::pin(async move { - refresh_projects(&state, false).await.expect("Failed to refresh projects"); + if let Err(e) = queue_refresh_jobs(&state, false).await { + log::error!("Failed to queue refresh jobs: {:?}", e); + } }) })?) .await?; } + + // Every 12 hours: Queue full refresh jobs for all projects { let state = state.clone(); sched .add(Job::new_async("every 12 hours", move |_uuid, _l| { let state = state.clone(); Box::pin(async move { - refresh_projects(&state, true).await.expect("Failed to refresh projects"); + if let Err(e) = queue_refresh_jobs(&state, true).await { + log::error!("Failed to queue full refresh jobs: {:?}", e); + } }) })?) .await?; } + + // At midnight: Cleanup report units and images { sched .add(Job::new_async("at midnight", move |_uuid, _l| { let state = state.clone(); Box::pin(async move { - state.db.cleanup_report_units().await.expect("Failed to clean up report units"); - state.db.cleanup_images().await.expect("Failed to clean up images"); + if let Err(e) = state.db.cleanup_report_units().await { + log::error!("Failed to clean up report units: {:?}", e); + } + if let Err(e) = state.db.cleanup_images().await { + log::error!("Failed to clean up images: {:?}", e); + } }) })?) .await?; } + + // Every 1 minute: Delete expired sessions { sched .add(Job::new_async("every 1 minute", move |_uuid, _l| { let session_store = session_store.clone(); Box::pin(async move { - session_store - .delete_expired() - .await - .expect("Failed to delete expired sessions"); + if let Err(e) = session_store.delete_expired().await { + log::error!("Failed to delete expired sessions: {:?}", e); + } }) })?) .await?; } + sched.start().await?; Ok(sched) } -pub async fn refresh_projects(state: &AppState, full_refresh: bool) -> Result<()> { +/// Queue refresh jobs for all enabled projects. +async fn queue_refresh_jobs(state: &AppState, full_refresh: bool) -> Result<()> { + let mut queued = 0; for project_info in state.db.get_projects().await? { if !project_info.project.enabled { log::debug!( @@ -73,7 +93,7 @@ pub async fn refresh_projects(state: &AppState, full_refresh: bool) -> Result<() continue; } if !full_refresh { - // Skip projects with active app installations + // Skip projects with active app installations (they get updates via webhooks) if let Some(installations) = &state.github.installations { let installations = installations.lock().await; if installations.repo_to_installation.contains_key(&project_info.project.id) { @@ -81,22 +101,25 @@ pub async fn refresh_projects(state: &AppState, full_refresh: bool) -> Result<() } } } - if let Err(e) = decomp_dev_github::refresh_project( - &state.github, - &state.db, - project_info.project.id, - None, - full_refresh, - ) - .await - { + + let job = RefreshProjectJob { repository_id: project_info.project.id, full_refresh }; + + let mut storage = state.jobs.refresh_project(); + if let Err(e) = storage.push(job).await { log::error!( - "Failed to refresh {}/{}: {:?}", + "Failed to queue refresh job for {}/{}: {:?}", project_info.project.owner, project_info.project.repo, e ); + } else { + queued += 1; } } + + if queued > 0 { + log::info!("Queued {} refresh jobs (full_refresh={})", queued, full_refresh); + } + Ok(()) } diff --git a/crates/web/src/handlers/manage.rs b/crates/web/src/handlers/manage.rs index d0dfd43..4005185 100644 --- a/crates/web/src/handlers/manage.rs +++ b/crates/web/src/handlers/manage.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use anyhow::{Context, Result}; +use apalis::prelude::TaskSink; use axum::{ Form, extract::{Path, State}, @@ -20,6 +21,7 @@ use decomp_dev_core::{ use decomp_dev_github::{ check_for_reports, extract_github_url, graphql::RepositoryPermission, refresh_project, }; +use decomp_dev_jobs::RefreshProjectJob; use itertools::Itertools; use maud::{DOCTYPE, Markup, html}; use serde::{Deserialize, Serialize}; @@ -682,18 +684,19 @@ pub async fn manage_project_refresh( if !current_user.can_manage_repo(info.project.id) { return Err(AppError::Status(StatusCode::FORBIDDEN)); } - let client = current_user.client(&state.config.github)?; - let message = - match refresh_project(&state.github, &state.db, info.project.id, Some(&client), true).await - { - Ok(inserted_reports) => { - Message::Info(format!("Fetched {inserted_reports} new reports")) - } - Err(e) => { - tracing::error!("Failed to refresh project: {:?}", e); - Message::Error(format!("Failed to refresh project: {e}")) - } - }; + + // Enqueue a refresh job instead of processing synchronously + let job = RefreshProjectJob { repository_id: info.project.id, full_refresh: true }; + + let mut storage = state.jobs.refresh_project(); + let message = match storage.push(job).await { + Ok(()) => Message::Info("Refresh job queued. Reports will be updated shortly.".to_string()), + Err(e) => { + tracing::error!("Failed to enqueue refresh job: {:?}", e); + Message::Error(format!("Failed to queue refresh: {e}")) + } + }; + session.insert(&format!("manage_{}_message", info.project.id), message).await?; let redirect_url = format!("/manage/{}/{}", params.owner, params.repo); Ok(Redirect::to(&redirect_url).into_response()) diff --git a/crates/web/src/handlers/mod.rs b/crates/web/src/handlers/mod.rs index 2b13429..2acaaf8 100644 --- a/crates/web/src/handlers/mod.rs +++ b/crates/web/src/handlers/mod.rs @@ -23,6 +23,7 @@ mod manage; mod project; mod report; mod treemap; +mod webhook; pub fn build_router() -> Router { Router::new() @@ -55,7 +56,7 @@ pub fn build_router() -> Router { )) .route("/robots.txt", get(common::get_robots)) .route("/api", get(api::overview)) - .route("/api/github/webhook", post(decomp_dev_github::webhook::webhook)) + .route("/api/github/webhook", post(webhook::webhook)) .route("/api/github/oauth", get(decomp_dev_auth::oauth)) .route("/login", get(auth::login)) .route("/logout", post(auth::logout)) diff --git a/crates/web/src/handlers/webhook.rs b/crates/web/src/handlers/webhook.rs new file mode 100644 index 0000000..a0eca09 --- /dev/null +++ b/crates/web/src/handlers/webhook.rs @@ -0,0 +1,175 @@ +use anyhow::Context; +use apalis::prelude::TaskSink; +use axum::{ + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use decomp_dev_core::AppError; +use decomp_dev_github::webhook::{GitHubEvent, RunWithPullRequests}; +use decomp_dev_jobs::ProcessWorkflowRunJob; +use octocrab::models::{ + webhook_events::{ + EventInstallation, WebhookEventPayload, + payload::{InstallationWebhookEventAction, WorkflowRunWebhookEventAction}, + }, + workflows::WorkFlow, +}; + +use crate::AppState; + +/// Webhook handler that enqueues jobs for processing instead of handling synchronously. +pub async fn webhook( + State(state): State, + event: GitHubEvent, +) -> Result { + let Some(_installations) = &event.state.github.installations else { + tracing::warn!("Received webhook event {:?} with no GitHub app config", event.event.kind); + return Ok((StatusCode::OK, "No app config").into_response()); + }; + + // Log the event source + let mut owner = None; + if let Some(repository) = &event.event.repository { + owner = repository.owner.as_ref().map(|o| o.login.clone()); + if let Some(full_name) = &repository.full_name { + tracing::info!( + "Received webhook event {:?} from repository {}", + event.event.kind, + full_name + ); + } else { + tracing::info!( + "Received webhook event {:?} from repository ID {}", + event.event.kind, + repository.id.0 + ); + } + } else if let Some(organization) = &event.event.organization { + owner = Some(organization.login.clone()); + tracing::info!( + "Received webhook event {:?} from org {}", + event.event.kind, + organization.login + ); + } else if let Some(sender) = &event.event.sender { + tracing::info!("Received webhook event {:?} from @{}", event.event.kind, sender.login); + } else { + tracing::info!("Received webhook event {:?} from unknown source", event.event.kind); + } + + let installation_id = match &event.event.installation { + Some(EventInstallation::Full(installation)) => Some(installation.id), + Some(EventInstallation::Minimal(installation)) => Some(installation.id), + None => None, + }; + + match &event.event.specific { + WebhookEventPayload::WorkflowRun(inner) => { + if inner.action == WorkflowRunWebhookEventAction::Completed { + let Some(workflow) = &inner.workflow else { + tracing::error!("Received workflow_run event with no workflow"); + return Ok((StatusCode::OK, "No workflow").into_response()); + }; + let _workflow: WorkFlow = match serde_json::from_value(workflow.clone()) { + Ok(workflow) => workflow, + Err(e) => { + tracing::error!("Received workflow_run event with invalid workflow: {e}"); + return Ok((StatusCode::OK, "Invalid workflow").into_response()); + } + }; + let workflow_run: RunWithPullRequests = + match serde_json::from_value(inner.workflow_run.clone()) { + Ok(workflow_run) => workflow_run, + Err(e) => { + tracing::error!( + "Received workflow_run event with invalid workflow_run: {e}" + ); + return Ok((StatusCode::OK, "Invalid workflow run").into_response()); + } + }; + + // Create the job + let job = ProcessWorkflowRunJob { + repository_id: workflow_run.inner.repository.id.into_inner(), + run_id: workflow_run.inner.id.into_inner(), + head_sha: workflow_run.inner.head_sha.clone(), + head_branch: workflow_run.inner.head_branch.clone(), + event: workflow_run.inner.event.clone(), + pull_request_numbers: workflow_run + .pull_requests + .iter() + .map(|pr| pr.number) + .collect(), + installation_id: installation_id.map(|id| id.into_inner()), + }; + + // Enqueue the job + let mut storage = state.jobs.workflow_run(); + storage.push(job).await.context("Failed to enqueue workflow run job")?; + + tracing::info!("Enqueued workflow run {} for processing", workflow_run.inner.id); + } + } + WebhookEventPayload::PullRequest(_inner) => { + // Pull request events are handled as part of workflow_run events + } + WebhookEventPayload::Installation(inner) => { + tracing::info!( + "Installation {:?} for {}", + inner.action, + owner.as_deref().unwrap_or("[unknown]") + ); + if let Some(installation_id) = installation_id { + let installations = event.state.github.installations.as_ref().unwrap(); + match inner.action { + InstallationWebhookEventAction::Created => { + // Installation client is already created by the extractor + } + InstallationWebhookEventAction::Deleted => { + // Remove the installation client + let mut installations = installations.lock().await; + installations.repo_to_installation.retain(|_, v| *v != installation_id); + installations.clients.remove(&installation_id); + } + _ => {} + } + } else { + tracing::warn!("Received installation event with no installation ID"); + } + } + WebhookEventPayload::InstallationRepositories(inner) => { + tracing::info!( + "Installation {:?} for {} repositories changed", + inner.action, + owner.as_deref().unwrap_or("[unknown]") + ); + if let Some(installation_id) = installation_id { + let installations = event.state.github.installations.as_ref().unwrap(); + let mut installations = installations.lock().await; + for repository in &inner.repositories_added { + tracing::info!("Added repository {}", repository.full_name); + installations + .repo_to_installation + .insert(repository.id.into_inner(), installation_id); + } + if !inner.repositories_removed.is_empty() { + for repository in &inner.repositories_removed { + tracing::info!("Removed repository {}", repository.full_name); + } + installations.repo_to_installation.retain(|repo, id| { + if *id != installation_id { + return true; + } + inner.repositories_removed.iter().any(|r| r.id.into_inner() == *repo) + }); + } + } else { + tracing::warn!("Received installation_repositories event with no installation ID"); + } + } + _ => {} + } + + Ok((StatusCode::OK, "Event queued").into_response()) +} diff --git a/crates/web/src/main.rs b/crates/web/src/main.rs index b65ecfb..68d2677 100644 --- a/crates/web/src/main.rs +++ b/crates/web/src/main.rs @@ -22,6 +22,7 @@ use axum::{ use decomp_dev_core::config::{Config, GitHubConfig}; use decomp_dev_db::Database; use decomp_dev_github::{GitHub, webhook::WebhookState}; +use decomp_dev_jobs::{JobContext, JobStorage, WorkerConfig, create_monitor}; use tokio::{net::TcpListener, signal}; use tower::ServiceBuilder; use tower_http::{ @@ -38,10 +39,11 @@ use tracing_subscriber::{EnvFilter, filter::LevelFilter}; use crate::handlers::{build_router, csp::csp_middleware}; #[derive(Clone, FromRef)] -struct AppState { +pub struct AppState { config: Config, db: Database, github: GitHub, + jobs: JobStorage, } impl FromRef for WebhookState { @@ -75,7 +77,11 @@ async fn main() { }; let db = Database::new(&config.db).await.expect("Failed to open database"); let github = GitHub::new(&config.github).await.expect("Failed to create GitHub client"); - let state = AppState { config, db: db.clone(), github }; + + // Set up job storage + let jobs = JobStorage::setup(&db.pool).await.expect("Failed to set up job storage"); + + let state = AppState { config, db: db.clone(), github: github.clone(), jobs: jobs.clone() }; // Create session store let session_store = SqliteStore::new(db.pool.clone()); @@ -90,6 +96,10 @@ async fn main() { .await .expect("Failed to create scheduler"); + // Create the job monitor + let job_context = JobContext { config: state.config.clone(), db: state.db.clone(), github }; + let monitor = create_monitor(jobs, job_context, WorkerConfig::default()); + // Build the router let port = state.config.server.port; let router = app(state, session_store).into_make_service_with_connect_info::(); @@ -124,11 +134,24 @@ async fn main() { .expect("Failed to notify"); } - // Run our service - axum::serve(listener, router) - .with_graceful_shutdown(shutdown_signal()) - .await - .expect("server error"); + // Run both the web server and job monitor concurrently, with graceful shutdown + let web_server = async { + axum::serve(listener, router) + .with_graceful_shutdown(shutdown_signal()) + .await + .map_err(|e| anyhow::anyhow!("Web server error: {e}")) + }; + let job_monitor = async { + monitor + .run_with_signal(shutdown_signal_io()) + .await + .map_err(|e| anyhow::anyhow!("Job monitor error: {e}")) + }; + + // Wait for both to complete gracefully (early return on error) + if let Err(e) = tokio::try_join!(web_server, job_monitor) { + tracing::error!("{e}"); + } #[cfg(unix)] { @@ -167,25 +190,21 @@ fn app(state: AppState, session_store: impl SessionStore + Clone) -> Router { router.layer(middleware).with_state(state) } -async fn shutdown_signal() { - let ctrl_c = async { - signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); - }; +async fn shutdown_signal() { shutdown_signal_io().await.ok(); } +/// Shutdown signal that returns io::Result for apalis compatibility. +async fn shutdown_signal_io() -> std::io::Result<()> { #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - + { + let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?; + tokio::select! { + result = signal::ctrl_c() => result, + _ = sigterm.recv() => Ok(()), + } + } #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - tokio::select! { - _ = ctrl_c => {}, - _ = terminate => {}, + { + signal::ctrl_c().await } }