This playbook walks you through creating a background Job that runs on a schedule or on-demand via CLI.

Prerequisites

  • Existing extension crate in extensions/
  • Understanding of what task needs to run in the background
  • Cron schedule pattern (or empty for manual-only)

Step 1: Create Your Job Struct

Create a new file in your extension's jobs/ directory:

// extensions/web/src/jobs/cleanup.rs
use systemprompt::traits::{Job, JobContext, JobResult};
use anyhow::Result;

#[derive(Debug, Clone, Copy, Default)]
pub struct CleanupJob;

Step 2: Implement the Job Trait

Implement the required trait methods:

#[async_trait::async_trait]
impl Job for CleanupJob {
    fn name(&self) -> &'static str {
        "cleanup_old_records"
    }

    fn description(&self) -> &'static str {
        "Deletes records older than 30 days"
    }

    fn schedule(&self) -> &'static str {
        "0 0 3 * * *"  // 3am daily
    }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        // Job logic here
        Ok(JobResult::success())
    }
}

Required Methods

Method Purpose
name() Unique identifier used in CLI and logs
description() Human-readable description
schedule() Cron expression (6-field, seconds included)
execute() Async function that performs the work

Step 3: Configure the Cron Schedule

Jobs use 6-field cron expressions (seconds, minutes, hours, day, month, weekday):

Pattern Description
0 0 * * * * Every hour
0 */15 * * * * Every 15 minutes
0 0 0 * * * Daily at midnight
0 0 3 * * * Daily at 3am
0 0 */2 * * * Every 2 hours
0 0 0 * * 0 Weekly on Sunday
"" No schedule (manual only)
fn schedule(&self) -> &'static str {
    "0 0 3 * * *"  // Runs at 3:00:00 AM every day
}

Step 4: Access Database from JobContext

Use JobContext to access the database pool:

async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    let db = ctx
        .db_pool::<DbPool>()
        .ok_or_else(|| anyhow::anyhow!("Database not available"))?;

    let pool = db
        .pool()
        .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

    // Use the pool for queries
    let count = sqlx::query_scalar!("SELECT COUNT(*) FROM my_table")
        .fetch_one(pool.as_ref())
        .await?;

    Ok(JobResult::success())
}

Step 5: Return JobResult with Statistics

Report execution results:

async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    let start = std::time::Instant::now();

    // ... job logic ...
    let processed = 42u64;
    let errors = 0u64;

    let duration_ms = start.elapsed().as_millis() as u64;

    Ok(JobResult::success()
        .with_stats(processed, errors)
        .with_duration(duration_ms)
        .with_message(format!("Processed {} items", processed)))
}

JobResult Methods

Method Purpose
JobResult::success() Success with no details
.with_stats(processed, errors) Add item counts
.with_duration(ms) Add execution time
.with_message(msg) Add status message
JobResult::failure(msg) Report failure

Step 6: Register the Job

Use the submit_job! macro at module level:

// At the end of your job file
systemprompt::traits::submit_job!(&CleanupJob);

The job is automatically discovered at startup.

Step 7: Export from Module

Update jobs/mod.rs:

mod cleanup;

pub use cleanup::CleanupJob;

Step 8: Test via CLI

List jobs to verify registration:

systemprompt infra jobs list

Run manually:

systemprompt infra jobs run cleanup_old_records

View execution history:

systemprompt infra jobs history --job cleanup_old_records --limit 10

Complete Example

use std::sync::Arc;
use anyhow::Result;
use systemprompt::database::DbPool;
use systemprompt::traits::{Job, JobContext, JobResult};

#[derive(Debug, Clone, Copy, Default)]
pub struct CleanupJob;

#[async_trait::async_trait]
impl Job for CleanupJob {
    fn name(&self) -> &'static str {
        "cleanup_old_records"
    }

    fn description(&self) -> &'static str {
        "Deletes records older than 30 days"
    }

    fn schedule(&self) -> &'static str {
        "0 0 3 * * *"  // 3am daily
    }

    async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
        let start = std::time::Instant::now();

        let db = ctx
            .db_pool::<DbPool>()
            .ok_or_else(|| anyhow::anyhow!("Database not available"))?;

        let pool = db
            .pool()
            .ok_or_else(|| anyhow::anyhow!("PgPool not available"))?;

        let deleted = sqlx::query!(
            "DELETE FROM logs WHERE created_at < NOW() - INTERVAL '30 days'"
        )
        .execute(pool.as_ref())
        .await?
        .rows_affected();

        let duration_ms = start.elapsed().as_millis() as u64;

        Ok(JobResult::success()
            .with_stats(deleted, 0)
            .with_duration(duration_ms)
            .with_message(format!("Deleted {} old records", deleted)))
    }
}

systemprompt::traits::submit_job!(&CleanupJob);

Common Patterns

On-Demand Only Jobs

Set empty schedule for manual-only execution:

fn schedule(&self) -> &'static str {
    ""  // No automatic schedule
}

Pipeline Jobs

Jobs that orchestrate other jobs:

async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    // Run sub-jobs in sequence
    ContentIngestionJob.execute(ctx).await?;
    CopyExtensionAssetsJob::execute_copy().await?;
    ContentPrerenderJob.execute(ctx).await?;

    Ok(JobResult::success())
}

Error Handling

Return JobResult::failure() for recoverable errors:

async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    let db = match ctx.db_pool::<DbPool>() {
        Some(db) => db,
        None => return Ok(JobResult::failure("Database not available")),
    };

    // ... rest of logic
}

Return Err() for unrecoverable errors that should be logged:

async fn execute(&self, ctx: &JobContext) -> Result<JobResult> {
    let critical_data = fetch_required_data()
        .await
        .context("Failed to fetch required data")?;

    // ...
}

Project Structure

extensions/web/src/
├── jobs/
│   ├── mod.rs
│   ├── ingestion.rs
│   ├── cleanup.rs
│   └── publish.rs
└── extension.rs

Checklist

  • Created job struct with #[derive(Debug, Clone, Copy, Default)]
  • Implemented Job trait with all required methods
  • Set unique name() identifier
  • Configured appropriate schedule() cron pattern
  • Accessed database via ctx.db_pool::<DbPool>()
  • Returned JobResult with stats and duration
  • Registered with submit_job! macro
  • Exported from jobs/mod.rs
  • Verified with systemprompt infra jobs list
  • Tested with systemprompt infra jobs run <name>