Database connector


Database connectors allow Krenalis to connect to databases, such as PostgreSQL, MySQL, or Snowflake, and to execute queries to import and export data.

Database connectors, like other types of connectors, are written in Go. A connector is a Go module that implements specific functions and methods.

Quick start

When creating a new Go module for your database connector, you can start from the following template by pasting it into a Go file. Customize it with your package name, type name, and connector information:

// Package postgresql provides a connector for PostgreSQL.
package postgresql

import (
    "context"

    "github.com/krenalis/krenalis/connectors"
    "github.com/krenalis/krenalis/tools/types"
)

func init() {
    connectors.RegisterDatabase(connectors.DatabaseInfo{
        Code:        "postgresql",
        Label:       "PostgreSQL",
        Categories:  connectors.CategoryDatabase,
        SampleQuery: "SELECT *\nFROM users\nWHERE updated_at >= ${updated_at}\n",
    }, New)
}

type PostgreSQL struct {
    // Your connector fields.
}

// New returns a new connector instance for PostgreSQL.
func New(env *connectors.DatabaseEnv) (*PostgreSQL, error) {
    // ...
}

// Close closes the database.
func (ps *PostgreSQL) Close() error {
    // ...
}

// Columns returns the columns of the given table.
func (ps *PostgreSQL) Columns(ctx context.Context, table string) ([]connectors.Column, error) {
    // ...
}

// Merge performs batch insert and update operations on the specified table,
// based on the table keys.
func (ps *PostgreSQL) Merge(ctx context.Context, table connectors.Table, rows [][]any) error {
    // ...
}

// Query executes the given query and returns the resulting rows and columns.
func (ps *PostgreSQL) Query(ctx context.Context, query string) (connectors.Rows, []connectors.Column, error) {
    // ...
}

// SQLLiteral returns the SQL literal representation of v according to the
// provided Krenalis type t. It supports nil values and the following Krenalis
// types: string, datetime, date, and json.
func (ps *PostgreSQL) SQLLiteral(v any, t types.Type) string {
    // ...
}

Implementation

Let's walk through implementing a database connector, using PostgreSQL as an example.

First, create a Go module:

mkdir postgresql
cd postgresql
go mod init postgresql

Then add a Go file to the new directory, for example by copying the template above.

Later on, you can build an executable with your connector.

About the connector

The DatabaseInfo type describes information about the database connector:

  • Code: unique identifier in kebab-case (a-z0-9-), e.g. "postgresql", "mysql", "snowflake".
  • Label: display label in the Admin console, typically the database's name (e.g. "PostgreSQL", "MySQL", "Snowflake").
  • Categories: the categories the connector belongs to. At least one category is required; for a database connector, it should be connectors.CategoryDatabase only.
  • SampleQuery: sample query displayed in the query editor when creating a new database source pipeline.
  • TimeLayouts: layouts for the datetime, date, and time values when they are represented as strings. See Time layouts in Data values for more details.

This information is passed to the RegisterDatabase function, which is called during package initialization to register the database connector:

func init() {
    connectors.RegisterDatabase(connectors.DatabaseInfo{
        Code:        "postgresql",
        Label:       "PostgreSQL",
        Categories:  connectors.CategoryDatabase,
        SampleQuery: "SELECT *\nFROM users\n",
    }, New)
}

Constructor

The second argument passed to RegisterDatabase is the function that creates a connector instance:

func New(env *connectors.DatabaseEnv) (*PostgreSQL, error)

This function receives a database environment and returns a new instance of your connector type, or an error.

DatabaseEnv is defined as follows:

// DatabaseEnv is the environment for a database connector.
type DatabaseEnv struct {

    // Settings holds the raw settings data.
    Settings json.Value

    // SetSettings is the function used to update the settings.
    SetSettings SetSettingsFunc
}
  • Settings: Contains the instance settings in JSON format. Further details on how the connector defines its settings will be discussed later.
  • SetSettings: A function that lets the connector update its settings when needed.
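As an illustration, the following sketch shows the settings-decoding part of a constructor. It assumes that the raw settings can be handled as JSON bytes and decoded with encoding/json; the settings struct, its fields, the default port, and the newFromSettings helper are all hypothetical, since each connector defines its own settings. A real New would receive env *connectors.DatabaseEnv rather than a byte slice.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// settings is a hypothetical settings layout for a PostgreSQL connector;
// a real connector defines its own fields.
type settings struct {
	Host     string `json:"host"`
	Port     int    `json:"port"`
	Database string `json:"database"`
	Username string `json:"username"`
	Password string `json:"password"`
}

// PostgreSQL keeps the decoded settings. This sketch opens no connection
// here; one can be opened later, when a method first needs it.
type PostgreSQL struct {
	settings settings
}

// newFromSettings stands in for New: it decodes the raw JSON settings
// (assumed here to be available as a []byte) and applies a default.
func newFromSettings(raw []byte) (*PostgreSQL, error) {
	var s settings
	if err := json.Unmarshal(raw, &s); err != nil {
		return nil, fmt.Errorf("cannot decode settings: %w", err)
	}
	if s.Port == 0 {
		s.Port = 5432 // PostgreSQL's default port
	}
	return &PostgreSQL{settings: s}, nil
}

func main() {
	ps, err := newFromSettings([]byte(`{"host":"localhost","database":"crm"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ps.settings.Host, ps.settings.Port)
}
```

Not connecting in the constructor is one possible design choice: it keeps New cheap and leaves Close responsible only for connections that were actually opened.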

Close method

Close() error

Krenalis calls the Close method when no calls to the connector instance's methods are in progress and no more will be made, so the connector can close any connections it may have opened to the database.
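A minimal sketch of Close for a connector that opens its database connection lazily. The conn field is a hypothetical io.Closer standing in for a *sql.DB or a driver-specific connection, and fakeConn exists only to demonstrate the behavior. If no connection was ever opened, Close does nothing and returns nil.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// PostgreSQL is a hypothetical connector that opens its connection lazily.
// conn stands in for a *sql.DB or a driver connection.
type PostgreSQL struct {
	mu   sync.Mutex
	conn io.Closer // nil until a method first needs the database
}

// Close closes the connection if one was opened; otherwise it does nothing.
func (ps *PostgreSQL) Close() error {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	if ps.conn == nil {
		return nil
	}
	err := ps.conn.Close()
	ps.conn = nil
	return err
}

// fakeConn records whether Close was called (demonstration only).
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

func main() {
	unused := &PostgreSQL{}
	fmt.Println(unused.Close()) // nothing was opened, so nothing is closed

	c := &fakeConn{}
	used := &PostgreSQL{conn: c}
	fmt.Println(used.Close(), c.closed)
}
```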

Columns method

Columns(ctx context.Context, table string) ([]connectors.Column, error)

Krenalis calls the Columns method when a database destination pipeline is created or updated, to retrieve the columns of the table to which data will be exported.

The Columns method returns the table's columns as a slice of Column values, detailing the names and types of each column:

// Column represents a database table column.
type Column struct {
    Name     string     // column name
    Type     types.Type // data type of the column
    Nullable bool       // true if the column can contain NULL values
    Writable bool       // true if the column is writable
    Issue    string     // issue message
}

Handling column issues

If a column's type is not supported, its name is not a valid property name, or any other issue occurs with the column, leave Column.Type unset. Likewise, leave the other fields unset, as they are not relevant in this case, and describe the issue in Column.Issue.

Such a column will not appear among the available table columns. However, the issue will be brought to the user's attention without preventing the use of the other columns.

The following are examples of common issue messages used by database connectors:

  • Column "perf" has an unsupported type "INT96".
  • Column "score:value " does not have a valid property name. Valid names start with a letter or underscore, followed by only letters, numbers, or underscores.
  • Column "amount" has a precision of 100, which exceeds the maximum supported precision of 76.
  • Column "value" has a scale of 50, which exceeds the maximum supported scale of 37.
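The sketch below shows one way to produce such issue-only columns. To stay self-contained it uses a simplified Column whose Type is a plain string instead of types.Type, plus a hypothetical supported map from PostgreSQL type names to Krenalis type names; a real connector would build types.Type values. It also assumes that Name stays set on a column with an issue, and that "letters" means ASCII letters.

```go
package main

import (
	"fmt"
	"regexp"
)

// Column mirrors connectors.Column, except that Type is a string here
// (a stand-in for types.Type) so the sketch needs no Krenalis imports.
type Column struct {
	Name     string
	Type     string
	Nullable bool
	Writable bool
	Issue    string
}

// validName encodes the rule stated in the issue message: a letter or
// underscore, followed by only letters, digits, or underscores.
// ASCII letters are assumed.
var validName = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// supported is a hypothetical mapping from PostgreSQL type names to
// Krenalis type names.
var supported = map[string]string{
	"text":      "string",
	"integer":   "int",
	"date":      "date",
	"timestamp": "datetime",
	"jsonb":     "json",
}

// column returns a usable Column, or one that only reports an issue.
// On issues, Name is kept (an assumption) and Type and the remaining
// fields are left unset.
func column(name, dbType string, nullable bool) Column {
	if !validName.MatchString(name) {
		return Column{Name: name, Issue: fmt.Sprintf("Column %q does not have a valid property name. "+
			"Valid names start with a letter or underscore, followed by only letters, numbers, or underscores.", name)}
	}
	typ, ok := supported[dbType]
	if !ok {
		return Column{Name: name, Issue: fmt.Sprintf("Column %q has an unsupported type %q.", name, dbType)}
	}
	// Writable is hardcoded for brevity; a real connector would check,
	// for example, whether the column is generated.
	return Column{Name: name, Type: typ, Nullable: nullable, Writable: true}
}

func main() {
	for _, c := range []Column{
		column("email", "text", true),
		column("perf", "INT96", false),
		column("score:value ", "text", false),
	} {
		if c.Issue != "" {
			fmt.Println(c.Issue)
			continue
		}
		fmt.Printf("%s: %s (nullable: %t)\n", c.Name, c.Type, c.Nullable)
	}
}
```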

Merge method

Merge(ctx context.Context, table connectors.Table, rows [][]any) error

Krenalis calls the Merge method when exporting data to a database. For each row, Merge updates the existing row with the same key values, or inserts it as a new row if no such row exists in the specified table.

The table parameter describes the table to update, including its name, columns, and keys. It is defined as:

type Table struct {
    Name    string
    Columns []Column
    Keys    []string
}
  • Name: The name of the table.
  • Columns: The columns to update. This does not necessarily include all the columns of the table.
  • Keys: The columns that serve as keys for the table. This typically includes the primary key but does not have to. The columns specified in table.Keys are also included in table.Columns.

The rows parameter contains the rows to update or insert. For each row, row[i] contains the value for the column table.Columns[i]. If a column is nullable, the corresponding value in a row can be nil, representing the NULL value in the database.

Krenalis ensures that table.Columns (and consequently each row in rows) contains at least one column in addition to the key columns.

Furthermore, throughout an export to the database, Krenalis never passes duplicate rows to the Merge method, that is, two or more rows with the same values for the table keys.

A database connector can require that the columns in table.Keys form the primary key and can return an error if they do not.
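For PostgreSQL, one way to implement Merge is a single INSERT ... ON CONFLICT (keys) DO UPDATE statement. PostgreSQL requires a unique index or constraint on the conflict columns, which fits a connector that requires table.Keys to form the primary key. The sketch below only builds the statement text and its arguments. Table and Column are simplified stand-ins for the connectors types, and upsert and quoteIdent are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// Simplified stand-ins for connectors.Column and connectors.Table.
type Column struct{ Name string }

type Table struct {
	Name    string
	Columns []Column
	Keys    []string
}

// quoteIdent quotes a PostgreSQL identifier, doubling embedded quotes.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

// upsert returns an INSERT ... ON CONFLICT ... DO UPDATE statement with
// $n placeholders, and the row values flattened in placeholder order.
func upsert(table Table, rows [][]any) (string, []any) {
	isKey := make(map[string]bool, len(table.Keys))
	for _, k := range table.Keys {
		isKey[k] = true
	}
	var b strings.Builder
	b.WriteString("INSERT INTO " + quoteIdent(table.Name) + " (")
	for i, c := range table.Columns {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(quoteIdent(c.Name))
	}
	b.WriteString(") VALUES ")
	args := make([]any, 0, len(rows)*len(table.Columns))
	for r, row := range rows {
		if r > 0 {
			b.WriteString(", ")
		}
		b.WriteString("(")
		for i, v := range row { // row[i] belongs to table.Columns[i]
			if i > 0 {
				b.WriteString(", ")
			}
			args = append(args, v)
			fmt.Fprintf(&b, "$%d", len(args))
		}
		b.WriteString(")")
	}
	keys := make([]string, len(table.Keys))
	for i, k := range table.Keys {
		keys[i] = quoteIdent(k)
	}
	b.WriteString(" ON CONFLICT (" + strings.Join(keys, ", ") + ") DO UPDATE SET ")
	// Krenalis guarantees at least one non-key column, so SET is never empty.
	first := true
	for _, c := range table.Columns {
		if isKey[c.Name] {
			continue
		}
		if !first {
			b.WriteString(", ")
		}
		first = false
		q := quoteIdent(c.Name)
		b.WriteString(q + " = EXCLUDED." + q)
	}
	return b.String(), args
}

func main() {
	table := Table{
		Name:    "users",
		Columns: []Column{{Name: "id"}, {Name: "email"}},
		Keys:    []string{"id"},
	}
	query, args := upsert(table, [][]any{{1, "a@example.com"}, {2, nil}})
	fmt.Println(query)
	fmt.Println(args...)
}
```

The no-duplicates guarantee matters for this approach: PostgreSQL rejects an ON CONFLICT DO UPDATE statement that would affect the same row twice. A real implementation would also split large batches, because a single statement accepts at most 65535 bind parameters.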

Query method

Query(ctx context.Context, query string) (connectors.Rows, []connectors.Column, error)

Krenalis calls the Query method to preview the rows returned by a query while a database source pipeline is being created or updated, and to read the data during an import. The query it receives already has any placeholders, such as ${limit}, replaced.

The Query method runs the query and returns two things: the rows, as a value implementing the Rows interface, and the columns, as a slice of Column values. The Rows interface looks like this:

type Rows interface {
    Close() error
    Err() error
    Next() bool
    Scan(dest ...any) error
}

The *sql.Rows type from the standard library's database/sql package implements this interface, so the connector can simply return a *sql.Rows value.

The Column type looks like this:

// Column represents a database table column.
type Column struct {
    Name     string     // column name
    Type     types.Type // data type of the column
    Nullable bool       // true if the column can contain NULL values
    Writable bool       // true if the column is writable (generally false for columns returned by Query).
    Issue    string     // issue message
}

Handling column issues

If a column's type is not supported, its name is not a valid property name, or any other issue occurs with the column, leave Column.Type unset. Likewise, leave the other fields unset, as they are not relevant in this case, and describe the issue in Column.Issue.

Such a column will not appear among the available table columns. However, the issue will be brought to the user's attention without preventing the use of the other columns.

The following are examples of common issue messages used by database connectors:

  • Column "perf" has an unsupported type "INT96".
  • Column "score:value " does not have a valid property name. Valid names start with a letter or underscore, followed by only letters, numbers, or underscores.
  • Column "amount" has a precision of 100, which exceeds the maximum supported precision of 76.
  • Column "value" has a scale of 50, which exceeds the maximum supported scale of 37.

SQLLiteral method

SQLLiteral(v any, t types.Type) string

The SQLLiteral method must return the SQL literal representation of v according to the provided Krenalis type t.
Currently, the values passed to the method are either nil or values corresponding to the Krenalis types string, datetime, date, and json.

The following table illustrates sample inputs to SQLLiteral and the corresponding literal returned by the PostgreSQL connector:

| Call | PostgreSQL return value |
|------|-------------------------|
| SQLLiteral(nil, types.Type{}) | NULL |
| SQLLiteral("foo", types.String()) | 'foo' |
| SQLLiteral("{\"boo\":5}", types.JSON()) | '{"boo": 5}' |
| SQLLiteral(time.Date(2025, 12, 9, 12, 53, 45, 730139838, time.UTC), types.DateTime()) | '2025-12-09 12:53:45.730139' |
| SQLLiteral(time.Date(2025, 12, 9, 0, 0, 0, 0, time.UTC), types.Date()) | '2025-12-09' |

A connector only needs to implement SQLLiteral for the Krenalis types it actually supports. For example, if a connector never returns values of type json, it will never receive a value of that type and therefore does not need to implement JSON literal handling.

SQLLiteral with the updated_at placeholder

When building a SQL query, Krenalis calls the SQLLiteral method to obtain the literal that replaces the updated_at placeholder. Consider the following query:

SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= ${updated_at} OR ${updated_at} IS NULL 
ORDER BY updated_at

The call SQLLiteral(time.Date(2025, 1, 30, 16, 12, 25, 837000000, time.UTC), types.DateTime()) might return "'2025-01-30 16:12:25.837'", resulting in:

SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= '2025-01-30 16:12:25.837' OR '2025-01-30 16:12:25.837' IS NULL
ORDER BY updated_at

The call SQLLiteral("2025-02-13T16:12:25", types.String()) might return "'2025-02-13T16:12:25'", resulting in:

SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= '2025-02-13T16:12:25' OR '2025-02-13T16:12:25' IS NULL
ORDER BY updated_at

The call SQLLiteral(nil, types.Type{}) might return "NULL", resulting in:

SELECT first_name, last_name, phone_number
FROM customers
WHERE updated_at >= NULL OR NULL IS NULL
ORDER BY updated_at

In this case, updated_at >= NULL evaluates to NULL, but NULL IS NULL evaluates to TRUE, so the whole condition is TRUE for every row and all rows are returned, as expected.