# Relay Relay lets you dispatch jobs and await their results synchronously—even when jobs execute on different servers. Because jobs are persisted in the database, relay provides the benefits of distributed task execution with the durability of background jobs. Think of relay as persistent, distributed async/await. ## Basic Usage Every worker automatically gains a `relay()` method when Oban Pro is installed. Use it to dispatch a job and await the result: ```python from oban import worker @worker(queue="default") class CalculatorWorker: async def process(self, job): return {"result": job.args["a"] + job.args["b"]} result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0) print(result) # {"result": 42} ``` The `relay()` method accepts the same arguments as `new()` and `enqueue()`, plus additional options for controlling the wait behavior. ## Options * **timeout** — Maximum time to wait for the job to complete, in seconds. Defaults to `30.0`. If the job doesn't complete within this time, a `RelayTimeout` exception is raised. * **poll_interval** — Backup polling interval in seconds. Defaults to `0.5`. Results are delivered immediately via PubSub notifications; polling only occurs if a notification is missed. * **with_retries** — If `True`, wait through job retries until the final outcome. If `False` (the default), return immediately when the job enters a retryable state. ```python # With custom timeout and queue result = await MyWorker.relay({"data": "value"}, timeout=60.0, queue="priority") # Wait through retries result = await MyWorker.relay({"data": "value"}, timeout=120.0, with_retries=True) ``` ```{note} Relay requires a running Oban instance to process jobs. If no workers are available for the job's queue, the relay will eventually timeout. ``` ## Exception Handling When jobs timeout, fail, are discarded, or cancelled, `relay()` will raise an appropriate exception: ```python from oban_pro import RelayCancelled, RelayDiscard, RelayTimeout try: result = await MyWorker.relay({"id": 1}, timeout=10.0) except RelayTimeout: print("Job timed out") except RelayDiscard as exc: print(f"Job failed: {exc.reason}") except RelayCancelled as exc: print(f"Job cancelled: {exc.reason}") ``` ### Exception Types * **RelayTimeout** — Raised when the job doesn't complete within the specified timeout. The job may still be running or waiting to execute. * **RelayDiscard** — Raised when the job is discarded after exhausting all retry attempts, or when `with_retries=False` and the job enters a retryable state. The `reason` attribute contains the error message from the last attempt. * **RelayCancelled** — Raised when the job returns a `Cancel` result. The `reason` attribute contains the cancellation message. ## Concurrent Relays Multiple relay calls can run concurrently using `asyncio.gather()`: ```python import asyncio results = await asyncio.gather( ProcessorWorker.relay({"id": 1}, timeout=10.0), ProcessorWorker.relay({"id": 2}, timeout=10.0), ProcessorWorker.relay({"id": 3}, timeout=10.0), ) # results == [result1, result2, result3] ``` Each relay operates independently with its own timeout and notification handling. Concurrent relays are excellent for service coordination where you dispatch work to specialized workers and collect results. For example, an order processing system that needs to validate inventory, check fraud, and calculate shipping in parallel: ```python async def process_order(order): inventory, fraud, shipping = await asyncio.gather( InventoryWorker.relay({"sku": order["sku"]}, timeout=5.0), FraudCheckWorker.relay({"user_id": order["user_id"]}, timeout=10.0), ShippingWorker.relay({"address": order["address"]}, timeout=5.0), ) if not inventory["available"]: raise OutOfStockError() if fraud["risk_score"] > 0.8: raise FraudDetectedError() return {"total": order["subtotal"] + shipping["cost"]} ```