Oban.Pro.Workers.Chunk (Oban Pro v1.5.0-rc.4)
Chunk workers execute jobs together in groups based on a size or a timeout option, e.g. when 1000 jobs are available or after 10 minutes have elapsed.
Multiple chunks can run in parallel within a single queue, and each chunk may be composed of many thousands of jobs. Aside from improved throughput for a single queue, chunks are ideal as the initial stage of data-ingestion and data-processing pipelines.
Usage
Let's define a worker that sends SMS messages in chunks, rather than individually:
defmodule MyApp.ChunkWorker do
  use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000

  @impl true
  def process([_ | _] = jobs) do
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

    :ok
  end
end
Notice that we declared a size and a timeout along with a queue for the worker. If size or timeout are omitted they fall back to their defaults: a size of 100 and a timeout of 1000ms, respectively.
To process larger batches less frequently, we can increase both values:
use Oban.Pro.Workers.Chunk, size: 500, timeout: :timer.seconds(5)
Now chunks will run with up to 500 jobs or every 5 seconds, whichever comes first.
Unlike other Pro workers, a Chunk worker's process/1 receives a list of jobs rather than a single job struct. Fittingly, the expected return values are different as well.
Chunk Partitioning
By default, chunks are divided into groups based on the queue and worker. This means that each chunk consists of jobs for the same worker in the same queue, regardless of their args or meta. However, this approach may not always be suitable. For instance, you may want to group jobs based on a specific argument such as :account_id instead of just the worker. In such cases, you can use the :by option to customize how chunks are partitioned.
Here are a few examples of the :by option that you can use to achieve fine-grained control over chunk partitioning:
# Explicitly chunk by :worker
use Oban.Pro.Workers.Chunk, by: :worker
# Chunk by a single args key without considering the worker
use Oban.Pro.Workers.Chunk, by: [args: :account_id]
# Chunk by multiple args keys without considering the worker
use Oban.Pro.Workers.Chunk, by: [args: [:account_id, :cohort]]
# Chunk by worker and a single args key
use Oban.Pro.Workers.Chunk, by: [:worker, args: :account_id]
# Chunk by a single meta key
use Oban.Pro.Workers.Chunk, by: [meta: :source_id]
When partitioning chunks of jobs, it's important to note that using only :args or :meta without :worker may result in heterogeneous chunks of jobs from different workers. Nevertheless, regardless of the partitioning method, chunks always consist of jobs from the same queue.
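To make the effect of :by concrete, here is a minimal sketch that models partitioning with plain maps. It is not Oban Pro's implementation: the PartitionSketch module, its partition_key/2 and partition/2 functions, and the worker names are all hypothetical, and the maps only stand in for Oban.Job structs.

```elixir
defmodule PartitionSketch do
  @moduledoc "Illustrative model of chunk partitioning; not part of Oban Pro."

  # Build the grouping key for a job. The queue is always part of the key,
  # mirroring the rule that chunks never span queues.
  def partition_key(job, by) do
    fields =
      by
      |> List.wrap()
      |> Enum.map(fn
        :worker -> {:worker, job.worker}
        {:args, keys} -> {:args, take(job.args, keys)}
        {:meta, keys} -> {:meta, take(job.meta, keys)}
      end)

    {job.queue, fields}
  end

  # Group jobs into partitions keyed by partition_key/2.
  def partition(jobs, by), do: Enum.group_by(jobs, &partition_key(&1, by))

  # Stored args and meta use string keys, so atom keys are converted first.
  defp take(map, keys) do
    keys
    |> List.wrap()
    |> Enum.map(&to_string/1)
    |> then(&Map.take(map, &1))
  end
end

# Hypothetical sample jobs: two workers sharing one queue.
jobs = [
  %{queue: "messages", worker: "MyApp.SmsWorker", args: %{"account_id" => 1}, meta: %{}},
  %{queue: "messages", worker: "MyApp.EmailWorker", args: %{"account_id" => 1}, meta: %{}},
  %{queue: "messages", worker: "MyApp.SmsWorker", args: %{"account_id" => 2}, meta: %{}}
]

by_args = PartitionSketch.partition(jobs, args: :account_id)
by_worker_and_args = PartitionSketch.partition(jobs, [:worker, args: :account_id])

IO.inspect(map_size(by_args), label: "partitions by args")
IO.inspect(map_size(by_worker_and_args), label: "partitions by worker and args")
```

With these three sample jobs, partitioning by args alone yields two partitions, one of which mixes both workers, while adding :worker yields three homogeneous partitions.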
Here's a simple example of partitioning by worker and an author_id field to batch analysis of recent messages per author:
defmodule MyApp.LLMAnalysis do
  use Oban.Pro.Workers.Chunk, by: [:worker, args: :author_id], size: 50, timeout: 30_000

  @impl true
  def process([%{args: %{"author_id" => author_id}} | _] = jobs) do
    messages =
      jobs
      |> Enum.map(& &1.args["message_id"])
      |> MyApp.Messages.all()

    {:ok, sentiment} = MyApp.GPT.gauge_sentiment(messages)

    # Assumes a hypothetical record_sentiment/2 that stores the result for the author.
    MyApp.Messages.record_sentiment(author_id, sentiment)

    :ok
  end
end
Chunk Results and Error Handling
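A Chunk worker's result type is tailored to handling multiple jobs at once. The process/1 callback receives a list of jobs and returns a result(). For reference, here are the result types:
@type sub_result() :: {reason :: term(), [Oban.Job.t()]}
@type result() :: :ok | {:ok, term()} | {:cancel | :discard | :error, reason :: term(), [Oban.Job.t()]} | [cancel: sub_result(), discard: sub_result(), error: sub_result()]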
The result types allow you to succeed an entire chunk, or selectively fail parts of it. The possible results are:
:ok - The chunk succeeded and all jobs can be marked complete.
{:ok, result} - Like :ok, but with a result for testing.
{:error, reason, jobs} - One or more jobs in the chunk failed and may be retried; any unlisted jobs are a success.
{:cancel, reason, jobs} - One or more jobs in the chunk should be cancelled; any unlisted jobs are a success.
[error: {reason, jobs}, cancel: {reason, jobs}] - Retry some jobs and cancel some other jobs, leaving any jobs not in either list a success.
To illustrate using chunk results, let's expand on the message processing example from earlier. We'll extend it to complete the whole batch when all messages are delivered, or cancel undeliverable messages:
def process([_ | _] = jobs) do
  notifications =
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

  bad_token = fn %{response: response} -> response == :bad_token end

  if Enum.any?(notifications, bad_token) do
    cancels =
      notifications
      |> Enum.zip(jobs)
      |> Enum.filter(fn {notification, _job} -> bad_token.(notification) end)
      |> Enum.map(&elem(&1, 1))

    {:cancel, :bad_token, cancels}
  else
    {:ok, notifications}
  end
end
In the event of an ephemeral crash, like a network issue, the entire batch may be retried if there are any remaining attempts.
Cancelling any jobs in a chunk will cancel the entire chunk, including the leader job.
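When some jobs should be retried and others cancelled, the keyword-list form of the result applies. The following is a minimal sketch of building each result shape from per-job outcomes. The ChunkResultSketch module, the :retry and :cancel outcome labels, and the :delivery_failed and :bad_token reasons are hypothetical, and plain atoms stand in for job structs.

```elixir
defmodule ChunkResultSketch do
  @moduledoc "Illustrative helper for building a chunk result; not part of Oban Pro."

  # Takes a list of {job, outcome} pairs, where outcome is :ok, :retry, or
  # :cancel (hypothetical labels), and returns a value shaped like result().
  def build(pairs) do
    # Pairs that don't match the pattern are skipped by the comprehension.
    retries = for {job, :retry} <- pairs, do: job
    cancels = for {job, :cancel} <- pairs, do: job

    case {retries, cancels} do
      # Nothing failed, so the whole chunk succeeds.
      {[], []} -> :ok
      # Only retries: unlisted jobs are treated as a success.
      {_, []} -> {:error, :delivery_failed, retries}
      # Only cancellations: unlisted jobs are treated as a success.
      {[], _} -> {:cancel, :bad_token, cancels}
      # A mix of both uses the keyword-list form.
      {_, _} -> [error: {:delivery_failed, retries}, cancel: {:bad_token, cancels}]
    end
  end
end

IO.inspect(ChunkResultSketch.build([{:job_a, :cancel}, {:job_b, :retry}, {:job_c, :ok}]))
```

In a real worker, process/1 would compute the outcomes from the delivery responses and return the value built this way.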
Chunk Organization
Chunks are run by a leader job (which has nothing to do with peer leadership). When the leader executes, it determines whether a complete chunk is available or if enough time has elapsed to run anyhow. If neither case applies, then the leader will delay until the timeout has elapsed and execute with as many jobs as it can find.
Completion queries run every 1000ms by default. You can use the :sleep option to control how long the leader delays between queries to check for complete chunks:
use Oban.Pro.Workers.Chunk, size: 50, sleep: 2_000, timeout: 10_000
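The interplay of size, timeout, and sleep can be modeled as a small decision function. This is only a sketch of the behavior described above, not Oban Pro's internals: the LeaderSketch module and its decide/3 function are hypothetical, and capping the final sleep so that it never overshoots the timeout is an assumption.

```elixir
defmodule LeaderSketch do
  @moduledoc "Illustrative model of a chunk leader's wait loop; not Oban Pro internals."

  # available: number of jobs currently found for the chunk
  # waited:    milliseconds the leader has already waited
  # opts:      :size, :timeout, and :sleep, using the documented defaults
  def decide(available, waited, opts \\ []) do
    size = Keyword.get(opts, :size, 100)
    timeout = Keyword.get(opts, :timeout, 1_000)
    sleep = Keyword.get(opts, :sleep, 1_000)

    cond do
      # A complete chunk is available, so run immediately.
      available >= size -> :run
      # The timeout has elapsed, so run with whatever was found.
      waited >= timeout -> :run
      # Otherwise sleep before the next query (assumed to be capped at the timeout).
      true -> {:sleep, min(sleep, timeout - waited)}
    end
  end
end

opts = [size: 50, sleep: 2_000, timeout: 10_000]
IO.inspect(LeaderSketch.decide(10, 0, opts))
IO.inspect(LeaderSketch.decide(50, 4_000, opts))
```

Under this model a larger sleep means fewer completion queries, at the cost of noticing a full chunk later.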
Optimizing Chunks
Queues with high concurrency and low throughput may have multiple chunk leaders running simultaneously rather than getting bundled into a single chunk. The solution is to reduce the queue's global concurrency to a smaller number so that new chunk leader jobs don't start and the existing chunk can run a bigger batch:
queues: [
  chunked_queue: [global_limit: 1]
]
A low global limit will reduce overall throughput. Partitioning the queue with the same options as the chunk can preserve throughput by allowing one concurrent leader per chunk. For example, if you were chunking by :user_id in args, you'd partition the queue with the same option:
queues: [
  chunked_queue: [local_limit: 20, global_limit: [allowed: 1, partition: [args: :user_id]]]
]
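For context, the queue options above sit inside the application's Oban configuration. The fragment below is a sketch under stated assumptions: the :my_app application name and MyApp.Repo are hypothetical, and it assumes that global limits and partitioning are provided by Pro's Smart engine, so that engine is set explicitly.

```elixir
# config/config.exs (hypothetical application name and repo)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # Assumption: global_limit and partition options rely on the Smart engine.
  engine: Oban.Pro.Engines.Smart,
  queues: [
    # One leader at a time per user_id, with up to 20 jobs running per node.
    chunked_queue: [local_limit: 20, global_limit: [allowed: 1, partition: [args: :user_id]]]
  ]
```

A worker chunking with by: [args: :user_id] in this queue would then have at most one leader running per user at a time.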
Summary
Types
chunk_by()
@type chunk_by() :: :worker | {:args, key_or_keys()} | {:meta, key_or_keys()} | [:worker | {:args, key_or_keys()} | {:meta, key_or_keys()}]
key_or_keys()
options()
@type options() :: [ by: chunk_by(), size: pos_integer(), sleep: pos_integer(), timeout: pos_integer() ]
result()
@type result() :: :ok | {:ok, term()} | {:cancel | :discard | :error, reason :: term(), [Oban.Job.t()]} | [cancel: sub_result(), discard: sub_result(), error: sub_result()]
sub_result()
@type sub_result() :: {reason :: term(), [Oban.Job.t()]}