Relay
Relay lets you dispatch jobs and await their results synchronously—even when jobs execute on different servers. Because jobs are persisted in the database, relay provides the benefits of distributed task execution with the durability of background jobs.
Think of relay as persistent, distributed async/await.
Basic Usage
Every worker automatically gains a relay() method when Oban Pro is installed. Use it to
dispatch a job and await the result:
from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}

result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)
print(result)  # {"result": 42}
The relay() method accepts the same arguments as new() and enqueue(), plus additional
options for controlling the wait behavior.
Options
timeout: Maximum time to wait for the job to complete, in seconds. Defaults to 30.0. If the job doesn’t complete within this time, a RelayTimeout exception is raised.
poll_interval: Backup polling interval, in seconds. Defaults to 0.5. Results are delivered immediately via PubSub notifications; polling only occurs if a notification is missed.
with_retries: If True, wait through job retries until the final outcome. If False (the default), stop waiting as soon as the job enters a retryable state, which raises RelayDiscard.
# With custom timeout and queue
result = await MyWorker.relay({"data": "value"}, timeout=60.0, queue="priority")
# Wait through retries
result = await MyWorker.relay({"data": "value"}, timeout=120.0, with_retries=True)
Note
Relay requires a running Oban instance to process jobs. If no workers are available for the job’s queue, the relay will eventually time out.
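To make the interplay between timeout and poll_interval more concrete, here is a minimal, self-contained sketch of a "notification first, polling as backup" wait loop. It is not Oban Pro's implementation: the dictionary store, the asyncio.Event used as a notification, the ResultTimeout exception, and the wait_for_result helper are all hypothetical stand-ins chosen so the example runs with the standard library alone.

```python
import asyncio


class ResultTimeout(Exception):
    """Hypothetical stand-in for a relay timeout; not the library's RelayTimeout."""


async def wait_for_result(store, key, notified, timeout=30.0, poll_interval=0.5):
    """Wait until `key` appears in `store`, waking on a notification or a poll."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    while True:
        if key in store:
            return store[key]

        remaining = deadline - loop.time()
        if remaining <= 0:
            raise ResultTimeout(f"no result for {key!r} within {timeout}s")

        try:
            # Sleep until notified, but never longer than one poll interval.
            await asyncio.wait_for(notified.wait(), min(poll_interval, remaining))
        except asyncio.TimeoutError:
            pass  # No notification arrived; loop around and check the store.
        notified.clear()


async def demo():
    store, notified = {}, asyncio.Event()

    async def finish_job(notify):
        await asyncio.sleep(0.05)
        store["job-1"] = {"result": 42}
        if notify:
            notified.set()

    # Notification delivered: the waiter wakes as soon as the event is set.
    task = asyncio.create_task(finish_job(notify=True))
    fast = await wait_for_result(store, "job-1", notified, timeout=1.0)
    await task

    # Notification missed: the backup poll finds the result instead.
    store.clear()
    task = asyncio.create_task(finish_job(notify=False))
    polled = await wait_for_result(
        store, "job-1", notified, timeout=1.0, poll_interval=0.02
    )
    await task

    # Nothing ever completes: the wait ends with a timeout error.
    store.clear()
    try:
        await wait_for_result(store, "job-1", notified, timeout=0.1, poll_interval=0.02)
        timed_out = False
    except ResultTimeout:
        timed_out = True

    return fast, polled, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch a shorter poll_interval only affects how quickly a missed notification is recovered; when notifications arrive, the result is returned without waiting for a poll.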
Exception Handling
When a job times out, fails, is discarded, or is cancelled, relay() raises a corresponding
exception:
from oban_pro import RelayCancelled, RelayDiscard, RelayTimeout

try:
    result = await MyWorker.relay({"id": 1}, timeout=10.0)
except RelayTimeout:
    print("Job timed out")
except RelayDiscard as exc:
    print(f"Job failed: {exc.reason}")
except RelayCancelled as exc:
    print(f"Job cancelled: {exc.reason}")
Exception Types
RelayTimeout: Raised when the job doesn’t complete within the specified timeout. The job may still be running or waiting to execute.
RelayDiscard: Raised when the job is discarded after exhausting all retry attempts, or when with_retries=False and the job enters a retryable state. The reason attribute contains the error message from the last attempt.
RelayCancelled: Raised when the job returns a Cancel result. The reason attribute contains the cancellation message.
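One way to keep calling code tidy is to fold the three exception types into a single outcome value. The sketch below is an assumption-laden illustration, not part of the library. The exception classes are local stand-ins defined only so the example runs without Oban Pro installed, and their constructors are invented. The relay_outcome helper and the fake_* coroutines are likewise hypothetical.

```python
import asyncio


# Stand-ins so the sketch runs without Oban Pro. In real code these would be
# imported: from oban_pro import RelayCancelled, RelayDiscard, RelayTimeout
class RelayTimeout(Exception):
    pass


class RelayDiscard(Exception):
    def __init__(self, reason):
        super().__init__(reason)
        self.reason = reason


class RelayCancelled(Exception):
    def __init__(self, reason):
        super().__init__(reason)
        self.reason = reason


async def relay_outcome(relay_call):
    """Await a relay-style awaitable and return a (status, detail) tuple."""
    try:
        return ("ok", await relay_call)
    except RelayTimeout:
        # The job may still be running or waiting; only the wait was abandoned.
        return ("timeout", None)
    except RelayDiscard as exc:
        return ("discarded", exc.reason)
    except RelayCancelled as exc:
        return ("cancelled", exc.reason)


# Hypothetical coroutines imitating the possible endings of a relay call.
async def fake_success():
    return {"result": 42}


async def fake_timeout():
    raise RelayTimeout()


async def fake_discard():
    raise RelayDiscard("division by zero")


async def fake_cancel():
    raise RelayCancelled("no longer needed")


async def demo():
    calls = (fake_success, fake_timeout, fake_discard, fake_cancel)
    return [await relay_outcome(call()) for call in calls]


if __name__ == "__main__":
    for status, detail in asyncio.run(demo()):
        print(status, detail)
```

With the real library, the same helper would be called as relay_outcome(MyWorker.relay({"id": 1}, timeout=10.0)), assuming the exceptions behave as described above.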
Concurrent Relays
Multiple relay calls can run concurrently using asyncio.gather():
import asyncio

results = await asyncio.gather(
    ProcessorWorker.relay({"id": 1}, timeout=10.0),
    ProcessorWorker.relay({"id": 2}, timeout=10.0),
    ProcessorWorker.relay({"id": 3}, timeout=10.0),
)
# results == [result1, result2, result3]
Each relay operates independently with its own timeout and notification handling.
Concurrent relays are excellent for service coordination where you dispatch work to specialized workers and collect results. For example, an order processing system that needs to validate inventory, check fraud, and calculate shipping in parallel:
async def process_order(order):
    inventory, fraud, shipping = await asyncio.gather(
        InventoryWorker.relay({"sku": order["sku"]}, timeout=5.0),
        FraudCheckWorker.relay({"user_id": order["user_id"]}, timeout=10.0),
        ShippingWorker.relay({"address": order["address"]}, timeout=5.0),
    )

    if not inventory["available"]:
        raise OutOfStockError()

    if fraud["risk_score"] > 0.8:
        raise FraudDetectedError()

    return {"total": order["subtotal"] + shipping["cost"]}
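By default, asyncio.gather() raises the first exception to the caller, so a single failed relay hides the results of the others. If partial results are useful, passing return_exceptions=True returns each exception in place of its result. The sketch below demonstrates this standard asyncio behavior with a hypothetical fake_relay coroutine standing in for Worker.relay(...), and a plain TimeoutError standing in for a relay exception.

```python
import asyncio


async def fake_relay(name, delay, error=None):
    """Hypothetical stand-in for Worker.relay(...): sleeps, then returns or raises."""
    await asyncio.sleep(delay)
    if error is not None:
        raise error
    return {"worker": name}


async def gather_outcomes():
    results = await asyncio.gather(
        fake_relay("inventory", 0.01),
        fake_relay("fraud", 0.02, error=TimeoutError("fraud check timed out")),
        fake_relay("shipping", 0.01),
        return_exceptions=True,  # exceptions are returned in place, not raised
    )

    # Results keep the order of the awaitables passed to gather().
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return succeeded, failed


if __name__ == "__main__":
    succeeded, failed = asyncio.run(gather_outcomes())
    print("succeeded:", succeeded)
    print("failed:", failed)
```

Whether to tolerate partial failure depends on the workflow. In the order example above, a missing fraud check would probably still abort the order, so this pattern suits cases where each result is independently useful.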