Commit 2bf276b

Optimize SQLite's completion happy path when setting `completed` without metadata
This one follows up #870 to add an optimization for job completion: we separate out the most common case of setting jobs to `completed` with no metadata required, update all of those in one simplified batch query, then do the rest of the completions afterwards. In any non-degenerate queue, most completions set success states, so this should help real-world uses, and it also significantly improves SQLite's benchmarking numbers. Here's a new benchmark run where throughput is ~4x what it was before and roughly on par with Postgres:

$ go run ./cmd/river bench --database-url "sqlite://:memory:" --num-total-jobs 1_000_000
bench: jobs worked [ 0 ], inserted [ 1000000 ], job/sec [ 0.0 ] [0s]
bench: jobs worked [ 88218 ], inserted [ 0 ], job/sec [ 44109.0 ] [2s]
bench: jobs worked [ 91217 ], inserted [ 0 ], job/sec [ 45608.5 ] [2s]
bench: jobs worked [ 88858 ], inserted [ 0 ], job/sec [ 44429.0 ] [2s]
bench: jobs worked [ 77219 ], inserted [ 0 ], job/sec [ 38609.5 ] [2s]
bench: jobs worked [ 82045 ], inserted [ 0 ], job/sec [ 41022.5 ] [2s]
bench: jobs worked [ 84052 ], inserted [ 0 ], job/sec [ 42026.0 ] [2s]
bench: jobs worked [ 72028 ], inserted [ 0 ], job/sec [ 36014.0 ] [2s]
bench: jobs worked [ 90047 ], inserted [ 0 ], job/sec [ 45023.5 ] [2s]
bench: jobs worked [ 88875 ], inserted [ 0 ], job/sec [ 44437.5 ] [2s]
bench: jobs worked [ 89240 ], inserted [ 0 ], job/sec [ 44620.0 ] [2s]
bench: jobs worked [ 88842 ], inserted [ 0 ], job/sec [ 44421.0 ] [2s]
bench: jobs worked [ 59359 ], inserted [ 0 ], job/sec [ 29679.5 ] [2s]
bench: total jobs worked [ 1000000 ], total jobs inserted [ 1000000 ], overall job/sec [ 42822.8 ], running 23.35203575s

Here's a normal non-memory, file-based database:

$ go run ./cmd/river bench --database-url "sqlite://./sqlite/bench.sqlite3" --num-total-jobs 1_000_000
bench: jobs worked [ 0 ], inserted [ 1000000 ], job/sec [ 0.0 ] [0s]
bench: jobs worked [ 83657 ], inserted [ 0 ], job/sec [ 41828.5 ] [2s]
bench: jobs worked [ 76648 ], inserted [ 0 ], job/sec [ 38324.0 ] [2s]
bench: jobs worked [ 88036 ], inserted [ 0 ], job/sec [ 44018.0 ] [2s]
bench: jobs worked [ 75473 ], inserted [ 0 ], job/sec [ 37736.5 ] [2s]
bench: jobs worked [ 82604 ], inserted [ 0 ], job/sec [ 41302.0 ] [2s]
bench: jobs worked [ 84048 ], inserted [ 0 ], job/sec [ 42024.0 ] [2s]
bench: jobs worked [ 85508 ], inserted [ 0 ], job/sec [ 42754.0 ] [2s]
bench: jobs worked [ 90580 ], inserted [ 0 ], job/sec [ 45290.0 ] [2s]
bench: jobs worked [ 83568 ], inserted [ 0 ], job/sec [ 41784.0 ] [2s]
bench: jobs worked [ 86062 ], inserted [ 0 ], job/sec [ 43031.0 ] [2s]
bench: jobs worked [ 88508 ], inserted [ 0 ], job/sec [ 44254.0 ] [2s]
bench: jobs worked [ 75308 ], inserted [ 0 ], job/sec [ 37654.0 ] [2s]
bench: total jobs worked [ 1000000 ], total jobs inserted [ 1000000 ], overall job/sec [ 42331.9 ], running 23.622860125s

The improved numbers only apply in fixed job burndown mode (the `--num-total-jobs` option) because job insertion is still slow: it still happens one row at a time. Once again, I'm pretty sure I'll be able to land some SQLite fixes that'll make batch operations possible using `json_each` (see the sketch below), and then we should be able to make all normal operations batch-wise. That'll take some time though, and we can get this optimization out in time for the initial SQLite release.
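For context on that `json_each` plan: the idea (not part of this commit) is to bind a whole batch as a single JSON array parameter and let SQLite expand it into rows server-side, replacing one round trip per job with one statement total. A minimal sketch against a simplified stand-in table, assuming the modernc.org/sqlite driver; none of these names are river's real schema or generated code:

package main

import (
	"database/sql"
	"encoding/json"
	"log"

	_ "modernc.org/sqlite"
)

// insertBatch binds one JSON array parameter and uses SQLite's json_each to
// expand it into rows. The `job` table is a simplified stand-in.
func insertBatch(db *sql.DB, kinds []string) error {
	payload, err := json.Marshal(kinds)
	if err != nil {
		return err
	}
	_, err = db.Exec(`INSERT INTO job (kind) SELECT value FROM json_each(?)`, string(payload))
	return err
}

func main() {
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE job (id INTEGER PRIMARY KEY, kind TEXT NOT NULL)`); err != nil {
		log.Fatal(err)
	}
	if err := insertBatch(db, []string{"email", "report", "cleanup"}); err != nil {
		log.Fatal(err)
	}
}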
1 parent aeaba05 · commit 2bf276b

File tree

4 files changed (+141 −6 lines changed)


internal/riverinternaltest/riverdrivertest/riverdrivertest.go

Lines changed: 2 additions & 2 deletions
@@ -2261,7 +2261,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 	t.Run("CompletesARunningJob", func(t *testing.T) {
 		t.Parallel()

-		exec, _ := setup(ctx, t)
+		exec, bundle := setup(ctx, t)

 		now := time.Now().UTC()

@@ -2274,7 +2274,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 		require.NoError(t, err)
 		jobAfter := jobsAfter[0]
 		require.Equal(t, rivertype.JobStateCompleted, jobAfter.State)
-		require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
+		require.WithinDuration(t, now, *jobAfter.FinalizedAt, bundle.driver.TimePrecision())

 		jobUpdated, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: ""})
 		require.NoError(t, err)
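The test now compares against a driver-reported precision instead of a hardcoded time.Microsecond. TimePrecision itself isn't shown in this diff; here's a hypothetical sketch of its shape, assuming SQLite's datetime('now', 'subsec') stores millisecond resolution:

package riversqlite

import "time"

// TimePrecision reports the finest timestamp granularity the driver can
// round-trip. Hypothetical sketch, not the committed code: datetime with
// the 'subsec' modifier keeps milliseconds, so WithinDuration assertions
// can't be tighter than that.
func (d *Driver) TimePrecision() time.Duration {
	return time.Millisecond
}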

riverdriver/riversqlite/internal/dbsqlc/river_job.sql

Lines changed: 10 additions & 0 deletions
@@ -380,6 +380,16 @@ WHERE id = @id
     AND state != 'running'
 RETURNING *;

+-- This doesn't exist under the Postgres driver, but is used for an optimized
+-- happy path for setting jobs to `complete` where metadata isn't required.
+-- name: JobSetCompletedIfRunning :many
+UPDATE /* TEMPLATE: schema */river_job
+SET finalized_at = coalesce(cast(sqlc.narg('finalized_at') AS text), datetime('now', 'subsec')),
+    state = 'completed'
+WHERE id IN (sqlc.slice('id'))
+    AND state = 'running'
+RETURNING *;
+
 -- Differs significantly from the Postgres version in that it can't do a bulk
 -- update, and since sqlc doesn't support `UPDATE` in CTEs, we need separate
 -- queries like JobSetMetadataIfNotRunning to do the fallback work.
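A note on `sqlc.slice('id')` above: SQLite has no array bind type, so sqlc-generated code rewrites the IN clause at runtime with one placeholder per element. A self-contained illustration of that expansion; the helper and the marker format are illustrative, not river's actual generated output:

package main

import (
	"fmt"
	"strings"
)

// expandSlice mimics the placeholder expansion done for sqlc.slice(): the
// slice marker is swapped for one "?" per bound value and the values are
// collected as query args.
func expandSlice(query, marker string, ids []int64) (string, []any) {
	placeholders := make([]string, len(ids))
	args := make([]any, len(ids))
	for i, id := range ids {
		placeholders[i] = "?"
		args[i] = id
	}
	return strings.Replace(query, marker, strings.Join(placeholders, ", "), 1), args
}

func main() {
	query := `UPDATE river_job SET state = 'completed' WHERE id IN (/*SLICE:id*/?) AND state = 'running'`
	expanded, args := expandSlice(query, "/*SLICE:id*/?", []int64{101, 102, 103})
	fmt.Println(expanded) // ... WHERE id IN (?, ?, ?) ...
	fmt.Println(args)     // [101 102 103]
}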

riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go

Lines changed: 69 additions & 0 deletions
Some generated files are not rendered by default.

riverdriver/riversqlite/river_sqlite_driver.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
// `dbPool.SetMaxOpenConns(1)`.
1212
//
1313
// A known deficiency in this driver compared to Postgres is that due to
14-
// limitations in sqlc, it performs operations like completion and `InsertMany`
15-
// one row at a time instead of in batches. This means that it's slower than the
16-
// Postgres driver, especially when benchmarking.
14+
// limitations in sqlc, it performs bulk operations like non-standard
15+
// completions and `InsertMany` one row at a time instead of in batches. This
16+
// means that it processes batches more slowly than the Postgres driver.
1717
package riversqlite
1818

1919
import (
@@ -40,6 +40,7 @@ import (
4040
"github.com/riverqueue/river/riverdriver/riversqlite/internal/dbsqlc"
4141
"github.com/riverqueue/river/rivershared/sqlctemplate"
4242
"github.com/riverqueue/river/rivershared/uniquestates"
43+
"github.com/riverqueue/river/rivershared/util/maputil"
4344
"github.com/riverqueue/river/rivershared/util/ptrutil"
4445
"github.com/riverqueue/river/rivershared/util/randutil"
4546
"github.com/riverqueue/river/rivershared/util/sliceutil"
@@ -683,8 +684,63 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
683684
ctx = schemaTemplateParam(ctx, params.Schema)
684685
dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer}
685686

687+
// Because it's by far the most common path, put in an optimization for
688+
// jobs that we're setting to `completed` that don't have any metadata
689+
// updates needed. Group those jobs out and complete them all in one
690+
// query, then continue on and do all the other updates.
691+
var (
692+
completedIDs = make([]int64, 0, len(params.ID))
693+
completedIndexes = make(map[int64]int, len(params.ID)) // job ID -> params index (for setting result)
694+
)
695+
for i, id := range params.ID {
696+
if params.State[i] == rivertype.JobStateCompleted && !params.MetadataDoMerge[i] {
697+
completedIDs = append(completedIDs, id)
698+
completedIndexes[id] = i
699+
}
700+
}
701+
702+
if len(completedIDs) > 0 {
703+
jobs, err := dbsqlc.New().JobSetCompletedIfRunning(ctx, dbtx, &dbsqlc.JobSetCompletedIfRunningParams{
704+
ID: completedIDs,
705+
FinalizedAt: timeStringNullable(params.Now),
706+
})
707+
if err != nil {
708+
return fmt.Errorf("error setting completed state on jobs: %w", err)
709+
}
710+
711+
for _, job := range jobs {
712+
setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
713+
if err != nil {
714+
return err
715+
}
716+
delete(completedIndexes, job.ID)
717+
}
718+
719+
// Fetch any jobs that weren't set by the query above because they
720+
// weren't `running`. In practice this should be quite rare, but we
721+
// check for it in the test suite.
722+
if len(completedIndexes) > 0 {
723+
jobs, err := dbsqlc.New().JobGetByIDMany(ctx, dbtx, maputil.Keys(completedIndexes))
724+
if err != nil {
725+
return fmt.Errorf("error getting non-running jobs: %w", err)
726+
}
727+
728+
for _, job := range jobs {
729+
setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
730+
if err != nil {
731+
return err
732+
}
733+
}
734+
}
735+
}
736+
686737
// Should be a batch insert, but that's currently impossible with SQLite/sqlc. https://github.com/sqlc-dev/sqlc/issues/3802
687-
for i := range params.ID {
738+
for i, id := range params.ID {
739+
// Skip job if we handled it in the happy path optimization above.
740+
if _, ok := completedIndexes[id]; ok {
741+
continue
742+
}
743+
688744
setStateParams := &dbsqlc.JobSetStateIfRunningParams{
689745
ID: params.ID[i],
690746
Error: []byte("{}"), // even if not used, must be valid JSON because it's bed into the `json` function
