Commit f41afcc

Add leadership "domains" so multiple Rivers can operate in one schema
We've gotten a couple of requests so far (see #342 and #1105) to be able to start multiple River clients targeting different queues within the same database/schema, with each able to operate independently enough to be functional. This isn't currently possible because a single leader is elected per schema, and that leader handles all maintenance operations, including non-queue ones like periodic job enqueuing.

Here, add the idea of a `LeaderDomain`. This lets a user set the "domain" on which a client elects its leader, allowing multiple leaders to be elected in a single schema, each running its own maintenance services. Setting `LeaderDomain` has the additional effect that maintenance services operate only on the queues their client is configured for.

The idea is to keep backwards compatibility, in that the default behavior (with `LeaderDomain` unset) is unchanged, while providing a path for multiple leaders to interoperate. There are still a few edges: for example, reindexing is not queue specific, so multiple leaders could each be running a reindexer. I've provided guidance in the config documentation that ideally, all clients but one should have their reindexer disabled.
1 parent ef2048a commit f41afcc
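
For illustration, here's a minimal sketch (not part of the commit itself) of the two-domain setup described above. `dbPool` and `workers` are assumed to be created elsewhere (for example via `pgxpool.New` and `river.NewWorkers` plus `river.AddWorker`); the `Config.LeaderDomain` field is the one added in this commit.

```go
package example

import (
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// newDomainClients builds two clients that share one database/schema but
// elect leaders in separate domains, so each runs maintenance services only
// for its own queues.
func newDomainClients(dbPool *pgxpool.Pool, workers *river.Workers) error {
	// Leads "domain1"; maintains only queue_a and queue_b.
	client1, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		LeaderDomain: "domain1",
		Queues: map[string]river.QueueConfig{
			"queue_a": {MaxWorkers: 50},
			"queue_b": {MaxWorkers: 50},
		},
		Workers: workers,
	})
	if err != nil {
		return err
	}

	// Leads "domain2"; maintains only queue_c and queue_d.
	client2, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		LeaderDomain: "domain2",
		Queues: map[string]river.QueueConfig{
			"queue_c": {MaxWorkers: 50},
			"queue_d": {MaxWorkers: 50},
		},
		Workers: workers,
	})
	if err != nil {
		return err
	}

	// Both clients are started as usual (client.Start(ctx)); each elects a
	// leader within its own domain.
	_, _ = client1, client2
	return nil
}
```

Per the config documentation added below, all but one of these clients would ideally also disable its reindexer; a sketch of that follows the `Config` changes.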

43 files changed (+1181, -298 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113).
+
 ## [0.29.0] - 2025-12-22
 
 ### Added

client.go

Lines changed: 59 additions & 3 deletions
@@ -9,6 +9,7 @@ import (
     "log/slog"
     "os"
     "regexp"
+    "slices"
     "strings"
     "sync"
     "time"
@@ -209,6 +210,47 @@ type Config struct {
     // Jobs may have their own specific hooks by implementing JobArgsWithHooks.
     Hooks []rivertype.Hook
 
+    // LeaderDomain is an optional "domain" string to use for leader election.
+    // Different clients sharing the same River schema can elect multiple
+    // leaders as long as they're using different domains, with one leader
+    // elected per domain.
+    //
+    // Setting this value also triggers the related behavior that maintenance
+    // services operate only on the queues their client is configured for. So
+    // for example, given client1 handling queue_a and queue_b and client2
+    // handling queue_c and queue_d, whichever client is elected leader will end
+    // up running all maintenance services for all queues (queue_a, queue_b,
+    // queue_c, and queue_d). But if client1 is using domain "domain1" and
+    // client2 is using domain "domain2", then client1 (elected in domain1) will
+    // only run maintenance services on queue_a and queue_b, while client2
+    // (elected in domain2) will run maintenance services on queue_c and
+    // queue_d.
+    //
+    // Be warned, though, that River *does not protect against configuration
+    // mistakes*. If client1 on domain1 is configured for queue_a and queue_b,
+    // and client2 on domain2 is *also* configured for queue_a and queue_b, then
+    // both clients may end up running maintenance services on the same queues
+    // at the same time. It's the caller's responsibility to ensure that doesn't
+    // happen.
+    //
+    // Leaving this empty or using the special value "default" causes the client
+    // to operate on all queues. When setting this value to non-empty
+    // non-"default", no other clients should be left empty or use "default"
+    // because the default client(s) will infringe on the domains of the
+    // non-default one(s).
+    //
+    // Certain maintenance services that aren't queue-related, like the
+    // reindexer, will continue to run on all leaders regardless of domain. If
+    // using this feature, it's a good idea to set ReindexerSchedule on all but
+    // a single leader domain to river.NeverSchedule().
+    //
+    // In general, most River users should not need LeaderDomain; those
+    // running multiple Rivers may want to consider using multiple databases and
+    // multiple schemas instead.
+    //
+    // Defaults to "default".
+    LeaderDomain string
+
     // Logger is the structured logger to use for logging purposes. If none is
     // specified, logs will be emitted to STDOUT with messages at warn level
     // or higher.
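
To make the reindexer guidance in the comment above concrete, here's a hedged sketch assuming the existing `ReindexerSchedule` option and the `river.NeverSchedule()` helper that the comment references:

```go
package example

import "github.com/riverqueue/river"

// secondaryDomainConfig sketches the recommendation above: every client
// except one sets ReindexerSchedule to a never-running schedule so that only
// a single leader domain runs the reindexer, which isn't queue specific.
func secondaryDomainConfig() *river.Config {
	return &river.Config{
		LeaderDomain:      "domain2",
		ReindexerSchedule: river.NeverSchedule(),
		// Queues, Workers, Schema, etc. configured as usual.
	}
}
```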
@@ -415,6 +457,7 @@ func (c *Config) WithDefaults() *Config {
         Hooks:               c.Hooks,
         JobInsertMiddleware: c.JobInsertMiddleware,
         JobTimeout:          cmp.Or(c.JobTimeout, JobTimeoutDefault),
+        LeaderDomain:        c.LeaderDomain,
         Logger:              logger,
         MaxAttempts:         cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
         Middleware:          c.Middleware,
@@ -840,6 +883,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 
     client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
         ClientID: config.ID,
+        Domain:   config.LeaderDomain,
         Schema:   config.Schema,
     })
     client.services = append(client.services, client.elector)
@@ -860,6 +904,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
         client.services = append(client.services, pluginPilot.PluginServices()...)
     }
 
+    // It's important for queuesIncluded to be `nil` in case it's not in use
+    // for the various driver queries to work correctly.
+    var queuesIncluded []string
+    if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 {
+        queuesIncluded = maputil.Keys(config.Queues)
+        slices.Sort(queuesIncluded)
+    }
+
     //
     // Maintenance services
     //
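
The note above about keeping `queuesIncluded` as `nil` when unused is subtle. The function below is a hypothetical stand-in (not River's actual driver code) that illustrates the assumed convention: `nil` means no queue filter at all, while any non-nil slice restricts maintenance to the listed queues.

```go
package example

import "slices"

// matchesQueueFilter is a hypothetical illustration of the nil-vs-non-nil
// convention assumed above: a nil queuesIncluded applies no filter at all,
// while a non-nil slice (even an empty one) limits work to the listed queues.
func matchesQueueFilter(queue string, queuesIncluded []string) bool {
	if queuesIncluded == nil {
		return true // no filter configured; maintenance applies to every queue
	}
	return slices.Contains(queuesIncluded, queue)
}
```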
@@ -872,6 +924,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
             CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
             DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
             QueuesExcluded:              client.pilot.JobCleanerQueuesExcluded(),
+            QueuesIncluded:              queuesIncluded,
             Schema:                      config.Schema,
             Timeout:                     config.JobCleanerTimeout,
         }, driver.GetExecutor())
@@ -882,6 +935,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
     {
         jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
             ClientRetryPolicy:   config.RetryPolicy,
+            QueuesIncluded:      queuesIncluded,
             RescueAfter:         config.RescueStuckJobsAfter,
             Schema:              config.Schema,
             WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
@@ -897,9 +951,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 
     {
         jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
-            Interval:     config.schedulerInterval,
-            NotifyInsert: client.maybeNotifyInsertForQueues,
-            Schema:       config.Schema,
+            Interval:       config.schedulerInterval,
+            NotifyInsert:   client.maybeNotifyInsertForQueues,
+            QueuesIncluded: queuesIncluded,
+            Schema:         config.Schema,
         }, driver.GetExecutor())
         maintenanceServices = append(maintenanceServices, jobScheduler)
         client.testSignals.jobScheduler = &jobScheduler.TestSignals
@@ -925,6 +980,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 
     {
         queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
+            QueuesIncluded:  queuesIncluded,
             RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
             Schema:          config.Schema,
         }, driver.GetExecutor())

client_test.go

Lines changed: 148 additions & 0 deletions
@@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) {
 
         startstoptest.Stress(ctx, t, clientWithStop)
     })
+
+    t.Run("LeaderDomain_Alternate", func(t *testing.T) {
+        t.Parallel()
+
+        var client1 *Client[pgx.Tx]
+        {
+            config, bundle := setupConfig(t)
+            config.LeaderDomain = "domain1"
+            config.ReindexerSchedule = &neverSchedule{}
+            config.Queues = map[string]QueueConfig{
+                "queue_a": {MaxWorkers: 50},
+                "queue_b": {MaxWorkers: 50},
+            }
+
+            var err error
+            client1, err = NewClient(bundle.driver, config)
+            require.NoError(t, err)
+            client1.testSignals.Init(t)
+        }
+
+        var client2 *Client[pgx.Tx]
+        {
+            config, bundle := setupConfig(t)
+            config.LeaderDomain = "domain2"
+            config.Queues = map[string]QueueConfig{
+                "queue_c": {MaxWorkers: 50},
+                "queue_d": {MaxWorkers: 50},
+            }
+            config.Schema = client1.config.Schema
+            config.ReindexerSchedule = &neverSchedule{}
+
+            var err error
+            client2, err = NewClient(bundle.driver, config)
+            require.NoError(t, err)
+            client2.testSignals.Init(t)
+        }
+
+        startClient(ctx, t, client1)
+        startClient(ctx, t, client2)
+
+        // Both elected
+        client1.testSignals.electedLeader.WaitOrTimeout()
+        client2.testSignals.electedLeader.WaitOrTimeout()
+    })
+
+    t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) {
+        t.Parallel()
+
+        config, bundle := setupConfig(t)
+        config.Queues = map[string]QueueConfig{
+            "queue_a": {MaxWorkers: 50},
+            "queue_b": {MaxWorkers: 50},
+        }
+
+        client, err := NewClient(bundle.driver, config)
+        require.NoError(t, err)
+        client.testSignals.Init(t)
+
+        jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
+        require.Nil(t, jobCleaner.Config.QueuesIncluded)
+        jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
+        require.Nil(t, jobRescuer.Config.QueuesIncluded)
+        jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
+        require.Nil(t, jobScheduler.Config.QueuesIncluded)
+        queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
+        require.Nil(t, queueCleaner.Config.QueuesIncluded)
+    })
+
+    // The domain "default" is special in that it behaves like if LeaderDomain
+    // was not set.
+    t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) {
+        t.Parallel()
+
+        config, bundle := setupConfig(t)
+        config.LeaderDomain = "default"
+        config.Queues = map[string]QueueConfig{
+            "queue_a": {MaxWorkers: 50},
+            "queue_b": {MaxWorkers: 50},
+        }
+
+        client, err := NewClient(bundle.driver, config)
+        require.NoError(t, err)
+        client.testSignals.Init(t)
+
+        jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
+        require.Nil(t, jobCleaner.Config.QueuesIncluded)
+        jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
+        require.Nil(t, jobRescuer.Config.QueuesIncluded)
+        jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
+        require.Nil(t, jobScheduler.Config.QueuesIncluded)
+        queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
+        require.Nil(t, queueCleaner.Config.QueuesIncluded)
+    })
+
+    // When non-default leader domains are configured, each client's maintenance
+    // services are limited to only their client's queues.
+    t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) {
+        t.Parallel()
+
+        var client1 *Client[pgx.Tx]
+        {
+            config, bundle := setupConfig(t)
+            config.LeaderDomain = "domain1"
+            config.ReindexerSchedule = &neverSchedule{}
+            config.Queues = map[string]QueueConfig{
+                "queue_a": {MaxWorkers: 50},
+                "queue_b": {MaxWorkers: 50},
+            }
+
+            var err error
+            client1, err = NewClient(bundle.driver, config)
+            require.NoError(t, err)
+            client1.testSignals.Init(t)
+
+            jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer)
+            require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded)
+            jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer)
+            require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded)
+            jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer)
+            require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded)
+            queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer)
+            require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded)
+        }
+
+        {
+            config, bundle := setupConfig(t)
+            config.LeaderDomain = "domain2"
+            config.Queues = map[string]QueueConfig{
+                "queue_c": {MaxWorkers: 50},
+                "queue_d": {MaxWorkers: 50},
+            }
+            config.Schema = client1.config.Schema
+            config.ReindexerSchedule = &neverSchedule{}
+
+            client2, err := NewClient(bundle.driver, config)
+            require.NoError(t, err)
+            client2.testSignals.Init(t)
+
+            jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer)
+            require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded)
+            jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer)
+            require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded)
+            jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer)
+            require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded)
+            queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer)
+            require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded)
+        }
+    })
 }
 
 type workerWithMiddleware[T JobArgs] struct {
