Database connector
Database connectors let Krenalis connect to databases, such as PostgreSQL, MySQL, or Snowflake, and execute queries to import and export data.
Database connectors, like other types of connectors, are written in Go. A connector is a Go module that implements specific functions and methods.
Quick start
To create a new Go module for your database connector, you can start from the following template by pasting it into a Go file. Customize it with your package name, type name, and connector information:
// Package postgresql provides a connector for PostgreSQL.
package postgresql

import (
	"context"

	"github.com/krenalis/krenalis/connectors"
	"github.com/krenalis/krenalis/tools/types"
)

func init() {
	connectors.RegisterDatabase(connectors.DatabaseInfo{
		Code:        "postgresql",
		Label:       "PostgreSQL",
		Categories:  connectors.CategoryDatabase,
		SampleQuery: "SELECT *\nFROM users\nWHERE updated_at >= ${updated_at}\n",
	}, New)
}

type PostgreSQL struct {
	// Your connector fields.
}

// New returns a new connector instance for PostgreSQL.
func New(env *connectors.DatabaseEnv) (*PostgreSQL, error) {
	// ...
}

// Close closes the database.
func (ps *PostgreSQL) Close() error {
	// ...
}

// Columns returns the columns of the given table.
func (ps *PostgreSQL) Columns(ctx context.Context, table string) ([]connectors.Column, error) {
	// ...
}

// Merge performs batch insert and update operations on the specified table,
// based on the table keys.
func (ps *PostgreSQL) Merge(ctx context.Context, table connectors.Table, rows [][]any) error {
	// ...
}

// Query executes the given query and returns the resulting rows and columns.
func (ps *PostgreSQL) Query(ctx context.Context, query string) (connectors.Rows, []connectors.Column, error) {
	// ...
}

// SQLLiteral returns the SQL literal representation of v according to the
// provided Krenalis type t. It supports nil values and the following Krenalis
// types: string, datetime, date, and json.
func (ps *PostgreSQL) SQLLiteral(v any, t types.Type) string {
	// ...
}
Implementation
Let's explore how to implement a database connector, using PostgreSQL as an example.
First create a Go module:
mkdir postgresql
cd postgresql
go mod init postgresql
Then add a Go file to the new directory; for example, copy the template above into it.
Later on, you can build an executable with your connector.
About the connector
The DatabaseInfo type describes information about the database connector:
- Code: unique identifier in kebab-case (a-z0-9-), e.g. "postgresql", "mysql", "snowflake".
- Label: display label in the Admin console, typically the database's name (e.g. "PostgreSQL", "MySQL", "Snowflake").
- Categories: the categories the connector belongs to. There must be at least one category, but for a database connector it should be only connectors.CategoryDatabase.
- SampleQuery: sample query displayed in the query editor when creating a new database source pipeline.
- TimeLayouts: layouts for the datetime, date, and time values when they are represented as strings. See Time layouts in Data values for more details.
This information is passed to the RegisterDatabase function which, executed during package initialization, registers the database connector:
func init() {
	connectors.RegisterDatabase(connectors.DatabaseInfo{
		Code:        "postgresql",
		Label:       "PostgreSQL",
		Categories:  connectors.CategoryDatabase,
		SampleQuery: "SELECT *\nFROM users\n",
	}, New)
}
Constructor
The second argument passed to the RegisterDatabase function is the constructor used to create a connector instance:
func New(env *connectors.DatabaseEnv) (*PostgreSQL, error)
This function accepts a database environment and returns a value of your connector type.
The structure of DatabaseEnv is outlined as follows:
// DatabaseEnv is the environment for a database connector.
type DatabaseEnv struct {
	// Settings holds the raw settings data.
	Settings json.Value
	// SetSettings is the function used to update the settings.
	SetSettings SetSettingsFunc
}
- Settings: Contains the instance settings in JSON format. Further details on how the connector defines its settings will be discussed later.
- SetSettings: A function that enables the connector to update its settings as necessary.
Close method
Close() error
The Close method is invoked by Krenalis when no calls to the connector instance's methods are in progress and no more will be made, so the connector can close any connections it may have opened with the database.
Columns method
Columns(ctx context.Context, table string) ([]connectors.Column, error)
Krenalis invokes the Columns method when creating or updating a database destination pipeline, retrieving the columns of the table to which data should be exported.
The Columns method returns the table's columns as a slice of Column values, detailing the names and types of each column:
// Column represents a database table column.
type Column struct {
	Name     string     // column name
	Type     types.Type // data type of the column
	Nullable bool       // true if the column can contain NULL values
	Writable bool       // true if the column is writable
	Issue    string     // issue message
}
Handling column issues
If a column's type is not supported, its name is not a valid property name, or any other issue occurs with the column, leave Column.Type unset. Likewise, leave the other fields unset, as they are not relevant in this case, and describe the issue in Column.Issue.
Such a column will not appear among the available table columns. However, the issue will be brought to the user's attention without preventing the use of the other columns.
The following are examples of common issue messages used by database connectors:
- Column "perf" has an unsupported type "INT96".
- Column "score:value " does not have a valid property name. Valid names start with a letter or underscore, followed by only letters, numbers, or underscores.
- Column "amount" has a precision of 100, which exceeds the maximum supported precision of 76.
- Column "value" has a scale of 50, which exceeds the maximum supported scale of 37.
Merge method
Merge(ctx context.Context, table connectors.Table, rows [][]any) error
The Merge method is used by Krenalis during data export to a database. It updates existing rows if matching keys are found, or inserts new rows into the specified table.
The table parameter provides details about the table to update, including its name, columns, and keys. It is defined as:
type Table struct {
	Name    string
	Columns []Column
	Keys    []string
}
- Name: The name of the table.
- Columns: The columns in the table that need to be updated. It may not include all the columns in the table.
- Keys: The columns that serve as keys for the table. This typically includes the primary key but does not have to. The columns specified in table.Keys are also included in table.Columns.
The rows parameter contains the rows to be updated or inserted. For each row, row[i] contains the value for the column table.Columns[i]. If a column is nullable, the corresponding value in a row can be nil, representing the NULL value in the database.
Krenalis ensures that table.Columns (and consequently each row in rows) contains at least one additional column besides the table key values.
Furthermore, Krenalis ensures that, during the entire execution of an export to the database, the Merge method is never passed two or more duplicate rows, that is, rows that have the same values for the table keys.
A database connector can require that the columns in table.Keys form the primary key and can return an error if they do not.
Query method
Query(ctx context.Context, query string) (connectors.Rows, []connectors.Column, error)
Krenalis invokes the Query method when previewing the rows returned by a query while creating or updating a database source pipeline, and to fetch the data during an import. The query is provided with any placeholders, such as ${limit}, already replaced.
The Query method runs the query and returns two things: the rows, as a value implementing the Rows interface, and the columns, as a slice of Column values. Here's what the Rows interface looks like:
type Rows interface {
	Close() error
	Err() error
	Next() bool
	Scan(dest ...any) error
}
The standard library's sql.Rows type implements this interface, so the connector can simply return a sql.Rows value.
Here's what the Column type looks like:
// Column represents a database table column.
type Column struct {
	Name     string     // column name
	Type     types.Type // data type of the column
	Nullable bool       // true if the column can contain NULL values
	Writable bool       // true if the column is writable (generally false for columns returned by Query)
	Issue    string     // issue message
}
Handling column issues
Column issues are handled exactly as described for the Columns method: if a column's type is not supported, its name is not a valid property name, or any other issue occurs, leave Column.Type and the other fields unset and describe the issue in Column.Issue. The same example issue messages apply.
SQLLiteral method
SQLLiteral(v any, t types.Type) string
The SQLLiteral method must return the SQL literal representation of v according to the provided Krenalis type t.
Currently, the values passed to the method are either nil or values corresponding to the Krenalis types string, datetime, date, and json.
The following table illustrates sample inputs to SQLLiteral and the corresponding literal returned by the PostgreSQL connector:
| Call | PostgreSQL return value |
|---|---|
| SQLLiteral(nil, types.Type{}) | NULL |
| SQLLiteral("foo", types.String()) | 'foo' |
| SQLLiteral("{\"boo\":5}", types.JSON()) | '{"boo": 5}' |
| SQLLiteral(time.Date(2025, 12, 9, 12, 53, 45, 730139838, time.UTC), types.DateTime()) | '2025-12-09 12:53:45.730139' |
| SQLLiteral(time.Date(2025, 12, 9, 0, 0, 0, 0, time.UTC), types.Date()) | '2025-12-09' |
A connector only needs to implement SQLLiteral for the Krenalis types it actually supports. For example, if a connector never returns values of type json, it will never receive a value of that type and therefore does not need to implement JSON literal handling.
SQLLiteral with the updated_at placeholder
Krenalis calls the SQLLiteral method when building a SQL query to replace the updated_at placeholder. Consider the following query:
SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= ${updated_at} OR ${updated_at} IS NULL
ORDER BY updated_at
The call SQLLiteral(time.Date(2025, 1, 30, 16, 12, 25, 837000000, time.UTC), types.DateTime()) might return "'2025-01-30 16:12:25.837'", resulting in:
SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= '2025-01-30 16:12:25.837' OR '2025-01-30 16:12:25.837' IS NULL
ORDER BY updated_at
The call SQLLiteral("2025-02-13T16:12:25", types.String()) might return "'2025-02-13T16:12:25'", resulting in:
SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= '2025-02-13T16:12:25' OR '2025-02-13T16:12:25' IS NULL
ORDER BY updated_at
The call SQLLiteral(nil, types.Type{}) might return "NULL", resulting in:
SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= NULL OR NULL IS NULL
ORDER BY updated_at
In this case, updated_at >= NULL OR NULL IS NULL evaluates to TRUE, returning all rows as expected.