changed CHANGELOG.md
 
@@ -61,8 +61,39 @@ In addition, thanks to the addition of `Process.set_label` in recent Elixir vers
61
61
name is set as the job's process label. That makes it possible to identify which job is running in
62
62
a `pid` via observer or live dashboard.
63
63
64
+ ## v2.18.3 — 2024-09-10
65
+
66
+ ### Enhancements
67
+
68
+ - [Basic] Use the shared concat operator when appending errors.
69
+
70
+ The standard `push` operation for updates is designed for arrays and uses `array_append`
71
+ internally. This replaces all use of `push` with a fragment that uses the `||` operator instead,
72
+ which works for both arrays and jsonb.
73
+
74
+ CockroachDB doesn't support arrays of jsonb, but they do support simple jsonb columns. Now
75
+ we can append to the errors column in either format for CRDB compatibility.
76
+
77
+ ### Bug Fixes
78
+
79
+ - [Queue] Link the dynamic queue supervisor and `Midwife` for automatic restarts.
80
+
81
+ When a producer crashes it brings the queue's supervisor down with it. With enough database
82
+ errors, the producer may crash repeatedly enough to exhaust restarts and bring down the
83
+ DynamicSupervisor in charge of all queues.
84
+
85
+ Now the supervisor is linked to the midwife to ensure that the midwife restarts as well, and it
86
+ restarts all of the queues.
87
+
88
+ - [Testing] Handle `insert_all/3` with streams for the `:inline` testing engine.
89
+
90
+ The inline engine's `insert_all_jobs` callback incorrectly expected changesets to always be a
91
+ list rather and couldn't handle streams.
92
+
64
93
## v2.18.2 — 2024-08-16
65
94
95
+ ### Bug Fixes
96
+
66
97
- [Repo] Prevent debug noise by ensuring default opts for standard transactions.
67
98
68
99
Without default opts each transaction is logged. Many standard operations execute each second,
 
@@ -81,7 +112,7 @@ a `pid` via observer or live dashboard.
81
112
82
113
- [Repo] Automatically retry all transactions with backoff.
83
114
84
- Avoid both expected an unexpected database errors by automatically retrying transactions. Some
115
+ Avoid both expected and unexpected database errors by automatically retrying transactions. Some
85
116
operations, such as serialization and lock not available errors, are likely to occur during
86
117
standard use depending on how a database is configured. Other errors happen infrequently due to
87
118
pool contention or flickering connections, and those should also be retried for increased
changed hex_metadata.config
 
@@ -5,7 +5,7 @@
5
5
{<<"GitHub">>,<<"https://github.com/sorentwo/oban">>},
6
6
{<<"Sponsor">>,<<"https://getoban.pro">>}]}.
7
7
{<<"name">>,<<"oban">>}.
8
- {<<"version">>,<<"2.18.2">>}.
8
+ {<<"version">>,<<"2.18.3">>}.
9
9
{<<"description">>,
10
10
<<"Robust job processing, backed by modern PostgreSQL and SQLite3.">>}.
11
11
{<<"elixir">>,<<"~> 1.14">>}.
 
@@ -33,19 +33,20 @@
33
33
<<"lib/oban/plugins/lifeline.ex">>,<<"lib/oban/plugins/cron.ex">>,
34
34
<<"lib/oban/plugins/repeater.ex">>,<<"lib/oban/plugins/reindexer.ex">>,
35
35
<<"lib/oban/plugins/gossip.ex">>,<<"lib/oban/plugins/pruner.ex">>,
36
- <<"lib/oban/config.ex">>,<<"lib/oban/testing.ex">>,
37
- <<"lib/oban/migration.ex">>,<<"lib/oban/engines">>,
38
- <<"lib/oban/engines/inline.ex">>,<<"lib/oban/engines/basic.ex">>,
39
- <<"lib/oban/engines/lite.ex">>,<<"lib/oban/queue">>,
40
- <<"lib/oban/queue/supervisor.ex">>,<<"lib/oban/queue/producer.ex">>,
41
- <<"lib/oban/queue/drainer.ex">>,<<"lib/oban/queue/watchman.ex">>,
42
- <<"lib/oban/queue/executor.ex">>,<<"lib/oban/validation.ex">>,
43
- <<"lib/oban/peers">>,<<"lib/oban/peers/isolated.ex">>,
44
- <<"lib/oban/peers/global.ex">>,<<"lib/oban/peers/postgres.ex">>,
45
- <<"lib/oban/notifiers">>,<<"lib/oban/notifiers/isolated.ex">>,
46
- <<"lib/oban/notifiers/pg.ex">>,<<"lib/oban/notifiers/postgres.ex">>,
47
- <<"lib/oban/peer.ex">>,<<"lib/oban/application.ex">>,
48
- <<"lib/oban/midwife.ex">>,<<"lib/oban/engine.ex">>,<<"lib/oban/repo.ex">>,
36
+ <<"lib/oban/config.ex">>,<<"lib/oban/nursery.ex">>,
37
+ <<"lib/oban/testing.ex">>,<<"lib/oban/migration.ex">>,
38
+ <<"lib/oban/engines">>,<<"lib/oban/engines/inline.ex">>,
39
+ <<"lib/oban/engines/basic.ex">>,<<"lib/oban/engines/lite.ex">>,
40
+ <<"lib/oban/queue">>,<<"lib/oban/queue/supervisor.ex">>,
41
+ <<"lib/oban/queue/producer.ex">>,<<"lib/oban/queue/drainer.ex">>,
42
+ <<"lib/oban/queue/watchman.ex">>,<<"lib/oban/queue/executor.ex">>,
43
+ <<"lib/oban/validation.ex">>,<<"lib/oban/peers">>,
44
+ <<"lib/oban/peers/isolated.ex">>,<<"lib/oban/peers/global.ex">>,
45
+ <<"lib/oban/peers/postgres.ex">>,<<"lib/oban/notifiers">>,
46
+ <<"lib/oban/notifiers/isolated.ex">>,<<"lib/oban/notifiers/pg.ex">>,
47
+ <<"lib/oban/notifiers/postgres.ex">>,<<"lib/oban/peer.ex">>,
48
+ <<"lib/oban/application.ex">>,<<"lib/oban/midwife.ex">>,
49
+ <<"lib/oban/engine.ex">>,<<"lib/oban/repo.ex">>,
49
50
<<"lib/oban/exceptions.ex">>,<<"lib/oban/cron">>,
50
51
<<"lib/oban/cron/expression.ex">>,<<"lib/oban/plugin.ex">>,
51
52
<<"lib/oban/stager.ex">>,<<"lib/oban.ex">>,<<".formatter.exs">>,
changed lib/oban.ex
 
@@ -26,7 +26,7 @@ defmodule Oban do
26
26
use Supervisor
27
27
28
28
alias Ecto.{Changeset, Multi}
29
- alias Oban.{Config, Engine, Job, Midwife, Notifier, Peer, Registry, Sonar, Stager}
29
+ alias Oban.{Config, Engine, Job, Notifier, Nursery, Peer, Registry, Sonar, Stager}
30
30
alias Oban.Queue.{Drainer, Producer}
31
31
32
32
@typedoc """
 
@@ -329,7 +329,8 @@ defmodule Oban do
329
329
Defaults to the `Postgres` peer.
330
330
331
331
* `:plugins` — a list or modules or module/option tuples that are started as children of an Oban
332
- supervisor. Any supervisable module is a valid plugin, i.e. a `GenServer` or an `Agent`.
332
+ supervisor. Any supervisable module is a valid plugin, i.e. a `GenServer` or an `Agent`. May
333
+ also be set to `false` to disable plugins _and_ disable leadership.
333
334
334
335
* `:prefix` — the query prefix, or schema, to use for inserting and executing jobs. An
335
336
`oban_jobs` table must exist within the prefix. See the "Prefix Support" section in the module
 
@@ -343,6 +344,8 @@ defmodule Oban do
343
344
Queues accept additional override options to customize their behavior, e.g. by setting
344
345
`paused` or `dispatch_cooldown` for a specific queue.
345
346
347
+ Using an empty list or `false` prevents any queues from starting on init.
348
+
346
349
* `:testing` — a mode that controls how an instance is configured for testing. When set to
347
350
`:inline` or `:manual` queues, peers, and plugins are automatically disabled. Defaults to
348
351
`:disabled`, no test mode.
 
@@ -465,14 +468,13 @@ defmodule Oban do
465
468
def whereis(name), do: Registry.whereis(name)
466
469
467
470
@impl Supervisor
468
- def init(%Config{plugins: plugins} = conf) do
471
+ def init(%Config{name: name, plugins: plugins} = conf) do
469
472
children = [
470
- {Notifier, conf: conf, name: Registry.via(conf.name, Notifier)},
471
- {DynamicSupervisor, name: Registry.via(conf.name, Foreman), strategy: :one_for_one},
472
- {Peer, conf: conf, name: Registry.via(conf.name, Peer)},
473
- {Sonar, conf: conf, name: Registry.via(conf.name, Sonar)},
474
- {Midwife, conf: conf, name: Registry.via(conf.name, Midwife)},
475
- {Stager, conf: conf, name: Registry.via(conf.name, Stager)}
473
+ {Notifier, conf: conf, name: Registry.via(name, Notifier)},
474
+ {Nursery, conf: conf, name: Registry.via(name, Nursery)},
475
+ {Peer, conf: conf, name: Registry.via(name, Peer)},
476
+ {Sonar, conf: conf, name: Registry.via(name, Sonar)},
477
+ {Stager, conf: conf, name: Registry.via(name, Stager)}
476
478
]
477
479
478
480
children = children ++ Enum.map(plugins, &plugin_child_spec(&1, conf))
changed lib/oban/engines/basic.ex
 
@@ -17,6 +17,14 @@ defmodule Oban.Engines.Basic do
17
17
alias Ecto.Changeset
18
18
alias Oban.{Config, Engine, Job, Repo}
19
19
20
+ # This is a replacement for `push`, which uses `array_append` and isn't compatible with jsonb
21
+ # arrays. The `||` operator works with both arrays and jsonb.
22
+ defmacrop concat_errors(column, error) do
23
+ quote do
24
+ fragment("? || ?", unquote(column), unquote(error))
25
+ end
26
+ end
27
+
20
28
@impl Engine
21
29
def init(%Config{} = conf, opts) do
22
30
if Keyword.has_key?(opts, :limit) do
 
@@ -193,24 +201,36 @@ defmodule Oban.Engines.Basic do
193
201
194
202
@impl Engine
195
203
def discard_job(%Config{} = conf, %Job{} = job) do
196
- updates = [
197
- set: [state: "discarded", discarded_at: utc_now()],
198
- push: [errors: Job.format_attempt(job)]
199
- ]
204
+ query =
205
+ Job
206
+ |> where(id: ^job.id)
207
+ |> update([j],
208
+ set: [
209
+ state: "discarded",
210
+ discarded_at: ^utc_now(),
211
+ errors: concat_errors(j.errors, ^[Job.format_attempt(job)])
212
+ ]
213
+ )
200
214
201
- Repo.update_all(conf, where(Job, id: ^job.id), updates)
215
+ Repo.update_all(conf, query, [])
202
216
203
217
:ok
204
218
end
205
219
206
220
@impl Engine
207
221
def error_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do
208
- updates = [
209
- set: [state: "retryable", scheduled_at: seconds_from_now(seconds)],
210
- push: [errors: Job.format_attempt(job)]
211
- ]
222
+ query =
223
+ Job
224
+ |> where(id: ^job.id)
225
+ |> update([j],
226
+ set: [
227
+ state: "retryable",
228
+ scheduled_at: ^seconds_from_now(seconds),
229
+ errors: concat_errors(j.errors, ^[Job.format_attempt(job)])
230
+ ]
231
+ )
212
232
213
- Repo.update_all(conf, where(Job, id: ^job.id), updates)
233
+ Repo.update_all(conf, query, [])
214
234
215
235
:ok
216
236
end
 
@@ -234,8 +254,11 @@ defmodule Oban.Engines.Basic do
234
254
query =
235
255
if is_map(job.unsaved_error) do
236
256
update(query, [j],
237
- set: [state: "cancelled", cancelled_at: ^utc_now()],
238
- push: [errors: ^Job.format_attempt(job)]
257
+ set: [
258
+ state: "cancelled",
259
+ cancelled_at: ^utc_now(),
260
+ errors: concat_errors(j.errors, ^[Job.format_attempt(job)])
261
+ ]
239
262
)
240
263
else
241
264
query
changed lib/oban/engines/inline.ex
 
@@ -76,7 +76,7 @@ defmodule Oban.Engines.Inline do
76
76
defp expand(value), do: expand(value, %{})
77
77
defp expand(fun, changes) when is_function(fun, 1), do: expand(fun.(changes), changes)
78
78
defp expand(%{changesets: changesets}, _), do: expand(changesets, %{})
79
- defp expand(changesets, _) when is_list(changesets), do: changesets
79
+ defp expand(changesets, _), do: changesets
80
80
81
81
# Execution Helpers
added lib/oban/nursery.ex
 
@@ -0,0 +1,33 @@
1
+ defmodule Oban.Nursery do
2
+ @moduledoc false
3
+
4
+ use Supervisor
5
+
6
+ alias Oban.{Config, Midwife, Registry}
7
+
8
+ @type opts :: [conf: Config.t(), name: GenServer.name()]
9
+
10
+ @spec start_link(opts()) :: Supervisor.on_start()
11
+ def start_link(opts) when is_list(opts) do
12
+ Supervisor.start_link(__MODULE__, opts, name: opts[:name])
13
+ end
14
+
15
+ @spec child_spec(opts()) :: Supervisor.child_spec()
16
+ def child_spec(opts) do
17
+ name = Keyword.fetch!(opts, :name)
18
+
19
+ %{super(opts) | id: name}
20
+ end
21
+
22
+ @impl Supervisor
23
+ def init(opts) do
24
+ conf = Keyword.fetch!(opts, :conf)
25
+
26
+ children = [
27
+ {DynamicSupervisor, name: Registry.via(conf.name, Foreman)},
28
+ {Midwife, conf: conf, name: Registry.via(conf.name, Midwife)}
29
+ ]
30
+
31
+ Supervisor.init(children, max_restarts: 5, max_seconds: 30, strategy: :rest_for_one)
32
+ end
33
+ end
changed mix.exs
 
@@ -2,7 +2,7 @@ defmodule Oban.MixProject do
2
2
use Mix.Project
3
3
4
4
@source_url "https://github.com/sorentwo/oban"
5
- @version "2.18.2"
5
+ @version "2.18.3"
6
6
7
7
def project do
8
8
[