Background Jobs

Persistent job queue backed by PostgreSQL

Background jobs let you defer work to run outside the request cycle. Sending emails, processing uploads, generating reports — anything that shouldn't block an HTTP response.

Rapina's job system uses your existing PostgreSQL database as the queue. No Redis, no RabbitMQ, no extra infrastructure. Jobs are rows in a rapina_jobs table, claimed by in-process workers with FOR UPDATE SKIP LOCKED for safe concurrent processing.

This page covers setup, defining jobs, enqueuing, running the worker, and the retry system.

Cron Scheduler vs Background Jobs

tl;dr: Use Background Jobs for durable, transactional work that must complete reliably. Use the Cron Scheduler for lightweight, periodic tasks that are safe to miss if the server restarts.

|  | Cron Scheduler | Background Jobs |
| --- | --- | --- |
| Trigger | Time-based (cron expression) | Event-based (enqueued from code) |
| Persistence | None, in-memory only | PostgreSQL-backed |
| Retries | None built-in | Configurable (exponential, fixed, none) |
| Survives restarts | No. Schedule restarts with the process | Yes. Pending jobs persist in the database |
| Use case | Periodic maintenance, polling, cache refresh | Durable, transactional deferred work: emails, uploads, reports |
| Infrastructure | No extra dependencies | Requires PostgreSQL |

Prerequisites

You need the database feature with PostgreSQL. The jobs migration uses PostgreSQL-specific features (gen_random_uuid(), partial indexes) and does not support MySQL or SQLite.

[dependencies]
rapina = { version = "0.11.0", features = ["postgres"] }

You also need a database connection configured in your app — see the Database page.

Setup

Run the CLI command from your project root:

rapina jobs init

This adds the framework's create_rapina_jobs migration to your src/migrations/mod.rs. If the file doesn't exist yet, it creates one. If the migration is already configured, it skips silently.

The result looks like this:

use rapina::jobs::create_rapina_jobs;

mod m20260315_000001_create_users;

rapina::migrations! {
    create_rapina_jobs,
    m20260315_000001_create_users,
}

The framework migration uses a zero timestamp (m00000000_000000_) so it always sorts before your application migrations, regardless of their dates.

Next time your app starts and runs migrations, the rapina_jobs table will be created.

Defining a Job

Use the #[job] macro to define a handler. The first argument is always the payload — a struct that implements Serialize + DeserializeOwned. Remaining arguments are dependency-injected from AppState via State<T> or Db.

use rapina::prelude::*;

#[derive(Serialize, Deserialize)]
pub struct WelcomeEmailPayload {
    pub email: String,
}

#[job(queue = "emails", max_retries = 5)]
async fn send_welcome_email(
    payload: WelcomeEmailPayload,
    mailer: State<Mailer>,
) -> JobResult {
    mailer.send(&payload.email).await?;
    Ok(())
}

The macro generates a send_welcome_email(payload) -> JobRequest helper used for enqueuing.
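
A minimal sketch of calling the generated helper; the resulting JobRequest is what gets passed to the Jobs extractor shown in the next section. The email value here is a placeholder:

// The helper shares the handler's name and returns a JobRequest.
let request: JobRequest = send_welcome_email(WelcomeEmailPayload {
    email: "new.user@example.com".to_string(),
});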

| Attribute | Default | Description |
| --- | --- | --- |
| queue | "default" | Queue to place the job in |
| max_retries | 3 | Total execution count before permanent failure (includes the initial run) |
| retry_policy | "exponential" | Retry strategy: "exponential", "fixed", or "none" |
| retry_delay_secs | 1.0 | Base delay in seconds — used as the backoff base for "exponential" and the fixed interval for "fixed" |

Enqueuing Jobs

Use the Jobs extractor in HTTP handlers to dispatch jobs.

#[post("/users")]
async fn create_user(body: Json<CreateUserRequest>, jobs: Jobs) -> Result<StatusCode> {
    jobs.enqueue(send_welcome_email(WelcomeEmailPayload {
        email: body.email.clone(),
    })).await?;

    Ok(StatusCode::CREATED)
}

For transactional enqueue — where the job row commits atomically with your business logic — use enqueue_with:

let txn = db.conn().begin().await?;
let user = User::insert(&txn, &body).await?;
jobs.enqueue_with(&txn, send_welcome_email(WelcomeEmailPayload {
    email: user.email.clone(),
})).await?;
txn.commit().await?;

If the transaction rolls back, the job is never created.
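
For instance, a validation failure caught before the commit discards both the user row and the job row. A sketch that would replace the final txn.commit() in the example above; the check itself is hypothetical:

if user.email.is_empty() {
    // Rolling back drops the user insert and the enqueued job together;
    // the worker never sees either.
    txn.rollback().await?;
    return Ok(StatusCode::BAD_REQUEST);
}
txn.commit().await?;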

Starting the Worker

Call .jobs() on the application builder before .listen(). The worker spawns in-process alongside the HTTP server and shuts down gracefully on SIGINT/SIGTERM — it finishes its current batch before stopping.

use rapina::jobs::JobConfig;

Rapina::new()
    .with_database(db_config).await?
    .jobs(JobConfig::default())
    .listen("127.0.0.1:3000")
    .await

All options have sensible defaults. Override only what you need:

JobConfig::default()
    .poll_interval(Duration::from_secs(2))
    .batch_size(20)
    .queues(["default", "emails", "heavy"])
    .job_timeout(Duration::from_secs(60))

| Option | Default | Description |
| --- | --- | --- |
| poll_interval | 5s | How often the worker wakes up to claim jobs |
| batch_size | 10 | Maximum jobs claimed per poll cycle |
| queues | ["default"] | Queue names to subscribe to |
| job_timeout | 30s | How long a job lock is held — expired locks can be reclaimed after a worker crash |
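
For example, a worker instance dedicated to slow email delivery might subscribe to a single queue and hold its lease longer, so a handler that waits on SMTP does not outlive its lock. The values below are illustrative, not recommendations:

JobConfig::default()
    .queues(["emails"])
    .batch_size(5)
    .job_timeout(Duration::from_secs(120))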

Job Lifecycle

pending → running → completed
                  ↘ failed   (or back to pending if retries remain)

The worker atomically transitions each job from pending to running in a single SQL statement. On completion the job moves to completed or failed.

Failed jobs are retried according to the retry_policy set on the handler.

Exponential backoff (default)

#[job(max_retries = 5, retry_policy = "exponential", retry_delay_secs = 1.0)]
async fn send_welcome_email(payload: EmailPayload) -> JobResult { ... }

| Attempt | Delay (base = 1s) |
| --- | --- |
| 1 | immediate |
| 2 | 1s + jitter |
| 3 | 4s + jitter |
| 4 | 16s + jitter |

Jitter is seeded from the job's UUID so concurrent failures don't retry in lockstep. Delay is capped at one week.
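
The delays in the table grow by a factor of four per retry. A rough sketch of that shape, ignoring jitter; this is illustrative, not Rapina's actual implementation:

use std::time::Duration;

fn exponential_delay(base_secs: f64, attempt: u32) -> Duration {
    // Attempt 1 is the initial run and executes immediately.
    if attempt <= 1 {
        return Duration::ZERO;
    }
    // Retry r (attempt r + 1) waits base * 4^(r - 1) seconds, capped at one week.
    let raw = base_secs * 4f64.powi(attempt as i32 - 2);
    let one_week_secs = 7.0 * 24.0 * 3600.0;
    Duration::from_secs_f64(raw.min(one_week_secs))
}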

Fixed delay

#[job(max_retries = 10, retry_policy = "fixed", retry_delay_secs = 30.0)]
async fn sync_inventory(payload: SyncPayload) -> JobResult { ... }

Every subsequent retry waits the same retry_delay_secs; the first retry fires immediately regardless of the configured delay.

No retries

#[job(max_retries = 1, retry_policy = "none")]
async fn charge_card(payload: ChargePayload) -> JobResult { ... }

The job is permanently marked failed on the first error. Use this for operations that must not be duplicated.

DI Limitations

Job handlers run outside the request cycle. Only State<T> and Db work — they source data from AppState directly. Request-bound extractors (Context, Headers, Path, Query, CurrentUser) will fail at runtime and must not be used in job handlers.
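
A handler that stays within those limits pulls everything it needs from the payload and from AppState. The payload type and the AppConfig state below are hypothetical stand-ins:

#[derive(Serialize, Deserialize)]
pub struct CleanupPayload {
    pub older_than_days: i64,
}

#[job(queue = "maintenance")]
async fn purge_stale_sessions(
    payload: CleanupPayload,
    db: Db,                   // fine: resolved from AppState
    config: State<AppConfig>, // fine: resolved from AppState
) -> JobResult {
    // No Context, Headers, Path, Query, or CurrentUser here: there is no
    // HTTP request to extract them from when this runs.
    tracing::info!(days = payload.older_than_days, "purging stale sessions");
    Ok(())
}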

Trace Propagation

When a job is enqueued from an HTTP handler, the request's trace_id is stored on the job row. The worker restores it into its tracing span before calling the handler, so all log lines emitted during job execution are correlated with the original HTTP request.
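
In practice that means an ordinary tracing call inside a handler is already correlated; nothing extra is needed. The payload type here is a stand-in:

#[derive(Serialize, Deserialize)]
pub struct UploadPayload {
    pub upload_id: String,
}

#[job(queue = "uploads")]
async fn process_upload(payload: UploadPayload) -> JobResult {
    // Emitted inside the worker's restored span, so this line carries the
    // trace_id of the HTTP request that enqueued the job.
    tracing::info!(upload_id = %payload.upload_id, "processing upload");
    Ok(())
}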

Table Schema

The migration creates a rapina_jobs table with the following columns:

| Column | Type | Default | Description |
| --- | --- | --- | --- |
| id | UUID | gen_random_uuid() | Primary key |
| queue | VARCHAR(255) | 'default' | Logical queue name |
| job_type | VARCHAR(255) |  | Fully-qualified type name for dispatch |
| payload | JSONB | '{}' | Arbitrary data passed to the handler |
| status | VARCHAR(32) | 'pending' | Lifecycle state |
| attempts | INTEGER | 0 | Number of times this job has been attempted |
| max_retries | INTEGER | 3 | Maximum retry count before permanent failure |
| run_at | TIMESTAMPTZ | now() | Earliest time to execute |
| started_at | TIMESTAMPTZ | NULL | When a worker started processing |
| locked_until | TIMESTAMPTZ | NULL | Lease expiry for crash recovery |
| finished_at | TIMESTAMPTZ | NULL | When the job completed or permanently failed |
| last_error | TEXT | NULL | Error from the most recent failed attempt |
| trace_id | VARCHAR(64) | NULL | Distributed trace ID from the enqueuing request |
| created_at | TIMESTAMPTZ | now() | Insertion timestamp |

A partial index on (queue, run_at) WHERE status = 'pending' optimizes the worker's claim query.

Types

JobStatus

The JobStatus enum represents the lifecycle of a job:

use rapina::prelude::*;

// Available when the `database` feature is enabled
let status = JobStatus::Pending;
println!("{status}"); // "pending"

let parsed: JobStatus = "running".parse().unwrap();

| Variant | Meaning |
| --- | --- |
| Pending | Queued and waiting for a worker |
| Running | Claimed by a worker, currently executing |
| Completed | Finished successfully |
| Failed | Exhausted all retries or hit a fatal error |

JobStatus implements Display, FromStr, Serialize, Deserialize, Hash, Copy, and Eq. The string representation is always lowercase.
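
Copy, Eq, and Hash make the enum convenient as a map key, for example when tallying jobs by status. A small sketch:

use std::collections::HashMap;

let statuses = [JobStatus::Pending, JobStatus::Failed, JobStatus::Pending];
let mut counts: HashMap<JobStatus, usize> = HashMap::new();
for status in statuses {
    // JobStatus is Copy, so iterating by value is fine.
    *counts.entry(status).or_insert(0) += 1;
}
assert_eq!(counts[&JobStatus::Pending], 2);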

JobRow

JobRow is a plain struct that maps directly to a row in the rapina_jobs table. It derives SeaORM's FromQueryResult so you can use it with raw queries:

use rapina::jobs::JobRow;
use rapina::sea_orm::{FromQueryResult, Statement, DatabaseBackend};
use rapina::database::{Db, DbError};

let rows: Vec<JobRow> = JobRow::find_by_statement(
    Statement::from_string(
        DatabaseBackend::Postgres,
        "SELECT * FROM rapina_jobs WHERE queue = 'emails' AND status = 'failed'"
    )
)
.all(db.conn())
.await
.map_err(DbError::from)?;

for row in &rows {
    let status = row.parse_status().unwrap();
    println!("{}: {} (attempts: {})", row.id, status, row.attempts);
}

The status field is a String because SeaORM's FromQueryResult derive doesn't support custom enum deserialization. Use parse_status() to get a typed JobStatus.

Manual Setup

If you prefer not to use the CLI, add the migration reference manually:

// src/migrations/mod.rs
use rapina::jobs::create_rapina_jobs;

mod m20260315_000001_create_users;

rapina::migrations! {
    create_rapina_jobs,
    m20260315_000001_create_users,
}

The create_rapina_jobs module is exported from the rapina crate, so there's no file to create in your project — just the use import and the macro entry.