Skip to content

Commit 1ebf29c

Browse files
committed
demonstrate alternative constructor w/ listener support
Add a `NewWithListener` constructor to `riverdatabasesql` that allows the `database/sql` driver to be used with a functioning listener implementation. Also add a `NewListener` constructor to the `riverpgxv5` driver to allow creating a listener with a raw pgx pool. These can be combined to allow full listener support as long as the underlying database driver supports it, even when it's used within an abstraction like `database/sql` or Bun.
1 parent 755295f commit 1ebf29c

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

riverdriver/riverdatabasesql/river_database_sql.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ import (
2424

2525
// Driver is an implementation of riverdriver.Driver for database/sql.
2626
type Driver struct {
27-
dbPool *sql.DB
28-
queries *dbsqlc.Queries
27+
dbPool *sql.DB
28+
listener riverdriver.Listener
29+
queries *dbsqlc.Queries
2930
}
3031

3132
// New returns a new database/sql River driver for use with River.
@@ -41,13 +42,28 @@ func New(dbPool *sql.DB) *Driver {
4142
return &Driver{dbPool: dbPool, queries: dbsqlc.New()}
4243
}
4344

45+
// NewWithListener returns a new database/sql River driver for use with River
46+
// just like New, except it also takes a riverdriver.Listener to use for
47+
// listening to notifications.
48+
func NewWithListener(dbPool *sql.DB, listener riverdriver.Listener) *Driver {
49+
driver := New(dbPool)
50+
driver.listener = listener
51+
return driver
52+
}
53+
4454
func (d *Driver) GetExecutor() riverdriver.Executor {
4555
return &Executor{d.dbPool, d.dbPool, dbsqlc.New()}
4656
}
4757

48-
func (d *Driver) GetListener() riverdriver.Listener { panic(riverdriver.ErrNotImplemented) }
49-
func (d *Driver) HasPool() bool { return d.dbPool != nil }
50-
func (d *Driver) SupportsListener() bool { return false }
58+
func (d *Driver) GetListener() riverdriver.Listener {
59+
if d.listener == nil {
60+
panic(riverdriver.ErrNotImplemented)
61+
}
62+
return d.listener
63+
}
64+
65+
func (d *Driver) HasPool() bool { return d.dbPool != nil }
66+
func (d *Driver) SupportsListener() bool { return d.listener != nil }
5167

5268
func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx {
5369
return &ExecutorTx{Executor: Executor{nil, tx, dbsqlc.New()}, tx: tx}

riverdriver/riverpgxv5/river_pgx_v5_driver.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,26 @@ type Listener struct {
567567
mu sync.Mutex
568568
}
569569

570+
// NewListener returns a Listener that can be used to enable other drivers
571+
// (notably `riverdatabasesql`) to listen for notifications when their
572+
// abstraction doesn't allow direct access to pgx connections, even though they
573+
// are using pgx under the hood.
574+
//
575+
// Users of `database/sql` or Bun can use this with the
576+
// `riverdatabasesql.NewWithListener` constructor to enable listener support in
577+
// that driver.
578+
//
579+
// The dbPool will solely be used for acquiring new connections for `LISTEN`
580+
// commands. As such, a pool must be provided that supports that command. Users
581+
// of pgbouncer should ensure that this specific pool is configured with session
582+
// pooling, even if their main application does not use session pooling.
583+
//
584+
// A single Client will never use more than one listener connection at a time,
585+
// no matter how many topics are being listened to.
586+
func NewListener(dbPool *pgxpool.Pool) riverdriver.Listener {
587+
return &Listener{dbPool: dbPool}
588+
}
589+
570590
func (l *Listener) Close(ctx context.Context) error {
571591
l.mu.Lock()
572592
defer l.mu.Unlock()

0 commit comments

Comments
 (0)