Quickstart Guide#

This guide walks you through the essentials of defining workers, enqueueing jobs, and actually running Oban.

Defining Workers#

Oban provides two ways to define workers: function-based for simple tasks, and class-based when you need more control.

Function Workers#

For simple tasks, use the @job decorator to turn any function into a worker:

from oban import job

@job(queue="default")
def send_email(email, subject, body):
    print("Sending an email...")

Class Workers#

Use the @worker decorator when you need access to job details (e.g. attempts, tags, meta) or want to customize retry backoff logic:

from oban import worker

@worker(queue="exports", priority=2)
class ExportWorker:
    async def process(self, job):
        file_path = job.args["path"]
        print("Generating an export...")

Periodic Jobs (Cron)#

Periodic jobs run on a predictable schedule. Unlike one-time scheduled jobs, periodic jobs repeat automatically without requiring you to insert new jobs after each execution.

Define workers that run on a schedule using the cron parameter:

from oban import worker

@worker(queue="maintenance", cron="@daily")
class DailyCleanup:
    async def process(self, job):
        print("Running daily cleanup...")

@worker(queue="reports", cron="0 9 * * MON-FRI")
class BusinessHoursReport:
    async def process(self, job):
        print("Generating report during business hours...")

Enqueueing Jobs#

Once you’ve defined workers, you can enqueue jobs to run them. Jobs are stored in PostgreSQL and executed once a running Oban instance has capacity in the job’s queue.

For function workers, pass the same arguments you would pass to the original function:

await send_email.enqueue("friend@example.com", "Welcome", "Thanks for signing up!")

For class-based workers, enqueue using a single args dictionary:

await ExportWorker.enqueue({"path": "/data/export.csv"})

Batch Enqueueing#

When you need to enqueue many jobs at once, use enqueue_many() for better performance. It inserts all jobs in a single database query:

await oban.enqueue_many(
    send_email.new("user1@example.com", "Hello", "Message 1"),
    send_email.new("user2@example.com", "Hello", "Message 2"),
)
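
You can also build the jobs in a loop with new() and unpack the list. A minimal sketch, assuming a configured oban instance (see the setup section below) and that enqueue_many() accepts an unpacked list just like the variadic call above; the recipient list is purely illustrative:

recipients = ["user1@example.com", "user2@example.com", "user3@example.com"]

# One job per recipient, all inserted with a single database query
jobs = [send_email.new(address, "Hello", "A batch message") for address in recipients]
await oban.enqueue_many(*jobs)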

Scheduled Jobs#

Schedule jobs to run at a specific time in the future. This is useful for reminders, follow-ups, or any task that shouldn’t run immediately:

from datetime import timedelta

await send_email.enqueue(
    "[email protected]",
    "Reminder",
    "Don't forget!",
    schedule_in=timedelta(hours=6)
)
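
The same option presumably works for class-based workers, passed alongside the args dictionary; this is an assumption, since only the function-worker form is shown above:

from datetime import timedelta

# Assumption: schedule_in is accepted next to the args dictionary, mirroring the function-worker call
await ExportWorker.enqueue({"path": "/data/export.csv"}, schedule_in=timedelta(days=1))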

Setting up and Running Oban#

Before enqueueing or processing jobs, you need to get an Oban instance running.

You can run Oban either “standalone” (a separate worker process) or “embedded” (workers running alongside your web app). Either way, you need some configuration first.

Configuration#

Start by creating an oban.toml file in your project root:

dsn = "postgresql://user:password@localhost/mydb"

[queues]
default = 10
mailers = 5
exports = 2

[pruner]
max_age = 3_600  # Prune completed jobs older than 1 hour

Each queue value sets the maximum number of jobs that queue will run concurrently. Configuration is loaded automatically from oban.toml and environment variables, and can be overridden programmatically as needed.
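
For instance, queue concurrency can be set in code by passing queues to the Oban constructor, as the embedded example below also does. A minimal sketch; treating the constructor value as taking precedence over oban.toml is an assumption:

from oban import Oban

pool = await Oban.create_pool()
# Assumption: queues passed here take effect for this instance in place of the [queues] table in oban.toml
oban = Oban(pool=pool, queues={"default": 20, "mailers": 5})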

Running in Standalone Mode#

This approach separates your web app (which only enqueues jobs) from the worker process (which executes them). It’s ideal for scaling workers independently and keeping job execution out of the web process:

In your web app:

from oban import Oban

pool = await Oban.create_pool()
oban = Oban(pool=pool)  # No queues configured, so this instance only enqueues jobs

@app.route("/send-email")
async def send_email_route():
    await send_email.enqueue("friend@example.com", "Hi", "Hello!")
    return "Email queued"

Start the worker process with the CLI:

oban start

Running in Embedded Mode#

This approach runs workers directly in your application process. It’s simpler for development and works well for lightweight apps where job processing doesn’t need independent scaling.

from contextlib import asynccontextmanager
from fastapi import FastAPI
from oban import Oban

@asynccontextmanager
async def lifespan(app: FastAPI):
    pool = await Oban.create_pool()
    oban = Oban(pool=pool, queues={"default": 10})

    async with oban:
        yield

app = FastAPI(lifespan=lifespan)
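
Enqueueing works the same way as in standalone mode. For example, a route in this app can enqueue the send_email worker defined earlier; the route path and arguments here are just illustrative:

@app.post("/welcome")
async def welcome():
    # The embedded Oban instance started in the lifespan handler picks this job up
    await send_email.enqueue("friend@example.com", "Welcome", "Thanks for signing up!")
    return {"status": "queued"}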

Keep exploring to learn about features like retry backoff and advanced scheduling.