Cross Posted from Our CEO’s blog
In this article we will see how to leverage Postgres support for pipelining queries, which “allows applications to send a query without having to read the result of the previously sent query”. This lets you, for example, send a batch of updates back to back without waiting on each result, which can give a significant performance boost.
With pipeline mode, a client spends less time waiting on the server, since multiple queries can be sent, and their results received, without paying a full network round trip for each one.
Specifically, we will issue pipelined queries using the pgx library in Go.
You will need the following:
PostgreSQL version 14 or above
Go 1.24+
Example: Refund each customer by 5% of their balance
In this article I present a self-contained code example that will 1) create a new table, 2) populate it with customers that have random balances, and then 3) use pipelined queries to process a refund for each customer.
Because the queries are pipelined, our refund queries don’t have to wait for one customer’s refund query to complete before the next one is sent.
Now let’s code! Create a new Go project:
$ mkdir pipelinedqueries
$ cd pipelinedqueries
$ go mod init pipelinedqueries
Copy the following into a new file named main.go:
package main
import (
"context"
"fmt"
"math"
"math/rand"
"os"
"time"
"github.com/go-faker/faker/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/shopspring/decimal"
)
const (
sqlCreateUsersTable = `
create table pipeline_customers (
id serial not null primary key,
name text not null,
balance numeric(18,6) not null default 0,
last_active_at timestamptz
);
`
sqlRefundEachCustomer = `UPDATE pipeline_customers SET balance = balance + $1 where id = $2`
)
type Customer struct {
ID int64 `db:"id"`
Name string `db:"name"`
Balance decimal.Decimal `db:"balance"`
LastActiveAt time.Time `db:"last_active_at"`
}
func createCustomers(conn *pgx.Conn, N int) error {
_, err := conn.Exec(context.Background(), `DROP TABLE IF EXISTS pipeline_customers;`)
if err != nil {
return err
}
_, err = conn.Exec(context.Background(), sqlCreateUsersTable)
if err != nil {
return err
}
for i := 0; i < N; i++ {
_, err := conn.Exec(context.Background(), `INSERT INTO pipeline_customers(name, balance, last_active_at) VALUES ($1, $2, $3)`,
faker.Name(),
decimal.NewFromFloat32(1000.0*float32(rand.Intn(100))*1.31425326),
time.Now().Add(time.Duration(rand.Intn(72)*-1)*time.Hour),
)
if err != nil {
return err
}
}
return nil
}
func fetchAllCustomers(conn *pgx.Conn) ([]Customer, error) {
customers := make([]Customer, 0)
rows, err := conn.Query(context.Background(), `SELECT id,name,balance,last_active_at FROM pipeline_customers order by id asc`)
if err != nil {
return nil, err
}
for rows.Next() {
customer := Customer{}
err := rows.Scan(&customer.ID, &customer.Name, &customer.Balance, &customer.LastActiveAt)
if err != nil {
return nil, err
}
customers = append(customers, customer)
}
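// rows.Next() returns false on both end-of-rows and failure, so check for an iteration error
if err := rows.Err(); err != nil {
return nil, err
}
return customers, nil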
}
// refundAllCustomers uses pgx's support for pipelined queries to update each customer balance in a pipeline
func refundAllCustomers(conn *pgx.Conn, customers []Customer, injectErrors bool) error {
ctx := context.Background()
pipeline := conn.PgConn().StartPipeline(ctx)
eqb := pgx.ExtendedQueryBuilder{}
// inject an error for up to 1% of the customers
nErrors := int(math.Round(float64(len(customers)) * 0.01))
errorsInjected := 0
for _, customer := range customers {
// calculate 5% of the customer's balance
customerRefundAmount, _ := customer.Balance.Mul(decimal.NewFromFloat32(0.05)).Float64()
fmt.Println("processing customer", customer.Name, customerRefundAmount)
queryArgs := []any{
customerRefundAmount,
customer.ID,
}
// randomly replace the refund amount with a malformed value for some customers
if injectErrors && errorsInjected < nErrors {
if rand.Intn(2) >= 1 {
queryArgs = []any{
fmt.Sprintf("%f%s", customerRefundAmount, "RANDOM_STRING"),
customer.ID,
}
errorsInjected += 1
}
}
err := eqb.Build(conn.TypeMap(), nil, queryArgs)
if err != nil {
return fmt.Errorf("failed to build query: %v", err)
}
pipeline.SendQueryParams(sqlRefundEachCustomer, eqb.ParamValues, nil, eqb.ParamFormats, eqb.ResultFormats)
}
err := pipeline.Sync()
if err != nil {
return fmt.Errorf("failed to sync pipeline query: %v", err)
}
results, err := pipeline.GetResults()
if err != nil {
return fmt.Errorf("failed to get pipeline results: %v", err)
}
if results == nil && err == nil {
// no results received from server
return nil
}
// spew.Dump(results)
_, ok := results.(*pgconn.ResultReader)
if !ok {
// err is always nil here, so report the unexpected result type instead
return fmt.Errorf("unexpected result type from pipelined query: %T", results)
}
err = pipeline.Close()
if err != nil {
return fmt.Errorf("failed to close pipeline: %v", err)
}
return nil
}
func printCustomersTable(customers []Customer, n int) {
if len(customers) < n {
return
}
for i := 0; i < n; i++ {
customer := customers[i]
fmt.Printf("Customer: %s \t\t Last Active: %s\n", customer.Name, customer.LastActiveAt.Format(time.DateOnly))
fmt.Printf("Balance: %s\n---\n", customer.Balance)
}
}
func main() {
dbURL := "postgres://user:password@localhost:5432/pipeline_tutorial"
conn, err := pgx.Connect(context.Background(), dbURL)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
// comment out the next few lines to prevent creating new customers on each run
err = createCustomers(conn, 100) // N = number of customers
if err != nil {
panic(fmt.Errorf("createCustomers: %v", err))
}
customers, err := fetchAllCustomers(conn)
if err != nil {
panic(fmt.Errorf("fetchAllCustomers: %v", err))
}
// printCustomersTable(customers, 10)
err = refundAllCustomers(conn, customers, true)
if err != nil {
panic(fmt.Errorf("refundAllCustomers: %v", err))
}
fmt.Println("------- AFTER PIPELINE QUERY -------")
otherCustomers, err := fetchAllCustomers(conn)
if err != nil {
panic(fmt.Errorf("fetchAllCustomers: %v", err))
}
printCustomersTable(otherCustomers, 10)
}
Side note on SQL errors in pipelined queries: I have intentionally added error injection to this example code to show you how errors are treated when working with pipelined queries. For up to 1% of our customers we send a malformed refund amount (the number with RANDOM_STRING appended), which the server cannot parse as a numeric value. What you will note when you run the code and an error is injected is that the whole pipeline fails: all the queries sent before a single Sync run in one implicit transaction, so the updates that already executed are rolled back and the ones queued after the failure are skipped. This should give you confidence that SQL or database-level errors won’t go unnoticed while the pipeline is processed. Logic errors are still something you have to handle yourself.
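To make the all-or-nothing behaviour concrete, here is a toy in-memory model of it. This is a sketch and not pgx or Postgres code: the update type and the applyAtomically function are hypothetical names invented for illustration, and balances are plain float64 values to keep the example short.

```go
package main

import (
	"fmt"
	"strconv"
)

// update is a toy stand-in for one queued UPDATE: add Amount to the balance
// of customer ID. Amount is text, like a parameter sent to the server.
type update struct {
	ID     int
	Amount string
}

// applyAtomically mimics a pipeline segment that ends in a single sync.
// Updates are applied to a working copy, and the copy replaces the original
// balances only if every update succeeds.
func applyAtomically(balances map[int]float64, updates []update) (map[int]float64, error) {
	working := make(map[int]float64, len(balances))
	for id, b := range balances {
		working[id] = b
	}
	for i, u := range updates {
		amount, err := strconv.ParseFloat(u.Amount, 64)
		if err != nil {
			// Later updates are never applied, and the earlier ones are
			// discarded together with the working copy.
			return balances, fmt.Errorf("update %d for customer %d: %w", i, u.ID, err)
		}
		working[u.ID] += amount
	}
	return working, nil
}

func main() {
	balances := map[int]float64{1: 100, 2: 200, 3: 300}
	updates := []update{
		{ID: 1, Amount: "5.0"},
		{ID: 2, Amount: "10.0RANDOM_STRING"}, // malformed, like the injected error
		{ID: 3, Amount: "15.0"},
	}
	after, err := applyAtomically(balances, updates)
	fmt.Println("error:", err)
	fmt.Println("balances:", after)
}
```

In this model the malformed amount for customer 2 leaves every balance untouched, including customer 1, whose update was applied before the failure.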
Run the code with the following commands (go mod tidy fetches the imported modules):
$ go mod tidy
$ go run main.go
Let’s take a closer look at the core parts of the function that handles the PostgreSQL pipeline queries. I’ve stripped out the error injection and annotated the relevant lines:
// refundAllCustomers uses pgx's support for pipelined queries to update each customer balance in a pipeline
func refundAllCustomers(conn *pgx.Conn, customers []Customer, injectErrors bool) error {
ctx := context.Background()
pipeline := conn.PgConn().StartPipeline(ctx) // 1
eqb := pgx.ExtendedQueryBuilder{} // 2
for _, customer := range customers {
// calculate 5% of the customer's balance
customerRefundAmount, _ := customer.Balance.Mul(decimal.NewFromFloat32(0.05)).Float64()
fmt.Println("processing customer", customer.Name, customerRefundAmount)
queryArgs := []any{
customerRefundAmount,
customer.ID,
}
err := eqb.Build(conn.TypeMap(), nil, queryArgs) // 3
if err != nil {
return fmt.Errorf("failed to build query: %v", err)
}
// 4 below
pipeline.SendQueryParams(sqlRefundEachCustomer, eqb.ParamValues, nil, eqb.ParamFormats, eqb.ResultFormats)
}
err := pipeline.Sync() // 5
if err != nil {
return fmt.Errorf("failed to sync pipeline query: %v", err)
}
results, err := pipeline.GetResults() // 6
if err != nil {
return fmt.Errorf("failed to get pipeline results: %v", err)
}
if results == nil && err == nil {
// no results received from server
return nil
}
// spew.Dump(results)
_, ok := results.(*pgconn.ResultReader) // 7
if !ok {
// err is always nil here, so report the unexpected result type instead
return fmt.Errorf("unexpected result type from pipelined query: %T", results)
}
err = pipeline.Close() // 8
if err != nil {
return fmt.Errorf("failed to close pipeline: %v", err)
}
return nil
}
Several things are happening in the code above. Let’s focus on the interesting bits:
1. Obtain a Pipeline (pipeline) from the connection by starting the pipeline.
2. Create an extended query builder that’s used to pass the query parameters.
3. Build the query with a type mapping and the arguments for our SQL query.
4. We send the query into the pipeline using the parameters from the extended query builder from the previous step. Since this is running in a pipeline, we can continue sending queries to Postgres without waiting for it to process each one (note that we are doing this inside the loop).
5. We then call Sync on the pipeline to flush/execute the submitted queries and make the results available.
6. We call GetResults() to obtain the results of the pipeline query.
7. We check if the pipeline gave us a result we can read from. In this example we ignore the result reader.
8. We close the pipeline and release related resources.
On performance and resource utilization
Pipeline mode generally consumes more memory on both the client and server, as noted in the Postgres documentation. This makes intuitive sense, since the queries and their parameters are buffered as the pipeline runs, especially before flushing/syncing. It is recommended to manage the send/receive queue appropriately and, of course, to monitor the behaviour of your systems to identify bottlenecks or spikes in resource utilization.
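One possible way to keep the queue bounded is to split the work into fixed-size batches and sync after each one. The helper below is a minimal sketch of that idea: chunk and the batch size of 3 are assumptions for illustration and not part of pgx. Keep in mind that each sync ends the implicit transaction, so with batching a failure would only roll back the batch it occurs in, not the whole run.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// It returns nil when size is not positive.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	customerIDs := []int64{1, 2, 3, 4, 5, 6, 7}
	for i, batch := range chunk(customerIDs, 3) {
		// In the refund example, this is where each ID in the batch would be
		// queued on the pipeline, followed by one sync and a loop that reads
		// the results for this batch before moving on to the next.
		fmt.Printf("batch %d: %v\n", i, batch)
	}
}
```

The right batch size depends on the size of your parameters and on how much memory you are willing to spend, so treat it as a tuning knob to monitor.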
Conclusion
In this article I shared how to use pipelined queries in Postgres from a Go program. Pipelined queries open up use cases such as batch operations on data that arrives as a stream, where waiting on each query in turn would slow you down. As a reminder, pipeline mode generally consumes more memory on both the client and server, so manage the send/receive queue appropriately and monitor the behaviour of your systems to avoid bottlenecks.
Hope you found this interesting.