Source code for oban.job

"""Job class representing a unit of work to be processed.

Jobs hold all the information needed to execute a task: the worker to run, arguments
to pass, scheduling options, and metadata for tracking execution state.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import StrEnum
from typing import Any, TypeVar

import orjson

from ._extensions import use_ext
from ._recorded import encode_recorded


class JobState(StrEnum):
    """Lifecycle state of a job.

    - AVAILABLE: Ready to be executed
    - CANCELLED: Explicitly cancelled
    - COMPLETED: Successfully finished
    - DISCARDED: Exceeded max attempts
    - EXECUTING: Currently running
    - RETRYABLE: Failed but will be retried
    - SCHEDULED: Scheduled to run in the future
    - SUSPENDED: Not currently runnable
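
    Members compare equal to their raw string values (StrEnum), which is
    convenient when matching state columns loaded from the database:

    Example:
        >>> JobState.AVAILABLE == "available"
        True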
    """

    AVAILABLE = "available"
    CANCELLED = "cancelled"
    COMPLETED = "completed"
    DISCARDED = "discarded"
    EXECUTING = "executing"
    RETRYABLE = "retryable"
    SCHEDULED = "scheduled"
    SUSPENDED = "suspended"


T = TypeVar("T")


@dataclass(frozen=True, slots=True)
class Snooze:
    """Reschedule a job to run again after a delay.

    Return this from a worker's process method to put the job back in the queue
    with a delayed scheduled_at time.

    Example:
        >>> async def process(self, job):
        ...     if not ready_to_process():
        ...         return Snooze(60)  # Try again in 60 seconds
    """

    seconds: int


@dataclass(frozen=True, slots=True)
class Cancel:
    """Cancel a job and stop processing.

    Return this from a worker's process method to mark the job as cancelled.
    The reason is stored in the job's errors list.

    Example:
        >>> async def process(self, job):
        ...     if job.cancelled():
        ...         return Cancel("Job was cancelled by user")
    """

    reason: str


RECORDED_LIMIT = 64_000_000  # 64MB default limit


@dataclass(slots=True)
class Record:
    """Record a value to be stored with the completed job.

    Return this from a worker's process method to store a value in the job's
    meta field. The value is encoded using Erlang term format for compatibility
    with Oban Pro.

    Args:
        value: The value to record. Must be serializable by erlpack.
        limit: Maximum size in bytes for the encoded value. Defaults to 64MB.

    Raises:
        ValueError: If the encoded value exceeds the size limit.

    Example:
        >>> async def process(self, job):
        ...     result = await compute_something()
        ...     return Record(result)

        >>> # With a custom size limit
        >>> Record(large_result, limit=32_000_000)
    """

    value: Any
    limit: int = RECORDED_LIMIT
    encoded: str = field(init=False, repr=False)

    def __post_init__(self):
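        # Encode eagerly so an oversized value raises at construction time,
        # inside the worker, rather than surfacing later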
        encoded = encode_recorded(self.value)

        if len(encoded) > self.limit:
            raise ValueError(
                f"recorded value is {len(encoded)} bytes, exceeds limit of {self.limit}"
            )

        setattr(self, "encoded", encoded)


type Result[T] = Cancel | Snooze | Record | T | None
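
# How a worker's return value is interpreted (see the classes above):
#   Cancel    -> mark the job cancelled, storing the reason in errors
#   Snooze    -> reschedule the job after the given delay
#   Record    -> complete the job and store the encoded value in meta
#   T or None -> complete the job normally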


TIMESTAMP_FIELDS = [
    "inserted_at",
    "attempted_at",
    "cancelled_at",
    "completed_at",
    "discarded_at",
    "scheduled_at",
]
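
# The fields above are coerced to timezone-aware UTC in Job.__init__, since
# timestamps loaded from the database may be naive.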


class Job:
    """A unit of work to be processed by a worker.

    Jobs can be created directly via `Job(worker="...")` with validation, or
    loaded from the database via `Job.from_row()` without validation.
    """

    __slots__ = (
        "worker",
        "id",
        "state",
        "queue",
        "attempt",
        "max_attempts",
        "priority",
        "args",
        "meta",
        "errors",
        "tags",
        "attempted_by",
        "inserted_at",
        "attempted_at",
        "cancelled_at",
        "completed_at",
        "discarded_at",
        "scheduled_at",
        "extra",
        "_cancellation",
    )

    def __init__(
        self,
        worker: str,
        *,
        id: int | None = None,
        state: JobState = JobState.AVAILABLE,
        queue: str = "default",
        attempt: int = 0,
        max_attempts: int = 20,
        priority: int = 0,
        args: dict[str, Any] | None = None,
        meta: dict[str, Any] | None = None,
        errors: list[str] | None = None,
        tags: list[str] | None = None,
        attempted_by: list[str] | None = None,
        inserted_at: datetime | None = None,
        attempted_at: datetime | None = None,
        cancelled_at: datetime | None = None,
        completed_at: datetime | None = None,
        discarded_at: datetime | None = None,
        scheduled_at: datetime | None = None,
        schedule_in: timedelta | int | float | None = None,
        extra: dict[str, Any] | None = None,
        _validate: bool = True,
    ) -> None:
        """Create a new job with validation and normalization.

        In most cases, you should use the `@worker` or `@job` decorators
        instead, which provide a more convenient API via `Worker.new()` and
        `Worker.enqueue()`.

        Args:
            worker: Required. Fully qualified worker class path
            id: Job ID (assigned by the database)
            state: Job state (default: AVAILABLE)
            queue: Queue name (default: "default")
            attempt: Current attempt number (default: 0)
            max_attempts: Maximum retry attempts (default: 20)
            priority: Priority 0-9 (default: 0)
            args: Job arguments (default: {})
            meta: Arbitrary metadata dictionary (default: {})
            errors: List of error messages from failed attempts
            tags: List of tags for grouping
            attempted_by: List of node names that attempted this job
            inserted_at: When the job was inserted
            attempted_at: When the job was last attempted
            cancelled_at: When the job was cancelled
            completed_at: When the job completed
            discarded_at: When the job was discarded
            scheduled_at: When to run the job
            schedule_in: Alternative to scheduled_at. Timedelta or seconds from now
            extra: Extra data for runtime use (not persisted)
            _validate: Whether to validate and normalize (default: True)

        Example:
            Manual job creation (not recommended for typical use):

            >>> job = Job(
            ...     worker="myapp.workers.EmailWorker",
            ...     args={"to": "user@example.com"},
            ...     queue="mailers",
            ...     schedule_in=60  # Run in 60 seconds
            ... )

            Preferred approach using decorators:

            >>> from oban import worker
            >>>
            >>> @worker(queue="mailers")
            ... class EmailWorker:
            ...     async def process(self, job):
            ...         pass
            >>>
            >>> job = EmailWorker.new({"to": "user@example.com"}, schedule_in=60)
        """
        self.worker = worker
        self.id = id
        self.state = state
        self.queue = queue
        self.attempt = attempt
        self.max_attempts = max_attempts
        self.priority = priority
        self.args = args if args is not None else {}
        self.meta = meta if meta is not None else {}
        self.errors = errors if errors is not None else []
        self.tags = tags if tags is not None else []
        self.attempted_by = attempted_by if attempted_by is not None else []
        self.extra = extra if extra is not None else {}
        self._cancellation: asyncio.Event | None = None

        if schedule_in is not None:
            if isinstance(schedule_in, (int, float)):
                schedule_in = timedelta(seconds=schedule_in)

            scheduled_at = datetime.now(timezone.utc) + schedule_in

        self.inserted_at = inserted_at
        self.attempted_at = attempted_at
        self.cancelled_at = cancelled_at
        self.completed_at = completed_at
        self.discarded_at = discarded_at
        self.scheduled_at = scheduled_at

        # Timestamps from the database are naive; ensure they're timezone-aware
        for key in TIMESTAMP_FIELDS:
            value = getattr(self, key)

            if value is not None and value.tzinfo is None:
                setattr(self, key, value.replace(tzinfo=timezone.utc))

        if _validate:
            self._normalize_tags()
            self._do_validate()

        # Give extensions a chance to react to the new job (no-op by default)
        use_ext("job.after_new", lambda _job: None, self)

    def __str__(self) -> str:
        # str.split never returns an empty list, so the last element is safe
        worker_name = self.worker.split(".")[-1]

        parts = [
            f"id={self.id}",
            f"worker={worker_name}",
            f"args={orjson.dumps(self.args).decode()}",  # orjson returns bytes
            f"queue={self.queue}",
            f"state={self.state}",
        ]

        return f"Job({', '.join(parts)})"

    def update(self, changes: dict[str, Any]) -> Job:
        """Update this job in place with the given changes.

        Applies validation and normalization after updating.

        Args:
            changes: Dictionary of field changes. Supports:
                - args: Job arguments
                - max_attempts: Maximum retry attempts
                - meta: Arbitrary metadata dictionary
                - priority: Priority 0-9
                - queue: Queue name
                - scheduled_at: When to run the job
                - schedule_in: Alternative to scheduled_at. Timedelta or seconds from now
                - tags: List of tags for filtering/grouping
                - worker: Fully qualified worker class path

        Returns:
            This job instance (for method chaining)

        Example:
            >>> job.update({"priority": 0, "tags": ["urgent"]})
        """
        # Handle schedule_in -> scheduled_at conversion
        if "schedule_in" in changes:
            schedule_in = changes.pop("schedule_in")

            if isinstance(schedule_in, (int, float)):
                schedule_in = timedelta(seconds=schedule_in)

            changes["scheduled_at"] = datetime.now(timezone.utc) + schedule_in

        for key, value in changes.items():
            setattr(self, key, value)

        self._normalize_tags()
        self._do_validate()

        return self

    def cancelled(self) -> bool:
        """Check if cancellation has been requested for this job.

        Workers can call this method at safe points during execution to check
        if the job should stop processing and return early.

        Returns:
            True if cancellation has been requested, False otherwise

        Example:
            >>> async def process(self, job):
            ...     for item in large_dataset:
            ...         if job.cancelled():
            ...             return Cancel("Job was cancelled")
            ...
            ...         await process_item(item)
        """
        if self._cancellation is None:
            return False

        return self._cancellation.is_set()

    def _normalize_tags(self) -> None:
        # Drop blank tags, then strip, lowercase, dedupe, and sort the rest
        self.tags = sorted(
            {str(tag).strip().lower() for tag in self.tags if tag and str(tag).strip()}
        )

    def _do_validate(self) -> None:
        if not self.queue.strip():
            raise ValueError("queue must not be blank")

        if not self.worker.strip():
            raise ValueError("worker must not be blank")

        if self.max_attempts <= 0:
            raise ValueError("max_attempts must be greater than 0")

        if not (0 <= self.priority <= 9):
            raise ValueError("priority must be between 0 and 9")
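

# Illustrative behavior of validation and tag normalization (for reference,
# not executed):
#
#   Job(worker="w", priority=10)  -> ValueError: priority must be between 0 and 9
#   Job(worker="   ")             -> ValueError: worker must not be blank
#   Job(worker="w", tags=["Mail", " mail ", "", "Urgent"]).tags == ["mail", "urgent"]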