Integrate apalis for background jobs

This commit is contained in:
Luke Street
2025-12-15 17:44:02 -07:00
parent e6a58d2818
commit 1face2521b
14 changed files with 948 additions and 72 deletions
Generated
+180 -16
View File
@@ -94,6 +94,87 @@ version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "apalis"
version = "1.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6556b89bb5cb40dab6e4d7ee1f2abfef6d372bd3aef300a3faa992bcb13bda8"
dependencies = [
"apalis-core",
"futures-util",
"pin-project",
"thiserror 2.0.12",
"tower",
"tracing",
]
[[package]]
name = "apalis-core"
version = "1.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49039d4eb476cc05e196210153dbb34be180de9a0ef8b7a666fde0f80c5c357d"
dependencies = [
"futures-channel",
"futures-core",
"futures-sink",
"futures-timer",
"futures-util",
"pin-project",
"serde",
"serde_json",
"thiserror 2.0.12",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "apalis-cron"
version = "1.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53230aa8a09cdbe2e254d1b59999efe9f1cad09b66d48475f51b3fb38be6b840"
dependencies = [
"apalis-core",
"chrono",
"cron",
"futures-util",
"serde",
"ulid",
]
[[package]]
name = "apalis-sql"
version = "1.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604ed96ae8ff20d4c6f8533ffc9f8c91c5f99da6bc564de6a37b94829522f9ec"
dependencies = [
"apalis-core",
"chrono",
"serde",
"serde_json",
"thiserror 2.0.12",
]
[[package]]
name = "apalis-sqlite"
version = "1.0.0-beta.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8be443bddc6ba6e023b8364304def0a6e3428f258b5addf5aa74bcb5688eb3d1"
dependencies = [
"apalis-core",
"apalis-sql",
"chrono",
"futures",
"log",
"pin-project",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.12",
"tokio",
"ulid",
]
[[package]]
name = "approx"
version = "0.5.1"
@@ -734,6 +815,17 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "cron"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740"
dependencies = [
"chrono",
"once_cell",
"winnow 0.6.26",
]
[[package]]
name = "croner"
version = "2.1.0"
@@ -959,11 +1051,32 @@ dependencies = [
"webp",
]
[[package]]
name = "decomp-dev-jobs"
version = "0.1.0"
dependencies = [
"anyhow",
"apalis",
"apalis-cron",
"apalis-sqlite",
"decomp-dev-core",
"decomp-dev-db",
"decomp-dev-github",
"octocrab",
"serde",
"serde_json",
"sqlx",
"time",
"tokio",
"tracing",
]
[[package]]
name = "decomp-dev-web"
version = "0.1.0"
dependencies = [
"anyhow",
"apalis",
"axum",
"axum_typed_multipart",
"bytes",
@@ -972,6 +1085,7 @@ dependencies = [
"decomp-dev-db",
"decomp-dev-github",
"decomp-dev-images",
"decomp-dev-jobs",
"hex",
"image",
"itertools 0.14.0",
@@ -980,6 +1094,7 @@ dependencies = [
"maud",
"mime",
"objdiff-core",
"octocrab",
"prost",
"regex",
"reqwest",
@@ -1361,6 +1476,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.31"
@@ -4001,9 +4122,9 @@ dependencies = [
[[package]]
name = "sqlx"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3c3a85280daca669cfd3bcb68a337882a8bc57ec882f72c5d13a430613a738e"
checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc"
dependencies = [
"sqlx-core",
"sqlx-macros",
@@ -4014,12 +4135,13 @@ dependencies = [
[[package]]
name = "sqlx-core"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f743f2a3cea30a58cd479013f75550e879009e3a02f616f18ca699335aa248c3"
checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
dependencies = [
"base64 0.22.1",
"bytes",
"chrono",
"crc",
"crossbeam-queue",
"either",
@@ -4035,6 +4157,7 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
"rustls",
"serde",
"serde_json",
"sha2",
@@ -4045,13 +4168,14 @@ dependencies = [
"tokio-stream",
"tracing",
"url",
"webpki-roots 0.26.11",
]
[[package]]
name = "sqlx-macros"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4200e0fde19834956d4252347c12a083bdcb237d7a1a1446bffd8768417dce"
checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d"
dependencies = [
"proc-macro2",
"quote",
@@ -4062,9 +4186,9 @@ dependencies = [
[[package]]
name = "sqlx-macros-core"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "882ceaa29cade31beca7129b6beeb05737f44f82dbe2a9806ecea5a7093d00b7"
checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
dependencies = [
"dotenvy",
"either",
@@ -4081,22 +4205,22 @@ dependencies = [
"sqlx-postgres",
"sqlx-sqlite",
"syn 2.0.100",
"tempfile",
"tokio",
"url",
]
[[package]]
name = "sqlx-mysql"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7"
checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags 2.9.0",
"byteorder",
"bytes",
"chrono",
"crc",
"digest",
"dotenvy",
@@ -4131,14 +4255,15 @@ dependencies = [
[[package]]
name = "sqlx-postgres"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6"
checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
dependencies = [
"atoi",
"base64 0.22.1",
"bitflags 2.9.0",
"byteorder",
"chrono",
"crc",
"dotenvy",
"etcetera",
@@ -4169,11 +4294,12 @@ dependencies = [
[[package]]
name = "sqlx-sqlite"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c26083e9a520e8eb87a06b12347679b142dc2ea29e6e409f805644a7a979a5bc"
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
dependencies = [
"atoi",
"chrono",
"flume",
"futures-channel",
"futures-core",
@@ -4594,7 +4720,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
"winnow 0.7.6",
]
[[package]]
@@ -4850,6 +4976,17 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea"
[[package]]
name = "ulid"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
dependencies = [
"rand 0.9.1",
"serde",
"web-time",
]
[[package]]
name = "unicase"
version = "2.8.1"
@@ -5153,6 +5290,24 @@ dependencies = [
"libwebp-sys",
]
[[package]]
name = "webpki-roots"
version = "0.26.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
dependencies = [
"webpki-roots 1.0.4",
]
[[package]]
name = "webpki-roots"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "weezl"
version = "0.1.8"
@@ -5546,6 +5701,15 @@ version = "0.53.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486"
[[package]]
name = "winnow"
version = "0.6.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.7.6"
+4
View File
@@ -7,6 +7,7 @@ members = [
"crates/db",
"crates/github",
"crates/images",
"crates/jobs",
"crates/web",
]
@@ -17,6 +18,9 @@ rust-version = "1.88"
[workspace.dependencies]
anyhow = "1.0"
apalis = "1.0.0-beta.2"
apalis-cron = "1.0.0-beta.2"
apalis-sqlite = "1.0.0-beta.2"
axum = { version = "0.8", features = ["macros"] }
futures-util = "0.3.31"
hex = "0.4"
+21
View File
@@ -0,0 +1,21 @@
[package]
name = "decomp-dev-jobs"
version.workspace = true
edition.workspace = true
publish = false
[dependencies]
anyhow.workspace = true
apalis.workspace = true
apalis-cron.workspace = true
apalis-sqlite.workspace = true
decomp-dev-core = { path = "../core" }
decomp-dev-db = { path = "../db" }
decomp-dev-github = { path = "../github" }
octocrab.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite"] }
time.workspace = true
tokio.workspace = true
tracing.workspace = true
+323
View File
@@ -0,0 +1,323 @@
use anyhow::{Context, Result};
use apalis::prelude::*;
use octocrab::models::InstallationId;
use crate::{JobContext, ProcessWorkflowRunJob, RefreshProjectJob};
/// Process a completed workflow run job.
///
/// This handles:
/// - Fetching the project info from the database
/// - Getting the appropriate GitHub client (installation or personal token)
/// - Processing workflow run artifacts
/// - Inserting reports for push events
/// - Generating and posting PR comments for pull request events
pub async fn process_workflow_run_job(
job: ProcessWorkflowRunJob,
ctx: Data<JobContext>,
) -> Result<()> {
tracing::info!(
"Processing workflow run job: repo={} run={} event={}",
job.repository_id,
job.run_id,
job.event
);
// Look up the project
let Some(project_info) = ctx
.db
.get_project_info_by_id(job.repository_id, None)
.await
.context("Failed to fetch project info")?
else {
tracing::warn!("No project found for repository ID {}", job.repository_id);
return Ok(());
};
// Get the appropriate GitHub client
let client = if let Some(installation_id) = job.installation_id {
if let Some(installations) = &ctx.github.installations {
let mut installations = installations.lock().await;
installations
.client_for_installation(InstallationId(installation_id))
.await
.context("Failed to get installation client")?
} else {
ctx.github.client.clone()
}
} else {
ctx.github.client_for(job.repository_id).await.context("Failed to get GitHub client")?
};
// Fetch the repository to get the default branch
let repository =
client.repos_by_id(job.repository_id).get().await.context("Failed to fetch repository")?;
// Process the workflow run to get artifacts
let result =
decomp_dev_github::process_workflow_run(&client, &project_info.project, job.run_id.into())
.await
.context("Failed to process workflow run")?;
tracing::debug!(
"Processed workflow run {} ({}) (artifacts {})",
job.run_id,
job.head_sha,
result.artifacts.len()
);
if result.artifacts.is_empty() {
return Ok(());
}
// Create commit info
let commit = decomp_dev_core::models::Commit {
sha: job.head_sha.clone(),
timestamp: time::UtcDateTime::now(),
message: None,
};
// Handle push events - insert reports into database
if job.event == "push" {
let is_default_branch = match &repository.default_branch {
Some(default_branch) => default_branch == &job.head_branch,
None => matches!(job.head_branch.as_str(), "master" | "main"),
};
if is_default_branch {
for artifact in result.artifacts {
let start = std::time::Instant::now();
ctx.db
.insert_report(
&project_info.project,
&commit,
&artifact.version,
*artifact.report,
)
.await
.context("Failed to insert report")?;
let duration = start.elapsed();
tracing::info!(
"Inserted report {} ({}) in {}ms",
artifact.version,
commit.sha,
duration.as_millis()
);
}
}
} else if matches!(job.event.as_str(), "pull_request" | "pull_request_target") {
// Handle pull request events - generate and post comments
if !project_info.project.enable_pr_comments {
return Ok(());
}
let Some(base_commit) = project_info.commit else {
tracing::warn!("No base commit found for repository ID {}", job.repository_id);
return Ok(());
};
// Get all versions that exist on the base branch
let base_versions = ctx
.db
.get_versions_for_commit(
&project_info.project.owner,
&project_info.project.repo,
&base_commit.sha,
)
.await
.context("Failed to get base versions")?;
let mut version_comments = Vec::new();
// Process existing artifacts from PR
for artifact in &result.artifacts {
let cached_report = ctx
.db
.get_report(
&project_info.project.owner,
&project_info.project.repo,
&base_commit.sha,
&artifact.version,
)
.await
.context("Failed to get cached report")?;
if let Some(cached_report) = cached_report {
let report_file = ctx
.db
.upgrade_report(&cached_report)
.await
.context("Failed to upgrade report")?;
let report = report_file.report.flatten();
let changes =
decomp_dev_github::changes::generate_changes(&report, &artifact.report)
.context("Failed to generate changes")?;
version_comments.push(decomp_dev_github::changes::generate_comment(
&report,
&artifact.report,
Some(&report_file.version),
Some(&report_file.commit),
Some(&commit),
changes,
));
} else {
tracing::warn!(
"No base report found for version {} (base {})",
artifact.version,
base_commit.sha
);
version_comments.push(decomp_dev_github::changes::generate_missing_report_comment(
&artifact.version,
Some(&base_commit),
Some(&commit),
));
}
}
// Check for versions that exist on base but are missing from PR
for base_version in &base_versions {
if !result.artifacts.iter().any(|a| a.version == *base_version) {
version_comments.push(decomp_dev_github::changes::generate_missing_report_comment(
base_version,
Some(&base_commit),
Some(&commit),
));
}
}
if !version_comments.is_empty() {
let combined_comment =
decomp_dev_github::changes::generate_combined_comment(version_comments);
// Post/update comments for each associated PR
for pr_number in &job.pull_request_numbers {
post_pr_comment(
&client,
&project_info.project,
&repository,
*pr_number,
&combined_comment,
)
.await
.context("Failed to post PR comment")?;
}
}
}
Ok(())
}
/// Post or update a PR comment with the report.
async fn post_pr_comment(
client: &octocrab::Octocrab,
project: &decomp_dev_core::models::Project,
repository: &octocrab::models::Repository,
pr_number: u64,
combined_comment: &str,
) -> Result<()> {
use decomp_dev_core::models::PullReportStyle;
if project.pr_report_style == PullReportStyle::Description {
let pulls = client.pulls(&project.owner, &project.repo);
let pull = pulls.get(pr_number).await.context("Failed to get pull request")?;
let start_marker = "<!-- decomp.dev report start -->";
let end_marker = "<!-- decomp.dev report end -->";
let new_section = format!("{start_marker}\n{combined_comment}\n{end_marker}");
let existing_body = pull.body.unwrap_or_default();
let new_body = if let Some(start_idx) = existing_body.find(start_marker) {
if let Some(end_rel) = existing_body[start_idx..].find(end_marker) {
let end_idx = start_idx + end_rel + end_marker.len();
format!(
"{}{}{}",
&existing_body[..start_idx],
new_section,
&existing_body[end_idx..]
)
} else {
format!("{existing_body}\n\n---\n\n{new_section}")
}
} else if existing_body.trim().is_empty() {
new_section
} else {
format!("{}\n\n---\n\n{}", existing_body.trim(), new_section)
};
pulls
.update(pr_number)
.body(new_body)
.send()
.await
.context("Failed to update pull request body")?;
} else {
let repository_id = repository.id.into_inner();
let issues = client.issues_by_id(repository_id);
// Only fetch first page for now
let existing_comments = issues.list_comments(pr_number).send().await?;
// Find existing report comments
let existing_report_comments: Vec<_> = existing_comments
.items
.iter()
.filter(|comment| {
comment.body.as_ref().is_some_and(|body| body.contains("### Report for "))
})
.collect();
if let Some(first_comment) = existing_report_comments.first() {
// Update the first comment
issues
.update_comment(first_comment.id, combined_comment.to_string())
.await
.context("Failed to update existing comment")?;
// Delete any additional report comments
for comment in existing_report_comments.iter().skip(1) {
if let Err(e) = issues.delete_comment(comment.id).await {
tracing::warn!("Failed to delete old comment {}: {}", comment.id, e);
}
}
} else {
// Create new comment
issues
.create_comment(pr_number, combined_comment.to_string())
.await
.context("Failed to create comment")?;
}
}
Ok(())
}
/// Process a project refresh job.
///
/// This calls the existing refresh_project function to fetch workflow runs,
/// download artifacts, and insert reports.
pub async fn process_refresh_project_job(
job: RefreshProjectJob,
ctx: Data<JobContext>,
) -> Result<()> {
tracing::info!(
"Processing refresh project job: repo={} full_refresh={}",
job.repository_id,
job.full_refresh
);
match decomp_dev_github::refresh_project(
&ctx.github,
&ctx.db,
job.repository_id,
None, // No client override for background jobs
job.full_refresh,
)
.await
{
Ok(count) => {
tracing::info!("Refreshed project {} with {} new reports", job.repository_id, count);
Ok(())
}
Err(e) => {
tracing::error!("Failed to refresh project {}: {:?}", job.repository_id, e);
Err(e)
}
}
}
+5
View File
@@ -0,0 +1,5 @@
mod refresh;
mod workflow_run;
pub use refresh::RefreshProjectJob;
pub use workflow_run::ProcessWorkflowRunJob;
+13
View File
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};
/// Job to refresh a project's reports from GitHub Actions.
///
/// This job fetches workflow runs from GitHub, downloads artifacts,
/// parses reports, and inserts them into the database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefreshProjectJob {
/// The repository ID to refresh.
pub repository_id: u64,
/// Whether to do a full refresh (fetch all runs) or partial (stop at known commit).
pub full_refresh: bool,
}
+24
View File
@@ -0,0 +1,24 @@
use serde::{Deserialize, Serialize};
/// Job to process a completed GitHub Actions workflow run.
///
/// This job downloads artifacts from the workflow run, parses report files,
/// inserts them into the database (for push events), and posts/updates
/// PR comments (for pull request events).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessWorkflowRunJob {
/// The repository ID (used to look up the project).
pub repository_id: u64,
/// The workflow run ID to process.
pub run_id: u64,
/// The SHA of the commit that triggered the workflow.
pub head_sha: String,
/// The branch that triggered the workflow.
pub head_branch: String,
/// The event that triggered the workflow ("push", "pull_request", etc.).
pub event: String,
/// PR numbers associated with this workflow run (empty for push events).
pub pull_request_numbers: Vec<u64>,
/// The GitHub App installation ID, if this was triggered via an installation.
pub installation_id: Option<u64>,
}
+98
View File
@@ -0,0 +1,98 @@
mod handlers;
mod jobs;
use std::time::Duration;
use anyhow::Result;
use apalis::prelude::*;
use apalis_sqlite::{CompactType, SqliteStorage, fetcher::SqliteFetcher};
use decomp_dev_core::config::Config;
use decomp_dev_db::Database;
use decomp_dev_github::GitHub;
pub use handlers::{process_refresh_project_job, process_workflow_run_job};
pub use jobs::{ProcessWorkflowRunJob, RefreshProjectJob};
use sqlx::sqlite::SqlitePool;
/// Shared context available to all job handlers.
#[derive(Clone)]
pub struct JobContext {
pub config: Config,
pub db: Database,
pub github: GitHub,
}
/// Type alias for the default codec used by SqliteStorage.
type DefaultCodec = apalis::prelude::json::JsonCodec<CompactType>;
/// Type alias for workflow run storage.
pub type WorkflowRunStorage = SqliteStorage<ProcessWorkflowRunJob, DefaultCodec, SqliteFetcher>;
/// Type alias for refresh project storage.
pub type RefreshProjectStorage = SqliteStorage<RefreshProjectJob, DefaultCodec, SqliteFetcher>;
/// Storage handles for pushing jobs from request handlers.
#[derive(Clone)]
pub struct JobStorage {
workflow_run: WorkflowRunStorage,
refresh_project: RefreshProjectStorage,
}
impl JobStorage {
/// Set up job storage tables and create storage instances.
pub async fn setup(pool: &SqlitePool) -> Result<Self> {
SqliteStorage::setup(pool).await?;
Ok(Self {
workflow_run: SqliteStorage::new(pool),
refresh_project: SqliteStorage::new(pool),
})
}
/// Get a clone of the workflow run storage for pushing jobs.
pub fn workflow_run(&self) -> WorkflowRunStorage { self.workflow_run.clone() }
/// Get a clone of the refresh project storage for pushing jobs.
pub fn refresh_project(&self) -> RefreshProjectStorage { self.refresh_project.clone() }
}
/// Configuration for job workers.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
/// Maximum concurrent workflow run jobs.
pub workflow_run_concurrency: usize,
/// Maximum concurrent refresh project jobs.
pub refresh_project_concurrency: usize,
/// Number of retry attempts for failed jobs.
pub retry_attempts: usize,
}
impl Default for WorkerConfig {
fn default() -> Self {
Self { workflow_run_concurrency: 5, refresh_project_concurrency: 3, retry_attempts: 3 }
}
}
/// Create the job monitor with all workers.
pub fn create_monitor(storage: JobStorage, context: JobContext, config: WorkerConfig) -> Monitor {
let ctx1 = context.clone();
let ctx2 = context;
let config1 = config.clone();
let config2 = config;
Monitor::new()
.register(move |_| {
WorkerBuilder::new("workflow-run-worker")
.backend(storage.workflow_run.clone())
.concurrency(config1.workflow_run_concurrency)
.data(ctx1.clone())
.build(process_workflow_run_job)
})
.register(move |_| {
WorkerBuilder::new("refresh-project-worker")
.backend(storage.refresh_project.clone())
.concurrency(config2.refresh_project_concurrency)
.data(ctx2.clone())
.build(process_refresh_project_job)
})
.shutdown_timeout(Duration::from_secs(30))
}
+3
View File
@@ -7,6 +7,7 @@ default-run = "decomp-dev-web"
[dependencies]
anyhow.workspace = true
apalis.workspace = true
axum.workspace = true
axum_typed_multipart = "0.16"
decomp-dev-auth = { path = "../auth" }
@@ -14,6 +15,7 @@ decomp-dev-core = { path = "../core" }
decomp-dev-db = { path = "../db" }
decomp-dev-github = { path = "../github" }
decomp-dev-images = { path = "../images" }
decomp-dev-jobs = { path = "../jobs" }
hex.workspace = true
image.workspace = true
itertools = "0.14"
@@ -21,6 +23,7 @@ lexicmp = "0.2"
maud.workspace = true
mime.workspace = true
objdiff-core.workspace = true
octocrab.workspace = true
regex.workspace = true
reqwest.workspace = true
serde.workspace = true
+43 -20
View File
@@ -1,4 +1,6 @@
use anyhow::Result;
use apalis::prelude::TaskSink;
use decomp_dev_jobs::RefreshProjectJob;
use tokio_cron_scheduler::{Job, JobScheduler};
use tower_sessions::ExpiredDeletion;
use tracing::log;
@@ -12,57 +14,75 @@ pub async fn create(
session_store: impl ExpiredDeletion + Clone,
) -> Result<Scheduler> {
let sched = JobScheduler::new().await?;
// Every 5 minutes: Queue partial refresh jobs for projects without active installations
{
let state = state.clone();
sched
.add(Job::new_async("every 5 minutes", move |_uuid, _l| {
let state = state.clone();
Box::pin(async move {
refresh_projects(&state, false).await.expect("Failed to refresh projects");
if let Err(e) = queue_refresh_jobs(&state, false).await {
log::error!("Failed to queue refresh jobs: {:?}", e);
}
})
})?)
.await?;
}
// Every 12 hours: Queue full refresh jobs for all projects
{
let state = state.clone();
sched
.add(Job::new_async("every 12 hours", move |_uuid, _l| {
let state = state.clone();
Box::pin(async move {
refresh_projects(&state, true).await.expect("Failed to refresh projects");
if let Err(e) = queue_refresh_jobs(&state, true).await {
log::error!("Failed to queue full refresh jobs: {:?}", e);
}
})
})?)
.await?;
}
// At midnight: Cleanup report units and images
{
sched
.add(Job::new_async("at midnight", move |_uuid, _l| {
let state = state.clone();
Box::pin(async move {
state.db.cleanup_report_units().await.expect("Failed to clean up report units");
state.db.cleanup_images().await.expect("Failed to clean up images");
if let Err(e) = state.db.cleanup_report_units().await {
log::error!("Failed to clean up report units: {:?}", e);
}
if let Err(e) = state.db.cleanup_images().await {
log::error!("Failed to clean up images: {:?}", e);
}
})
})?)
.await?;
}
// Every 1 minute: Delete expired sessions
{
sched
.add(Job::new_async("every 1 minute", move |_uuid, _l| {
let session_store = session_store.clone();
Box::pin(async move {
session_store
.delete_expired()
.await
.expect("Failed to delete expired sessions");
if let Err(e) = session_store.delete_expired().await {
log::error!("Failed to delete expired sessions: {:?}", e);
}
})
})?)
.await?;
}
sched.start().await?;
Ok(sched)
}
pub async fn refresh_projects(state: &AppState, full_refresh: bool) -> Result<()> {
/// Queue refresh jobs for all enabled projects.
async fn queue_refresh_jobs(state: &AppState, full_refresh: bool) -> Result<()> {
let mut queued = 0;
for project_info in state.db.get_projects().await? {
if !project_info.project.enabled {
log::debug!(
@@ -73,7 +93,7 @@ pub async fn refresh_projects(state: &AppState, full_refresh: bool) -> Result<()
continue;
}
if !full_refresh {
// Skip projects with active app installations
// Skip projects with active app installations (they get updates via webhooks)
if let Some(installations) = &state.github.installations {
let installations = installations.lock().await;
if installations.repo_to_installation.contains_key(&project_info.project.id) {
@@ -81,22 +101,25 @@ pub async fn refresh_projects(state: &AppState, full_refresh: bool) -> Result<()
}
}
}
if let Err(e) = decomp_dev_github::refresh_project(
&state.github,
&state.db,
project_info.project.id,
None,
full_refresh,
)
.await
{
let job = RefreshProjectJob { repository_id: project_info.project.id, full_refresh };
let mut storage = state.jobs.refresh_project();
if let Err(e) = storage.push(job).await {
log::error!(
"Failed to refresh {}/{}: {:?}",
"Failed to queue refresh job for {}/{}: {:?}",
project_info.project.owner,
project_info.project.repo,
e
);
} else {
queued += 1;
}
}
if queued > 0 {
log::info!("Queued {} refresh jobs (full_refresh={})", queued, full_refresh);
}
Ok(())
}
+15 -12
View File
@@ -1,6 +1,7 @@
use std::io::Cursor;
use anyhow::{Context, Result};
use apalis::prelude::TaskSink;
use axum::{
Form,
extract::{Path, State},
@@ -20,6 +21,7 @@ use decomp_dev_core::{
use decomp_dev_github::{
check_for_reports, extract_github_url, graphql::RepositoryPermission, refresh_project,
};
use decomp_dev_jobs::RefreshProjectJob;
use itertools::Itertools;
use maud::{DOCTYPE, Markup, html};
use serde::{Deserialize, Serialize};
@@ -682,18 +684,19 @@ pub async fn manage_project_refresh(
if !current_user.can_manage_repo(info.project.id) {
return Err(AppError::Status(StatusCode::FORBIDDEN));
}
let client = current_user.client(&state.config.github)?;
let message =
match refresh_project(&state.github, &state.db, info.project.id, Some(&client), true).await
{
Ok(inserted_reports) => {
Message::Info(format!("Fetched {inserted_reports} new reports"))
}
Err(e) => {
tracing::error!("Failed to refresh project: {:?}", e);
Message::Error(format!("Failed to refresh project: {e}"))
}
};
// Enqueue a refresh job instead of processing synchronously
let job = RefreshProjectJob { repository_id: info.project.id, full_refresh: true };
let mut storage = state.jobs.refresh_project();
let message = match storage.push(job).await {
Ok(()) => Message::Info("Refresh job queued. Reports will be updated shortly.".to_string()),
Err(e) => {
tracing::error!("Failed to enqueue refresh job: {:?}", e);
Message::Error(format!("Failed to queue refresh: {e}"))
}
};
session.insert(&format!("manage_{}_message", info.project.id), message).await?;
let redirect_url = format!("/manage/{}/{}", params.owner, params.repo);
Ok(Redirect::to(&redirect_url).into_response())
+2 -1
View File
@@ -23,6 +23,7 @@ mod manage;
mod project;
mod report;
mod treemap;
mod webhook;
pub fn build_router() -> Router<AppState> {
Router::new()
@@ -55,7 +56,7 @@ pub fn build_router() -> Router<AppState> {
))
.route("/robots.txt", get(common::get_robots))
.route("/api", get(api::overview))
.route("/api/github/webhook", post(decomp_dev_github::webhook::webhook))
.route("/api/github/webhook", post(webhook::webhook))
.route("/api/github/oauth", get(decomp_dev_auth::oauth))
.route("/login", get(auth::login))
.route("/logout", post(auth::logout))
+175
View File
@@ -0,0 +1,175 @@
use anyhow::Context;
use apalis::prelude::TaskSink;
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use decomp_dev_core::AppError;
use decomp_dev_github::webhook::{GitHubEvent, RunWithPullRequests};
use decomp_dev_jobs::ProcessWorkflowRunJob;
use octocrab::models::{
webhook_events::{
EventInstallation, WebhookEventPayload,
payload::{InstallationWebhookEventAction, WorkflowRunWebhookEventAction},
},
workflows::WorkFlow,
};
use crate::AppState;
/// Webhook handler that enqueues jobs for processing instead of handling synchronously.
pub async fn webhook(
State(state): State<AppState>,
event: GitHubEvent,
) -> Result<Response, AppError> {
let Some(_installations) = &event.state.github.installations else {
tracing::warn!("Received webhook event {:?} with no GitHub app config", event.event.kind);
return Ok((StatusCode::OK, "No app config").into_response());
};
// Log the event source
let mut owner = None;
if let Some(repository) = &event.event.repository {
owner = repository.owner.as_ref().map(|o| o.login.clone());
if let Some(full_name) = &repository.full_name {
tracing::info!(
"Received webhook event {:?} from repository {}",
event.event.kind,
full_name
);
} else {
tracing::info!(
"Received webhook event {:?} from repository ID {}",
event.event.kind,
repository.id.0
);
}
} else if let Some(organization) = &event.event.organization {
owner = Some(organization.login.clone());
tracing::info!(
"Received webhook event {:?} from org {}",
event.event.kind,
organization.login
);
} else if let Some(sender) = &event.event.sender {
tracing::info!("Received webhook event {:?} from @{}", event.event.kind, sender.login);
} else {
tracing::info!("Received webhook event {:?} from unknown source", event.event.kind);
}
let installation_id = match &event.event.installation {
Some(EventInstallation::Full(installation)) => Some(installation.id),
Some(EventInstallation::Minimal(installation)) => Some(installation.id),
None => None,
};
match &event.event.specific {
WebhookEventPayload::WorkflowRun(inner) => {
if inner.action == WorkflowRunWebhookEventAction::Completed {
let Some(workflow) = &inner.workflow else {
tracing::error!("Received workflow_run event with no workflow");
return Ok((StatusCode::OK, "No workflow").into_response());
};
let _workflow: WorkFlow = match serde_json::from_value(workflow.clone()) {
Ok(workflow) => workflow,
Err(e) => {
tracing::error!("Received workflow_run event with invalid workflow: {e}");
return Ok((StatusCode::OK, "Invalid workflow").into_response());
}
};
let workflow_run: RunWithPullRequests =
match serde_json::from_value(inner.workflow_run.clone()) {
Ok(workflow_run) => workflow_run,
Err(e) => {
tracing::error!(
"Received workflow_run event with invalid workflow_run: {e}"
);
return Ok((StatusCode::OK, "Invalid workflow run").into_response());
}
};
// Create the job
let job = ProcessWorkflowRunJob {
repository_id: workflow_run.inner.repository.id.into_inner(),
run_id: workflow_run.inner.id.into_inner(),
head_sha: workflow_run.inner.head_sha.clone(),
head_branch: workflow_run.inner.head_branch.clone(),
event: workflow_run.inner.event.clone(),
pull_request_numbers: workflow_run
.pull_requests
.iter()
.map(|pr| pr.number)
.collect(),
installation_id: installation_id.map(|id| id.into_inner()),
};
// Enqueue the job
let mut storage = state.jobs.workflow_run();
storage.push(job).await.context("Failed to enqueue workflow run job")?;
tracing::info!("Enqueued workflow run {} for processing", workflow_run.inner.id);
}
}
WebhookEventPayload::PullRequest(_inner) => {
// Pull request events are handled as part of workflow_run events
}
WebhookEventPayload::Installation(inner) => {
tracing::info!(
"Installation {:?} for {}",
inner.action,
owner.as_deref().unwrap_or("[unknown]")
);
if let Some(installation_id) = installation_id {
let installations = event.state.github.installations.as_ref().unwrap();
match inner.action {
InstallationWebhookEventAction::Created => {
// Installation client is already created by the extractor
}
InstallationWebhookEventAction::Deleted => {
// Remove the installation client
let mut installations = installations.lock().await;
installations.repo_to_installation.retain(|_, v| *v != installation_id);
installations.clients.remove(&installation_id);
}
_ => {}
}
} else {
tracing::warn!("Received installation event with no installation ID");
}
}
WebhookEventPayload::InstallationRepositories(inner) => {
tracing::info!(
"Installation {:?} for {} repositories changed",
inner.action,
owner.as_deref().unwrap_or("[unknown]")
);
if let Some(installation_id) = installation_id {
let installations = event.state.github.installations.as_ref().unwrap();
let mut installations = installations.lock().await;
for repository in &inner.repositories_added {
tracing::info!("Added repository {}", repository.full_name);
installations
.repo_to_installation
.insert(repository.id.into_inner(), installation_id);
}
if !inner.repositories_removed.is_empty() {
for repository in &inner.repositories_removed {
tracing::info!("Removed repository {}", repository.full_name);
}
installations.repo_to_installation.retain(|repo, id| {
if *id != installation_id {
return true;
}
inner.repositories_removed.iter().any(|r| r.id.into_inner() == *repo)
});
}
} else {
tracing::warn!("Received installation_repositories event with no installation ID");
}
}
_ => {}
}
Ok((StatusCode::OK, "Event queued").into_response())
}
+42 -23
View File
@@ -22,6 +22,7 @@ use axum::{
use decomp_dev_core::config::{Config, GitHubConfig};
use decomp_dev_db::Database;
use decomp_dev_github::{GitHub, webhook::WebhookState};
use decomp_dev_jobs::{JobContext, JobStorage, WorkerConfig, create_monitor};
use tokio::{net::TcpListener, signal};
use tower::ServiceBuilder;
use tower_http::{
@@ -38,10 +39,11 @@ use tracing_subscriber::{EnvFilter, filter::LevelFilter};
use crate::handlers::{build_router, csp::csp_middleware};
#[derive(Clone, FromRef)]
struct AppState {
pub struct AppState {
config: Config,
db: Database,
github: GitHub,
jobs: JobStorage,
}
impl FromRef<AppState> for WebhookState {
@@ -75,7 +77,11 @@ async fn main() {
};
let db = Database::new(&config.db).await.expect("Failed to open database");
let github = GitHub::new(&config.github).await.expect("Failed to create GitHub client");
let state = AppState { config, db: db.clone(), github };
// Set up job storage
let jobs = JobStorage::setup(&db.pool).await.expect("Failed to set up job storage");
let state = AppState { config, db: db.clone(), github: github.clone(), jobs: jobs.clone() };
// Create session store
let session_store = SqliteStore::new(db.pool.clone());
@@ -90,6 +96,10 @@ async fn main() {
.await
.expect("Failed to create scheduler");
// Create the job monitor
let job_context = JobContext { config: state.config.clone(), db: state.db.clone(), github };
let monitor = create_monitor(jobs, job_context, WorkerConfig::default());
// Build the router
let port = state.config.server.port;
let router = app(state, session_store).into_make_service_with_connect_info::<SocketAddr>();
@@ -124,11 +134,24 @@ async fn main() {
.expect("Failed to notify");
}
// Run our service
axum::serve(listener, router)
.with_graceful_shutdown(shutdown_signal())
.await
.expect("server error");
// Run both the web server and job monitor concurrently, with graceful shutdown
let web_server = async {
axum::serve(listener, router)
.with_graceful_shutdown(shutdown_signal())
.await
.map_err(|e| anyhow::anyhow!("Web server error: {e}"))
};
let job_monitor = async {
monitor
.run_with_signal(shutdown_signal_io())
.await
.map_err(|e| anyhow::anyhow!("Job monitor error: {e}"))
};
// Wait for both to complete gracefully (early return on error)
if let Err(e) = tokio::try_join!(web_server, job_monitor) {
tracing::error!("{e}");
}
#[cfg(unix)]
{
@@ -167,25 +190,21 @@ fn app(state: AppState, session_store: impl SessionStore + Clone) -> Router {
router.layer(middleware).with_state(state)
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
};
async fn shutdown_signal() { shutdown_signal_io().await.ok(); }
/// Shutdown signal that returns io::Result for apalis compatibility.
async fn shutdown_signal_io() -> std::io::Result<()> {
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
{
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
tokio::select! {
result = signal::ctrl_c() => result,
_ = sigterm.recv() => Ok(()),
}
}
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
{
signal::ctrl_c().await
}
}