Relay#

Relay lets you dispatch jobs and await their results synchronously, even when the jobs execute on different servers. Because jobs are persisted in the database, relay combines the benefits of distributed task execution with the durability of background jobs.

Think of relay as persistent, distributed async/await.

Basic Usage#

Every worker automatically gains a relay() method when Oban Pro is installed. Use it to dispatch a job and await the result:

from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}

result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)

print(result)  # {"result": 42}

The relay() method accepts the same arguments as new() and enqueue(), plus additional options for controlling the wait behavior.

Options#

  • timeout — Maximum time to wait for the job to complete, in seconds. Defaults to 30.0. If the job doesn’t complete within this time, a RelayTimeout exception is raised.

  • poll_interval — Backup polling interval in seconds. Defaults to 0.5. Results are delivered immediately via PubSub notifications; polling only occurs if a notification is missed.

  • with_retries — If True, wait through job retries until the final outcome. If False (the default), stop waiting as soon as the job enters a retryable state; in that case a RelayDiscard exception is raised.

# With custom timeout and queue
result = await MyWorker.relay({"data": "value"}, timeout=60.0, queue="priority")

# Wait through retries
result = await MyWorker.relay({"data": "value"}, timeout=120.0, with_retries=True)
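
The interplay between timeout, poll_interval, and notifications described above can be modeled with plain asyncio. The following is a minimal sketch of the general "notify first, poll as a backup" pattern, not Oban Pro's actual implementation. The names await_result, WaitTimedOut, and demo are hypothetical, and an in-memory dict stands in for the database:

```python
import asyncio


class WaitTimedOut(Exception):
    """Hypothetical stand-in for a timeout error; not Oban Pro's RelayTimeout."""


async def await_result(notified, fetch_result, timeout=30.0, poll_interval=0.5):
    """Return the stored result, waking on a notification or on each poll tick.

    notified     -- asyncio.Event set when a completion notification arrives
    fetch_result -- callable returning the stored result, or None if not ready
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    while True:
        result = fetch_result()
        if result is not None:
            return result

        remaining = deadline - loop.time()
        if remaining <= 0:
            raise WaitTimedOut(f"no result within {timeout} seconds")

        try:
            # Wake as soon as a notification arrives, or after one poll
            # interval at the latest, then check the store again.
            await asyncio.wait_for(notified.wait(), min(poll_interval, remaining))
        except asyncio.TimeoutError:
            pass  # no notification arrived; fall back to polling
        notified.clear()


async def demo(notify):
    """Simulate a job finishing after 50 ms, with or without a notification."""
    store = {}  # assumption: stands in for the persisted job record
    notified = asyncio.Event()

    async def finish_job():
        await asyncio.sleep(0.05)
        store["result"] = {"result": 42}
        if notify:
            notified.set()

    task = asyncio.create_task(finish_job())
    result = await await_result(
        notified, lambda: store.get("result"), timeout=2.0, poll_interval=0.1
    )
    await task
    return result


if __name__ == "__main__":
    print(asyncio.run(demo(notify=True)))   # notification path
    print(asyncio.run(demo(notify=False)))  # polling fallback path
```

In this model a shorter poll_interval only matters when a notification is missed, which is why a relatively relaxed default is reasonable.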

Note

Relay requires a running Oban instance to process jobs. If no workers are available for the job’s queue, the relay will eventually time out.

Exception Handling#

When jobs time out, fail, are discarded, or are cancelled, relay() raises an appropriate exception:

from oban_pro import RelayCancelled, RelayDiscard, RelayTimeout

try:
    result = await MyWorker.relay({"id": 1}, timeout=10.0)
except RelayTimeout:
    print("Job timed out")
except RelayDiscard as exc:
    print(f"Job failed: {exc.reason}")
except RelayCancelled as exc:
    print(f"Job cancelled: {exc.reason}")

Exception Types#

  • RelayTimeout — Raised when the job doesn’t complete within the specified timeout. The job may still be running or waiting to execute.

  • RelayDiscard — Raised when the job is discarded after exhausting all retry attempts, or when with_retries=False and the job enters a retryable state. The reason attribute contains the error message from the last attempt.

  • RelayCancelled — Raised when the job returns a Cancel result. The reason attribute contains the cancellation message.
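
When several call sites need the same handling, it can help to fold the three exceptions into a single tagged outcome. The sketch below is one possible approach, not part of Oban Pro. The exception classes are local stand-ins so the example runs without the library (their constructors are assumptions; only the reason attribute comes from the descriptions above), and relay_outcome and fake_relay are hypothetical helpers:

```python
import asyncio


# Local stand-ins so the sketch runs on its own. In real code, import these:
#   from oban_pro import RelayCancelled, RelayDiscard, RelayTimeout
class RelayTimeout(Exception):
    pass


class RelayDiscard(Exception):
    def __init__(self, reason):
        super().__init__(reason)
        self.reason = reason


class RelayCancelled(Exception):
    def __init__(self, reason):
        super().__init__(reason)
        self.reason = reason


async def relay_outcome(awaitable):
    """Await a relay call and return a (status, value) tuple instead of raising."""
    try:
        return ("ok", await awaitable)
    except RelayTimeout:
        # The job may still be running or waiting, so no outcome is known yet.
        return ("timeout", None)
    except RelayDiscard as exc:
        return ("discarded", exc.reason)
    except RelayCancelled as exc:
        return ("cancelled", exc.reason)


async def fake_relay(outcome):
    """Hypothetical stand-in for a call such as MyWorker.relay({...})."""
    if outcome == "timeout":
        raise RelayTimeout()
    if outcome == "discard":
        raise RelayDiscard("division by zero")
    if outcome == "cancel":
        raise RelayCancelled("no longer needed")
    return {"result": 42}


async def main():
    return [
        await relay_outcome(fake_relay(kind))
        for kind in ("ok", "timeout", "discard", "cancel")
    ]


if __name__ == "__main__":
    for status, value in asyncio.run(main()):
        print(status, value)
```

Keeping "timeout" separate from "discarded" matters because a timed-out job has not necessarily failed; dispatching it again could run the same work twice.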

Concurrent Relays#

Multiple relay calls can run concurrently using asyncio.gather():

import asyncio

results = await asyncio.gather(
    ProcessorWorker.relay({"id": 1}, timeout=10.0),
    ProcessorWorker.relay({"id": 2}, timeout=10.0),
    ProcessorWorker.relay({"id": 3}, timeout=10.0),
)
# results == [result1, result2, result3]

Each relay operates independently with its own timeout and notification handling.
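
By default, asyncio.gather() propagates the first exception raised by any awaitable to the caller, while the remaining awaitables keep running. If one failed relay should not hide the results of the others, passing return_exceptions=True collects exceptions alongside results. The sketch below uses a hypothetical fake_relay coroutine in place of ProcessorWorker.relay() so that it runs without Oban; gather_partitioned is likewise an illustrative helper:

```python
import asyncio


async def fake_relay(job_id, fail=False):
    """Hypothetical stand-in for ProcessorWorker.relay({"id": job_id}, ...)."""
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"job {job_id} failed")
    return {"id": job_id, "ok": True}


async def gather_partitioned(*awaitables):
    """Await everything, then split the outcomes into successes and failures."""
    outcomes = await asyncio.gather(*awaitables, return_exceptions=True)
    successes = [o for o in outcomes if not isinstance(o, BaseException)]
    failures = [o for o in outcomes if isinstance(o, BaseException)]
    return successes, failures


async def main():
    return await gather_partitioned(
        fake_relay(1),
        fake_relay(2, fail=True),
        fake_relay(3),
    )


if __name__ == "__main__":
    successes, failures = asyncio.run(main())
    print(successes)
    print(failures)
```

With real relays, the failures list would hold the relay exceptions described under Exception Handling, in the same relative order as the calls that raised them.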

Concurrent relays work well for service coordination, where you dispatch work to specialized workers and collect the results. For example, an order processing system might validate inventory, check for fraud, and calculate shipping in parallel:

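import asyncio


class OutOfStockError(Exception):
    """Raised when the requested SKU is not available."""


class FraudDetectedError(Exception):
    """Raised when the fraud risk score exceeds the threshold."""


# InventoryWorker, FraudCheckWorker, and ShippingWorker are assumed to be defined elsewhere.
async def process_order(order):
    # Dispatch all three checks concurrently and wait for every result.
    inventory, fraud, shipping = await asyncio.gather(
        InventoryWorker.relay({"sku": order["sku"]}, timeout=5.0),
        FraudCheckWorker.relay({"user_id": order["user_id"]}, timeout=10.0),
        ShippingWorker.relay({"address": order["address"]}, timeout=5.0),
    )

    if not inventory["available"]:
        raise OutOfStockError()

    if fraud["risk_score"] > 0.8:
        raise FraudDetectedError()

    return {"total": order["subtotal"] + shipping["cost"]}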