Skip to content

Commit 13d81f2

Browse files
committed
Add versionedjob contrib package with basic versioning framework
Here, add a contrib package called `versionedjob` that brings in a very basic job versioning framework. The framework is very lightweight and users could easily have written it themselves, but it might be a little non-obvious how to leverage hooks to bring it in. Contrib should be a really nice home for this feature because it's a little too trivial to bother with in core, but it's nice for people to have a semi-official reference that's well-maintained.
1 parent 491e03d commit 13d81f2

File tree

7 files changed

+678
-2
lines changed

7 files changed

+678
-2
lines changed

go.work

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
go 1.24.2
1+
go 1.25.5
22

33
use (
44
./datadogriver
5-
./otelriver
65
./nilerror
6+
./otelriver
77
./panictoerror
8+
./versionedjob
89
)

versionedjob/README.md

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# versionedjob [![Build Status](https://github.com/riverqueue/rivercontrib/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/riverqueue/rivercontrib/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/riverqueue/rivercontrib.svg)](https://pkg.go.dev/github.com/riverqueue/rivercontrib/versionedjob)
2+
3+
Provides a River hook with a simple job versioning framework. **Version transformers** are written for versioned jobs containing procedures for upgrading jobs that were encoded as older versions to the most modern version. This allows for workers to be implemented as if all job versions will be the most modern version only, keeping code simpler.
4+
5+
```go
6+
// VersionTransformer defines how to perform transformations between versions
7+
// for a specific job kind.
8+
type VersionTransformer interface {
9+
// Kind is the job kind that this transformer applies to.
10+
Kind() string
11+
12+
// VersionTransform applies version transformations to the given job. Version
13+
// transformations are fully defined according to the user, as well as how a
14+
// version is extracted from the job's args.
15+
//
16+
// Generally, this function should extract a version from the job, then
17+
// apply versions one by one until it's fully modernized to the point where
18+
// it can be successfully run by its worker.
19+
VersionTransform(job *rivertype.JobRow) error
20+
}
21+
```
22+
23+
## Example
24+
25+
Below are three versions of the same job: `VersionedJobArgsV1`, `VersionedJobArgsV2`, and the current version, `VersionedJobArgs`. From V1 to V2, `name` was renamed to `title`, and a `version` field added to track version. In V3, a new `description` property was added. A real program would only keep the latest version (`VersionedJobArgs`), but this example shows all three for reference.
26+
27+
```go
28+
type VersionedJobArgsV1 struct {
29+
Name string `json:"name"`
30+
}
31+
32+
type VersionedJobArgsV2 struct {
33+
Title string `json:"title"`
34+
Version int `json:"version"`
35+
}
36+
37+
type VersionedJobArgs struct {
38+
Description string `json:"description"`
39+
Title string `json:"title"`
40+
Version int `json:"version"`
41+
}
42+
```
43+
44+
The worker for `VersionedJobArgs` is written so it only handles the latest version (`title` instead of `name` and assumes `description` is present). This is possible because a `VersionTransformer` will handle migrating jobs from old versions to new ones before they hit the worker.
45+
46+
```go
47+
type VersionedJobWorker struct {
48+
river.WorkerDefaults[VersionedJobArgs]
49+
}
50+
51+
func (w *VersionedJobWorker) Work(ctx context.Context, job *river.Job[VersionedJobArgs]) error {
52+
fmt.Printf("Job title: %s; description: %s\n", job.Args.Title, job.Args.Description)
53+
return nil
54+
}
55+
```
56+
57+
The `VersionTransformer` implementation handles version upgrades one by one. Jobs which are multiple versions old can still be upgraded because multiple version changes can be applied in one go. This implementation uses `gjson`/`sjson` so that each change need only know a minimum about the data object in question and that unknown fields are retained. Other approaches are possible though, including using only Go's built-in `gjson` package.
58+
59+
```go
60+
type VersionedJobTransformer struct{}
61+
62+
func (*VersionedJobTransformer) VersionTransform(ctx context.Context, job *rivertype.JobRow) error {
63+
// Extract version from job, defaulting to 1 if not present because we
64+
// assume that was before versioning was introduced.
65+
version := cmp.Or(gjson.GetBytes(job.EncodedArgs, "version").Int(), 1)
66+
67+
var err error
68+
69+
//
70+
// Here, we walk through each successive version, applying transformations
71+
// to bring it to its next version. If a job is multiple versions behind,
72+
// version transformations are one-by-one applied in order until the job's
73+
// args are fully modernized.
74+
//
75+
76+
// Version change: V1 --> V2
77+
if version < 2 {
78+
version = 2
79+
80+
job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "title", gjson.GetBytes(job.EncodedArgs, "name").String())
81+
if err != nil {
82+
return err
83+
}
84+
85+
job.EncodedArgs, err = sjson.DeleteBytes(job.EncodedArgs, "name")
86+
if err != nil {
87+
return err
88+
}
89+
}
90+
91+
// Version change: V2 --> V3
92+
if version < 3 {
93+
version = 3
94+
95+
title := gjson.GetBytes(job.EncodedArgs, "title").String()
96+
if title == "" {
97+
return errors.New("no title found in job args")
98+
}
99+
100+
job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "description", "A description of a "+title+".")
101+
if err != nil {
102+
return err
103+
}
104+
}
105+
106+
// Not strictly necessary, but set version to latest.
107+
job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "version", version)
108+
if err != nil {
109+
return err
110+
}
111+
112+
return nil
113+
}
114+
115+
func (*VersionedJobTransformer) Kind() string { return (VersionedJobArgs{}).Kind() }
116+
```
117+
118+
A River client is initialized with the `versiondjob` hook and transformer installed:
119+
120+
```go
121+
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
122+
Hooks: []rivertype.Hook{
123+
versionedjob.NewHook(&versionedjob.HookConfig{
124+
Transformers: []versionedjob.VersionTransformer{
125+
&VersionedJobTransformer{},
126+
},
127+
}),
128+
},
129+
})
130+
if err != nil {
131+
panic(err)
132+
}
133+
```
134+
135+
With all that in place, a job of any version can be inserted and thanks to the version transformer modernizing the older ones, the worker will produce the same result regardless of input.
136+
137+
```go
138+
if _, err = riverClient.InsertMany(ctx, []river.InsertManyParams{
139+
{
140+
Args: VersionedJobArgsV1{
141+
Name: "My Job",
142+
},
143+
},
144+
{
145+
Args: VersionedJobArgsV2{
146+
Title: "My Job",
147+
Version: 2,
148+
},
149+
},
150+
{
151+
Args: VersionedJobArgs{
152+
Title: "My Job",
153+
Description: "A description of a My Job.",
154+
Version: 3,
155+
},
156+
},
157+
}); err != nil {
158+
panic(err)
159+
}
160+
```
161+
162+
```go
163+
// Output:
164+
// Job title: My Job; description: A description of a My Job.
165+
// Job title: My Job; description: A description of a My Job.
166+
// Job title: My Job; description: A description of a My Job.
167+
```

versionedjob/go.mod

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
module github.com/riverqueue/rivercontrib/versionedjob
2+
3+
go 1.25.5
4+
5+
require (
6+
github.com/jackc/pgx/v5 v5.7.6
7+
github.com/riverqueue/river v0.29.0
8+
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0
9+
github.com/riverqueue/river/rivershared v0.29.0
10+
github.com/riverqueue/river/rivertype v0.29.0
11+
github.com/stretchr/testify v1.11.1
12+
github.com/tidwall/gjson v1.18.0
13+
github.com/tidwall/sjson v1.2.5
14+
)
15+
16+
require (
17+
github.com/davecgh/go-spew v1.1.1 // indirect
18+
github.com/jackc/pgpassfile v1.0.0 // indirect
19+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
20+
github.com/jackc/puddle/v2 v2.2.2 // indirect
21+
github.com/pmezard/go-difflib v1.0.0 // indirect
22+
github.com/riverqueue/river/riverdriver v0.29.0 // indirect
23+
github.com/tidwall/match v1.2.0 // indirect
24+
github.com/tidwall/pretty v1.2.1 // indirect
25+
go.uber.org/goleak v1.3.0 // indirect
26+
golang.org/x/crypto v0.45.0 // indirect
27+
golang.org/x/sync v0.19.0 // indirect
28+
golang.org/x/text v0.32.0 // indirect
29+
gopkg.in/yaml.v3 v3.0.1 // indirect
30+
)

versionedjob/go.sum

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
5+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
6+
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
7+
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
8+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
9+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
10+
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
11+
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
12+
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
13+
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
14+
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
15+
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
16+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
17+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
18+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
19+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
20+
github.com/riverqueue/river v0.29.0 h1:PMO4k6n7HcIjjgrbnG2UG04Exh8aLmQksOddOoYDASA=
21+
github.com/riverqueue/river v0.29.0/go.mod h1:S8BbQbxCrJLYygmnrnraltHhWlGzZzwjqcRbY3wdq7w=
22+
github.com/riverqueue/river/riverdriver v0.29.0 h1:o7mV07RPXrGJdwXUKxVTOyvG1/cDmJIMI3V4Le4/LBo=
23+
github.com/riverqueue/river/riverdriver v0.29.0/go.mod h1:bmkdn74EG4Ogsv44JkC1CBxFZ3JHfYsN+e0K8Dq0otU=
24+
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0 h1:l3D17JWq/00QEt0bcawyDMxZYmM1YAk11Y/nRRVk5C8=
25+
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0/go.mod h1:mpncN3m7DR7VpD78LV5CczbSpwkWcLeJ5j1kkJiOt9s=
26+
github.com/riverqueue/river/rivershared v0.29.0 h1:Niwbmp/CQAKPZ+zT3teCgEmPhksyW0f2cx4X03FurEk=
27+
github.com/riverqueue/river/rivershared v0.29.0/go.mod h1:74WjXTYKV4nTfLemIPloPqiA3Tjqe5BFvnALrNbS62k=
28+
github.com/riverqueue/river/rivertype v0.29.0 h1:26hpzbd44piqJZ+1zO4RO6GRKpmZVX3Ncx+Ki+w2gtg=
29+
github.com/riverqueue/river/rivertype v0.29.0/go.mod h1:rWpgI59doOWS6zlVocROcwc00fZ1RbzRwsRTU8CDguw=
30+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
31+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
32+
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
33+
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
34+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
35+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
36+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
37+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
38+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
39+
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
40+
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
41+
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
42+
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
43+
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
44+
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
45+
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
46+
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
47+
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
48+
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
49+
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
50+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
51+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
52+
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
53+
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
54+
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
55+
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
56+
golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU=
57+
golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY=
58+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
59+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
60+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
61+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
62+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
63+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 commit comments

Comments
 (0)