Enqueueing Jobs#

Oban has flexible options for enqueueing jobs, including scheduling and transactional insertion. This guide covers scheduling jobs for execution in the future and inserting jobs atomically with other database changes.

Schedule in Relative Time#

You can schedule jobs to run after a specific delay using the schedule_in parameter:

from datetime import timedelta

# Using timedelta
await MyWorker.enqueue({"id": 1}, schedule_in=timedelta(seconds=5))

# Or using seconds directly
await MyWorker.enqueue({"id": 1}, schedule_in=5)

This is useful for tasks that need to happen after a short delay, such as sending a follow-up email or retrying a failed operation.

Schedule at a Specific Time#

For tasks that need to run at a precise moment, you can schedule jobs at a specific timestamp using the scheduled_at parameter:

await MyWorker.enqueue({"id": 1}, scheduled_at=some_datetime)

This is particularly useful for time-sensitive operations like executing a maintenance task at off-hours, or preparing monthly reports.

Time Zone Considerations#

Scheduling in Oban is always done in UTC. If you’re working with timestamps in different time zones, you should convert them to UTC before scheduling:

from datetime import timezone

utc_time = local_datetime.astimezone(timezone.utc)

await MyWorker.enqueue({"id": 1}, scheduled_at=utc_time)

This ensures consistent behavior across different server locations and prevents daylight saving time issues.

How Scheduling Works#

Behind the scenes, Oban stores your job in the database with the specified scheduled time. The job remains in a scheduled state until that time arrives, at which point it becomes available for execution by the appropriate worker.

Scheduled jobs don’t consume worker resources until they’re ready to execute, allowing you to queue thousands of future jobs without impacting current performance.

Transactional Insertion#

Oban supports inserting jobs within a database transaction, ensuring that insertion is atomic with other database changes. If the transaction rolls back, the job is never inserted.

This is useful when you need to guarantee consistency between your application data and background jobs. For example, when creating a user record and enqueueing a welcome email together.

With SQLAlchemy#

Pass your SQLAlchemy session directly to enqueue():

async with session.begin():
    user = User(email="user@example.com", status="active")
    session.add(user)

    await WelcomeEmailWorker.enqueue({"email": user.email}, conn=session)

For multiple jobs, use enqueue_many():

async with session.begin():
    order = Order(user_id=user.id, total=100)
    session.add(order)

    await oban.enqueue_many(
        SendReceiptWorker.new({"user_id": user.id}),
        UpdateInventoryWorker.new({"user_id": user.id}),
        conn=session,
    )

With Psycopg#

You can also pass a psycopg connection directly:

async with oban._connection() as conn:
    async with conn.transaction():
        await conn.execute("INSERT INTO users ...")
        await oban.enqueue(MyWorker.new({"user_id": user_id}), conn=conn)

Supported Connection Types#

The conn parameter accepts serveral connection types, all of which resolve to a psycopg connection:

  • SQLAlchemy AsyncSession

  • SQLAlchemy AsyncConnection

  • psycopg AsyncConnection

  • Any object with an execute() method