-
Notifications
You must be signed in to change notification settings - Fork 159
Add new HookMetricEmit that can emit arbitrary metrics
#1285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,15 +181,16 @@ type producer struct { | |
| // Jobs which are currently being worked. Only used by main goroutine. | ||
| activeJobs map[int64]*jobexecutor.JobExecutor | ||
|
|
||
| completer jobcompleter.JobCompleter | ||
| config *producerConfig | ||
| id atomic.Int64 // atomic because it's written at startup and read during shutdown | ||
| exec riverdriver.Executor | ||
| errorHandler jobexecutor.ErrorHandler | ||
| fetchLimiter *chanutil.DebouncedChan | ||
| state riverpilot.ProducerState | ||
| pilot riverpilot.Pilot | ||
| workers *Workers | ||
| completer jobcompleter.JobCompleter | ||
| config *producerConfig | ||
| id atomic.Int64 // atomic because it's written at startup and read during shutdown | ||
| exec riverdriver.Executor | ||
| errorHandler jobexecutor.ErrorHandler | ||
| fetchLimiter *chanutil.DebouncedChan | ||
| metricEmitHooks []rivertype.HookMetricEmit // memoized hooks of type HookMetricEmit for reuse in dispatchWork | ||
| state riverpilot.ProducerState | ||
| pilot riverpilot.Pilot | ||
| workers *Workers | ||
|
|
||
| // Receives job IDs to cancel. Written by notifier goroutine, only read from | ||
| // main goroutine. | ||
|
|
@@ -233,7 +234,7 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi | |
| errorHandler = &errorHandlerAdapter{config.ErrorHandler} | ||
| } | ||
|
|
||
| return baseservice.Init(archetype, &producer{ | ||
| producer := baseservice.Init(archetype, &producer{ | ||
| activeJobs: make(map[int64]*jobexecutor.JobExecutor), | ||
| cancelCh: make(chan int64, 1000), | ||
| completer: config.Completer, | ||
|
|
@@ -247,6 +248,10 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi | |
| retryPolicy: config.RetryPolicy, | ||
| workers: config.Workers, | ||
| }) | ||
|
|
||
| producer.metricEmitHooks = producer.metricEmitHooksFromLookup() | ||
|
|
||
| return producer | ||
| } | ||
|
|
||
| // Start starts the producer. It backgrounds a goroutine which is stopped when | ||
|
|
@@ -743,7 +748,33 @@ func (p *producer) maybeCancelJob(ctx context.Context, id int64) { | |
| executor.Cancel(ctx) | ||
| } | ||
|
|
||
| func (p *producer) metricEmitHooksFromLookup() []rivertype.HookMetricEmit { | ||
| hookLookup := p.config.HookLookupGlobal | ||
| if hookLookup == nil { | ||
| return nil | ||
| } | ||
|
|
||
| hooks := hookLookup.ByHookKind(hooklookup.HookKindMetricEmit) | ||
| if len(hooks) < 1 { | ||
| return nil | ||
| } | ||
|
|
||
| metricEmitHooks := make([]rivertype.HookMetricEmit, len(hooks)) | ||
| for i, hook := range hooks { | ||
| metricEmitHooks[i] = hook.(rivertype.HookMetricEmit) //nolint:forcetypeassert | ||
| } | ||
|
|
||
| return metricEmitHooks | ||
| } | ||
|
|
||
| func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) { | ||
| // When a queue is paused, innerFetchLoop dispatches with count zero so it can | ||
| // continue servicing state changes without attempting to lock jobs or emit metrics. | ||
| if count <= 0 { | ||
| fetchResultCh <- producerFetchResult{} | ||
| return | ||
| } | ||
|
|
||
| // This intentionally removes any deadlines or cancellation from the parent | ||
| // context because we don't want it to get cancelled if the producer is asked | ||
| // to shut down. In that situation, we want to finish fetching any jobs we are | ||
|
|
@@ -757,6 +788,11 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC | |
| // rarely hit, but exists to protect against degenerate cases. | ||
| const maxAttemptedBy = 100 | ||
|
|
||
| var startedAt time.Time | ||
| if len(p.metricEmitHooks) > 0 { | ||
| startedAt = time.Now() | ||
| } | ||
|
|
||
| jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{ | ||
| ClientID: p.config.ClientID, | ||
| MaxAttemptedBy: maxAttemptedBy, | ||
|
|
@@ -771,9 +807,30 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC | |
| return | ||
| } | ||
|
|
||
| if len(p.metricEmitHooks) > 0 { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a queue is paused, Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified |
||
| p.emitMetric(ctx, &rivertype.HookMetricEmitParams{ | ||
| Metric: &rivertype.JobGetAvailableDurationMetric{ | ||
| Duration: time.Since(startedAt), | ||
| Queue: p.config.Queue, | ||
| }, | ||
| }) | ||
| p.emitMetric(ctx, &rivertype.HookMetricEmitParams{ | ||
| Metric: &rivertype.JobGetAvailableCountMetric{ | ||
| Count: len(jobs), | ||
| Queue: p.config.Queue, | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| fetchResultCh <- producerFetchResult{jobs: jobs} | ||
| } | ||
|
|
||
| func (p *producer) emitMetric(ctx context.Context, params *rivertype.HookMetricEmitParams) { | ||
| for _, hook := range p.metricEmitHooks { | ||
| hook.MetricEmit(ctx, params) | ||
| } | ||
| } | ||
|
|
||
| // Periodically logs an informational log line giving some insight into the | ||
| // current state of the producer. | ||
| func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
Config.Test.Timesupplies a stubbed time generator, this captures real wall-clock time instead of River's configured service clock, and the latertime.Since(startedAt)keeps the new metric nondeterministic under time stubbing. Metric hooks used in tests will see durations that don't advance withp.Time, unlike the rest of the producer's timing/database timestamps; capture and subtract withp.Time.Now()on both sides instead.Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think it's okay to use the stub-free version for metric durations. I did it on purpose since the code is made a little clearer and I had no plans to sub the value.