Source code for oban.worker

"""Worker protocol and utilities for Oban.

This module defines the Worker Protocol that classes decorated with @worker
implement, providing type hints for static analysis tools.
"""

import importlib
from typing import Any, Protocol

from .job import Job, Result


[docs] class Worker(Protocol): """Protocol for Oban workers. Classes decorated with @worker implement this protocol and gain `new()` and `enqueue()` classmethods for job creation and management, along with the `process()` method for job execution. Example: >>> from oban import worker >>> >>> @worker(queue="default") ... class EmailWorker: ... async def process(self, job): ... print(f"Sending email: {job.args}") ... >>> # Static analysis now understands these: >>> job = EmailWorker.new({"to": "user@example.com"}) >>> await EmailWorker.enqueue({"to": "admin@example.com"}) """ _opts: dict[str, Any] _oban_name: str
[docs] @classmethod def new(cls, args: dict[str, Any] | None = None, /, **params) -> Job: """Create a Job instance without enqueueing it. Args: args: Job arguments dictionary **params: Optional overrides for job fields (queue, priority, etc.) Returns: A Job instance ready to be enqueued """ ...
[docs] @classmethod async def enqueue( cls, args: dict[str, Any] | None = None, /, conn=None, **overrides ) -> Job: """Create and enqueue a Job. Args: args: Job arguments dictionary conn: Optional database connection for transactional insertion **overrides: Optional overrides for job fields Returns: The enqueued Job instance """ ...
[docs] async def process(self, job: Job) -> Result[Any]: """Process the job. This method must be implemented by the worker class. It is called by the Oban executor when a job is ready to be processed. Args: job: The Job instance to process Returns: The result of job processing (value, None, Cancel, or Snooze) """ ...
[docs] def backoff(self, job: Job) -> int: """Calculate the delay before the next retry attempt. This method is optional. If not implemented, Oban uses its default jittery clamped backoff strategy. Args: job: The failed Job instance Returns: Delay in seconds before the next retry """ ...
class WorkerResolutionError(Exception): """Raised when a worker class cannot be resolved from a path string. This error occurs when the worker resolution process fails due to: - Invalid path format - Module not found or import errors - Class not found in the module - Resolved attribute is not a class """ pass _registry: dict[str, type] = {} def worker_name(cls: type) -> str: """Generate the fully qualified name for a worker class.""" return f"{cls.__module__}.{cls.__qualname__}" def register_worker(cls) -> None: """Register a worker class for usage later""" key = worker_name(cls) _registry[key] = cls def resolve_worker(path: str) -> type: """Resolve a worker class by its path. Loads worker classes from the local registry, falling back to importing the module. Args: path: Fully qualified class path (e.g., "myapp.workers.EmailWorker") Returns: The resolved worker class Raises: WorkerResolutionError: If the worker cannot be resolved """ if path in _registry: return _registry[path] parts = path.split(".") mod_name, cls_name = ".".join(parts[:-1]), parts[-1] try: mod = importlib.import_module(mod_name) except ModuleNotFoundError as error: raise WorkerResolutionError( f"Module '{mod_name}' not found for worker '{path}'" ) from error except ImportError as error: raise WorkerResolutionError( f"Failed to import module '{mod_name}' for worker '{path}'" ) from error try: cls = getattr(mod, cls_name) except AttributeError as error: raise WorkerResolutionError( f"Class '{cls_name}' not found in module '{mod_name}'" ) from error if not isinstance(cls, type): raise WorkerResolutionError( f"'{path}' resolved to {type(cls).__name__}, expected a class" ) register_worker(cls) return cls