Oban.Met (Oban Met v0.1.11)

Metric introspection for Oban.

Oban.Met supervises a collection of autonomous modules for in-memory, distributed time-series data with zero-configuration. Oban.Web relies on Met for queue gossip, detailed job counts, and historic metrics.

Highlights

  • Telemetry powered execution tracking for time-series data that is replicated between nodes, filterable by label, arbitrarily mergeable over windows of time, and compacted for longer playback.

  • Centralized counting across queues and states with exponential backoff to minimize load and data replication between nodes.

  • Ephemeral data storage via data replication with handoff between nodes. All nodes have a shared view of the cluster's data and new nodes are caught up when they come online.

Summary

Functions

Retrieve stored producer checks.

Get a normalized, unified crontab from all connected nodes.

Get all stored, unique values for a particular label.

Get the latest values for a gauge series, optionally subdivided by a label.

Get all stored values for a series without any filtering.

List all recorded series along with their labels and value type.

Start a Met supervisor for an Oban instance.

Summarize a series of data with an aggregate over a configurable window of time.

Types

@type counts() :: %{optional(String.t()) => non_neg_integer()}
Link to this type

filter_value()

@type filter_value() :: label() | [label()]
@type label() :: String.t()
Link to this type

latest_opts()

@type latest_opts() :: [
  filters: keyword(filter_value()),
  group: nil | label(),
  lookback: pos_integer()
]
@type operation() :: :max | :sum | {:pct, float()}
@type series() :: atom() | String.t()
Link to this type

series_detail()

@type series_detail() :: %{series: series(), labels: [label()], value: module()}
Link to this type

sub_counts()

@type sub_counts() :: %{optional(String.t()) => non_neg_integer() | counts()}
Link to this type

timeslice_opts()

@type timeslice_opts() :: [
  by: pos_integer(),
  filters: keyword(filter_value()),
  group: nil | label(),
  label: nil | label(),
  lookback: pos_integer(),
  operation: operation(),
  since: pos_integer()
]
@type ts() :: integer()
@type value() :: Oban.Met.Value.t()

Functions

Link to this function

checks(oban \\ Oban)

@spec checks(Oban.name()) :: [map()]

Retrieve stored producer checks.

This mimics the output of the legacy Oban.Web.Plugins.Stats.all_gossip/1 function.

Checks are queried approximately every second and broadcast to all connected nodes, so each node is a replica of checks from the entire cluster. Checks are stored for 30 seconds before being purged.

Output

Checks are the result of Oban.check_queue/1, and the exact contents depends on which Oban.Engine is in use. A Basic engine check will look similar to this:

%{
  uuid: "2dde4c0f-53b8-4f59-9a16-a9487454292d",
  limit: 10,
  node: "me@local",
  paused: false,
  queue: "default",
  running: [100, 102],
  started_at: ~D[2020-10-07 15:31:00],
  updated_at: ~D[2020-10-07 15:31:00]
}

Examples

Get all current checks:

Oban.Met.checks()

Get current checks for a non-standard Oban isntance:

Oban.Met.checks(MyOban)
Link to this function

crontab(oban \\ Oban)

@spec crontab(Oban.name()) :: [{binary(), binary(), map()}]

Get a normalized, unified crontab from all connected nodes.

Examples

Get a merged crontab:

Oban.Met.crontab()
[
  {"* * * * *", "Worker.A", []},
  {"* * * * *", "Worker.B", [["args", %{"mode" => "foo"}]]}
]

Get the crontab for a non-standard Oban instance:

Oban.Met.crontab(MyOban)
Link to this function

labels(oban \\ Oban, label, opts \\ [])

@spec labels(Oban.name(), label(), keyword()) :: [label()]

Get all stored, unique values for a particular label.

Examples

Get all known queues:

Oban.Met.labels("queue")
~w(alpha gamma delta)

Get all known workers:

Oban.Met.labels("worker")
~w(MyApp.Worker MyApp.OtherWorker)
Link to this function

latest(oban \\ Oban, series, opts \\ [])

Get the latest values for a gauge series, optionally subdivided by a label.

Unlike queues and workers, states are static and constant, so they'll always show up in the counts or subdivision maps.

Gauge Series

Latest counts only apply to Gauge series. There are two gauges available (as reported by series/1:

  • :exec_count — jobs executing at that moment, including node, queue, state, and worker labels.

  • :full_count — jobs in the database, including queue, and state labels.

Examples

Get the :full_count value without any grouping:

Oban.Met.latest(:full_count)
%{"all" => 99}

Group the :full_count value by state:

Oban.Met.latest(:full_count, group: "state")
%{"available" => 9, "completed" => 80, "executing" => 5, ...

Group results by queue:

Oban.Met.latest(:exec_count, group: "queue")
%{"alpha" => 9, "gamma" => 3}

Group results by node:

Oban.Met.latest(:exec_count, group: "node")
%{"worker.1" => 6, "worker.2" => 5}

Filter values by node:

Oban.Met.latest(:exec_count, filters: [node: "worker.1"])
%{"all" => 6}

Filter values by queue and state:

Oban.Met.latest(:exec_count, filters: [node: "worker.1", "worker.2"])
Link to this function

lookup(oban \\ Oban, series)

@spec lookup(Oban.name(), series()) :: [term()]

Get all stored values for a series without any filtering.

Link to this function

series(oban \\ Oban)

@spec series(Oban.name()) :: [series_detail()]

List all recorded series along with their labels and value type.

Examples

Oban.Met.series()
[
  %{series: "exec_time", labels: ["state", "queue", "worker"], value: Sketch},
  %{series: "wait_time", labels: ["state", "queue", "worker"], value: Sketch},
  %{series: "exec_count", labels: ["state", "queue", "worker"], value: Gauge},
  %{series: "full_count", labels: ["state", "queue"], value: Gauge}
]
Link to this function

start_link(opts)

@spec start_link(Keyword.t()) :: Supervisor.on_start()

Start a Met supervisor for an Oban instance.

Oban.Met typically starts supervisors automatically when Oban instances initialize. However, starting a supervisor manually can be used if auto_start is disabled.

Options

These options are required; without them the supervisor won't start:

  • :conf — configuration for a running Oban instance, required

  • :name — an optional name for the supervisor, defaults to Oban.Met

Example

Start a supervisor for the default Oban instance:

Oban.Met.start_link(conf: Oban.config())

Start a supervisor with a custom name:

Oban.Met.start_link(conf: Oban.config(), name: MyApp.MetSup)
Link to this function

timeslice(oban \\ Oban, series, opts \\ [])

@spec timeslice(Oban.name(), series(), timeslice_opts()) :: [{ts(), value(), label()}]

Summarize a series of data with an aggregate over a configurable window of time.

Examples

Retreive a 3 second timeslice of the exec_time sketch:

Oban.Met.timeslice(Oban, :exec_time, lookback: 3)
[
  {2, 16771374649.128689, nil},
  {1, 24040058779.3428, nil},
  {0, 22191534459.516357, nil},
]

Group exec_time slices by the queue label:

Oban.Met.timeslice(Oban, :exec_time, group: "queue")
[
  {1, 9970235387.031698, "analysis"},
  {0, 11700429279.446463, "analysis"},
  {1, 23097311376.231316, "default"},
  {0, 23097311376.231316, "default"},
  {1, 1520977874.3348415, "events"},
  {0, 2558504265.2738624, "events"},
  ...