Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ type Config struct {
// lifecycle (see rivertype.Hook), installed globally.
//
// The effect of hooks in this list will depend on the specific hook
// interfaces they implement, so for example implementing
// rivertype.HookInsertBegin will cause the hook to be invoked before a job
// is inserted, or implementing rivertype.HookWorkBegin will cause it to be
// invoked before a job is worked. Hook structs may implement multiple hook
// interfaces.
// interfaces they implement. Implementing rivertype.HookInsertBegin will
// cause the hook to be invoked before a job is inserted, implementing
// rivertype.HookMetricEmit will cause it to be invoked when River emits a
// metric, and implementing rivertype.HookWorkBegin will cause it to be
// invoked before a job is worked. Hook structs may implement multiple hook
// interfaces.
//
// Order in this list is significant. A hook that appears first will be
// entered before a hook that appears later. For any particular phase, order
Expand Down
10 changes: 10 additions & 0 deletions hook_defaults_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ func (f HookInsertBeginFunc) InsertBegin(ctx context.Context, params *rivertype.

func (f HookInsertBeginFunc) IsHook() bool { return true }

// HookMetricEmitFunc is a convenience helper for implementing
// rivertype.HookMetricEmit using a simple function instead of a struct.
//
// The wrapped function receives the same context and params that are passed
// to MetricEmit. It has no return value, so it cannot report an error back
// to the caller.
type HookMetricEmitFunc func(ctx context.Context, params *rivertype.HookMetricEmitParams)

// IsHook always returns true.
func (f HookMetricEmitFunc) IsHook() bool { return true }

// MetricEmit invokes the wrapped function with ctx and params.
func (f HookMetricEmitFunc) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) {
f(ctx, params)
}

// HookPeriodicJobsStartFunc is a convenience helper for implementing
// rivertype.HookPeriodicJobsStart using a simple function instead of a struct.
type HookPeriodicJobsStartFunc func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error
Expand Down
6 changes: 6 additions & 0 deletions hook_defaults_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ var (
_ rivertype.Hook = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil })
_ rivertype.HookInsertBegin = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil })

_ rivertype.Hook = HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {})
_ rivertype.HookMetricEmit = HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {})

_ rivertype.Hook = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil })
_ rivertype.HookPeriodicJobsStart = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil })

_ rivertype.Hook = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil })
_ rivertype.HookWorkBegin = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil })

_ rivertype.Hook = HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { return err })
_ rivertype.HookWorkEnd = HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { return err })
)
7 changes: 7 additions & 0 deletions internal/hooklookup/hook_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type HookKind string

const (
HookKindInsertBegin HookKind = "insert_begin"
HookKindMetricEmit HookKind = "metric_emit"
HookKindPeriodicJobsStart HookKind = "periodic_job_start"
HookKindWorkBegin HookKind = "work_begin"
HookKindWorkEnd HookKind = "work_end"
Expand Down Expand Up @@ -84,6 +85,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
}
}
case HookKindMetricEmit:
for _, hook := range c.hooks {
if typedHook, ok := hook.(rivertype.HookMetricEmit); ok {
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
}
}
case HookKindPeriodicJobsStart:
for _, hook := range c.hooks {
if typedHook, ok := hook.(rivertype.HookPeriodicJobsStart); ok {
Expand Down
22 changes: 21 additions & 1 deletion internal/hooklookup/hook_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestHookLookup(t *testing.T) {
return NewHookLookup([]rivertype.Hook{ //nolint:forcetypeassert
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
&testHookMetricEmit{},
&testHookWorkBegin{},
&testHookWorkEnd{},
}).(*hookLookup), &testBundle{}
Expand All @@ -35,6 +36,9 @@ func TestHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
}, hookLookup.ByHookKind(HookKindInsertBegin))
require.Equal(t, []rivertype.Hook{
&testHookMetricEmit{},
}, hookLookup.ByHookKind(HookKindMetricEmit))
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
Expand All @@ -43,13 +47,16 @@ func TestHookLookup(t *testing.T) {
&testHookWorkEnd{},
}, hookLookup.ByHookKind(HookKindWorkEnd))

require.Len(t, hookLookup.hooksByKind, 3)
require.Len(t, hookLookup.hooksByKind, 4)

// Repeat lookups to make sure we get the same result.
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
}, hookLookup.ByHookKind(HookKindInsertBegin))
require.Equal(t, []rivertype.Hook{
&testHookMetricEmit{},
}, hookLookup.ByHookKind(HookKindMetricEmit))
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
Expand All @@ -75,6 +82,7 @@ func TestHookLookup(t *testing.T) {
}

parallelLookupLoop(HookKindInsertBegin)
parallelLookupLoop(HookKindMetricEmit)
parallelLookupLoop(HookKindWorkBegin)
parallelLookupLoop(HookKindInsertBegin)
parallelLookupLoop(HookKindWorkBegin)
Expand All @@ -100,6 +108,7 @@ func TestEmptyHookLookup(t *testing.T) {
hookLookup, _ := setup(t)

require.Nil(t, hookLookup.ByHookKind(HookKindInsertBegin))
require.Nil(t, hookLookup.ByHookKind(HookKindMetricEmit))
require.Nil(t, hookLookup.ByHookKind(HookKindWorkBegin))
})
}
Expand Down Expand Up @@ -241,6 +250,17 @@ func (t *testHookInsertBegin) InsertBegin(ctx context.Context, params *rivertype
return nil
}

//
// testHookMetricEmit
//

// Compile-time check that *testHookMetricEmit satisfies
// rivertype.HookMetricEmit.
var _ rivertype.HookMetricEmit = &testHookMetricEmit{}

// testHookMetricEmit is a test fixture that embeds rivertype.Hook and adds a
// MetricEmit method.
type testHookMetricEmit struct{ rivertype.Hook }

// MetricEmit is intentionally a no-op: it ignores both arguments and does
// nothing.
func (t *testHookMetricEmit) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) {
}

//
// testHookWorkBegin
//
Expand Down
77 changes: 67 additions & 10 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,16 @@ type producer struct {
// Jobs which are currently being worked. Only used by main goroutine.
activeJobs map[int64]*jobexecutor.JobExecutor

completer jobcompleter.JobCompleter
config *producerConfig
id atomic.Int64 // atomic because it's written at startup and read during shutdown
exec riverdriver.Executor
errorHandler jobexecutor.ErrorHandler
fetchLimiter *chanutil.DebouncedChan
state riverpilot.ProducerState
pilot riverpilot.Pilot
workers *Workers
completer jobcompleter.JobCompleter
config *producerConfig
id atomic.Int64 // atomic because it's written at startup and read during shutdown
exec riverdriver.Executor
errorHandler jobexecutor.ErrorHandler
fetchLimiter *chanutil.DebouncedChan
metricEmitHooks []rivertype.HookMetricEmit // memoized hooks of type HookMetricEmit for reuse in dispatchWork
state riverpilot.ProducerState
pilot riverpilot.Pilot
workers *Workers

// Receives job IDs to cancel. Written by notifier goroutine, only read from
// main goroutine.
Expand Down Expand Up @@ -233,7 +234,7 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi
errorHandler = &errorHandlerAdapter{config.ErrorHandler}
}

return baseservice.Init(archetype, &producer{
producer := baseservice.Init(archetype, &producer{
activeJobs: make(map[int64]*jobexecutor.JobExecutor),
cancelCh: make(chan int64, 1000),
completer: config.Completer,
Expand All @@ -247,6 +248,10 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi
retryPolicy: config.RetryPolicy,
workers: config.Workers,
})

producer.metricEmitHooks = producer.metricEmitHooksFromLookup()

return producer
}

// Start starts the producer. It backgrounds a goroutine which is stopped when
Expand Down Expand Up @@ -743,7 +748,33 @@ func (p *producer) maybeCancelJob(ctx context.Context, id int64) {
executor.Cancel(ctx)
}

// metricEmitHooksFromLookup collects the hooks registered under
// hooklookup.HookKindMetricEmit in the producer's global hook lookup and
// returns them typed as rivertype.HookMetricEmit.
//
// It returns nil when the producer has no global hook lookup configured or
// when the lookup yields no hooks of that kind.
func (p *producer) metricEmitHooksFromLookup() []rivertype.HookMetricEmit {
	lookup := p.config.HookLookupGlobal
	if lookup == nil {
		return nil
	}

	found := lookup.ByHookKind(hooklookup.HookKindMetricEmit)
	if len(found) == 0 {
		return nil
	}

	typed := make([]rivertype.HookMetricEmit, 0, len(found))
	for _, h := range found {
		// Unchecked assertion: an entry that does not implement
		// HookMetricEmit panics here rather than being skipped.
		typed = append(typed, h.(rivertype.HookMetricEmit)) //nolint:forcetypeassert
	}

	return typed
}

func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// When a queue is paused, innerFetchLoop dispatches with count zero so it can
// continue servicing state changes without attempting to lock jobs or emit metrics.
if count <= 0 {
fetchResultCh <- producerFetchResult{}
return
}

// This intentionally removes any deadlines or cancellation from the parent
// context because we don't want it to get cancelled if the producer is asked
// to shut down. In that situation, we want to finish fetching any jobs we are
Expand All @@ -757,6 +788,11 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
// rarely hit, but exists to protect against degenerate cases.
const maxAttemptedBy = 100

var startedAt time.Time
if len(p.metricEmitHooks) > 0 {
startedAt = time.Now()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Badge Use the service clock for fetch duration metrics

When Config.Test.Time supplies a stubbed time generator, this captures real wall-clock time instead of River's configured service clock, and the later time.Since(startedAt) keeps the new metric nondeterministic under time stubbing. Metric hooks used in tests will see durations that don't advance with p.Time, unlike the rest of the producer's timing/database timestamps; capture and subtract with p.Time.Now() on both sides instead.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it's okay to use the stub-free version for metric durations. I did it on purpose since the code is made a little clearer and I had no plans to stub the value.

}

jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
ClientID: p.config.ClientID,
MaxAttemptedBy: maxAttemptedBy,
Expand All @@ -771,9 +807,30 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
return
}

if len(p.metricEmitHooks) > 0 {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Skip metrics when no JobGetAvailable query runs

When a queue is paused, innerFetchLoop still calls dispatchWork with count == 0, and StandardPilot.JobGetAvailable returns immediately for MaxToLock <= 0 without issuing the DB query. This block still emits both metrics, so paused queues produce near-zero job_get_available_duration samples and zero-count fetches that never actually locked anything, skewing the observability signal this hook is meant to expose; gate emission on count > 0 or on an actual fetch being attempted.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified dispatchWork to return early in case of a count <= 0 (i.e. paused queue).

p.emitMetric(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableDurationMetric{
Duration: time.Since(startedAt),
Queue: p.config.Queue,
},
})
p.emitMetric(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableCountMetric{
Count: len(jobs),
Queue: p.config.Queue,
},
})
}

fetchResultCh <- producerFetchResult{jobs: jobs}
}

// emitMetric passes params to each hook in p.metricEmitHooks, calling them
// one after another in slice order. It does nothing when the slice is empty.
func (p *producer) emitMetric(ctx context.Context, params *rivertype.HookMetricEmitParams) {
	for _, metricHook := range p.metricEmitHooks {
		metricHook.MetricEmit(ctx, params)
	}
}

// Periodically logs an informational log line giving some insight into the
// current state of the producer.
func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) {
Expand Down
Loading
Loading