# Database connector Database connectors allow to connect to databases, such as PostgreSQL, MySQL, or Snowflake, to execute queries to import and export data. Database connectors, like other types of connectors, are written in Go. A connector is a Go module that implements specific functions and methods. ## Quick start In the creation of a new Go module, for your database connector, you can utilize the following template by pasting it into a Go file. Customize the template with your desired package name, type name, and pertinent connector information: ```go // Package postgresql provides a connector for PostgreSQL. package postgresql import ( "context" "github.com/krenalis/krenalis/connectors" "github.com/krenalis/krenalis/tools/types" ) func init() { connectors.RegisterDatabase(connectors.DatabaseInfo{ Code: "postgresql", Label: "PostgreSQL", Categories: connectors.CategoryDatabase, SampleQuery: "SELECT *\nFROM users\nWHERE updated_at >= ${updated_at}\n", }, New) } type PostgreSQL struct { // Your connector fields. } // New returns a new connector instance for PostgreSQL. func New(env *connectors.DatabaseEnv) (*PostgreSQL, error) { // ... } // Close closes the database. func (ps *PostgreSQL) Close() error { // ... } // Columns returns the columns of the given table. func (ps *PostgreSQL) Columns(ctx context.Context, table string) ([]connectors.Column, error) { // ... } // Merge performs batch insert and update operations on the specified table, // basing on the table keys. func (ps *PostgreSQL) Merge(ctx context.Context, table connectors.Table, rows [][]any) error { // ... } // Query executes the given query and returns the resulting rows and columns. func (ps *PostgreSQL) Query(ctx context.Context, query string) (connectors.Rows, []connectors.Column, error) { // ... } // SQLLiteral returns the SQL literal representation of v according to the // provided Krenalis type t. It supports nil values and the following Krenalis // types: string, datetime, date, and json. func (ps *PostgreSQL) SQLLiteral(value any, typ types.Type) string { // ... } ``` ## Implementation Let's explore how to implement a database connector, for example for PostgreSQL. First create a Go module: ```sh mkdir postgresql cd postgresql go mod init postgresql ``` Then add a Go file to the new directory. For example copy the previous template file. Later on, you can [build an executable with your connector](https://www.krenalis.com/docs/installation/from-source.md#building-using-the-go-tools). ### About the connector The `DatabaseInfo` type describes information about the database connector: - `Code`: unique identifier in kebab-case (`a-z0-9-`), e.g. "postgresql", "mysql", "snowflake". - `Label`: display label in the Admin console, typically the database's name (e.g. "PostgreSQL", "MySQL", "Snowflake"). - `Categories`: the categories the connector belongs to. There must be at least one category, but for a database connector it should be only `connectors.CategoryDatabase`. - `SampleQuery`: sample query displayed in the query editor when creating a new database source pipeline. - `TimeLayouts`: layouts for the `datetime`, `date`, and `time` values when they are represented as strings. See [Time layouts](https://www.krenalis.com/docs/create-integration/data-values.md#time-layouts) in [Data values](https://www.krenalis.com/docs/create-integration/data-values.md) for more details. This information is passed to the `RegisterDatabase` function that, executed during package initialization, registers the database connector: ```go func init() { connectors.RegisterDatabase(connectors.DatabaseInfo{ Code: "postgresql", Label: "PostgreSQL", Category: connectors.CategoryDatabase, SampleQuery: "SELECT *\nFROM users\n", }, New) } ``` ### Constructor The second argument supplied to the `RegisterDatabase` function is the function utilized for creating a connector instance: ```go func New(env *connectors.DatabaseEnv) (*PostgreSQL, error) ``` This function accepts a database environment and yields a value representing your custom type. The structure of `DatabaseEnv` is outlined as follows: ```go // DatabaseEnv is the environment for a database connector. type DatabaseEnv struct { // Settings holds the raw settings data. Settings json.Value // SetSettings is the function used to update the settings. SetSettings SetSettingsFunc } ``` - `Settings`: Contains the instance settings in JSON format. Further details on how the connector defines its settings will be discussed later. - `SetSetting`: A function that enables the connector to update its settings as necessary. ### Close method ```go Close() error ``` The `Close` method is invoked by Krenalis when no calls to the connector instance's methods are in progress and no more will be made, so the connector can close any connections eventually opened with the database. ### Columns method ```go Columns(ctx context.Context, table string) ([]connectors.Column, error) ``` Krenalis invokes the `Columns` method when creating or updating a database destination pipeline, retrieving the columns of the table to which data should be exported. The `Columns` method returns the table's columns as a slice of `Column` values, detailing the names and types of each column: ```go // Column represents a database table column. type Column struct { Name string // column name Type types.Type // data type of the column Nullable bool // true if the column can contain NULL values Writable bool // true if the column is writable Issue string // issue message } ``` #### Handling column issues If a column's type is not supported, its name is not a valid property name, or any other issue occurs with the column, leave `Column.Type` unset. Likewise, leave the other fields unset, as they are not relevant in this case, and describe the issue in `Column.Issue`. Such a column will not appear among the available table columns. However, the issue will be brought to the user's attention without preventing the use of the other columns. The following are examples of common issue messages used by database connectors: * _Column "perf" has an unsupported type "INT96"._ * _Column "score:value " does not have a valid property name. Valid names start with a letter or underscore, followed by only letters, numbers, or underscores._ * _Column "amount" has a precision of 100, which exceeds the maximum supported precision of 76._ * _Column "value" has a scale of 50, which exceeds the maximum supported precision of 37._ ### Merge method ```go Merge(ctx context.Context, table connectors.Table, rows [][]any) error ``` The `Merge` method is used by Krenalis during data export to a database. It updates existing rows if matching keys are found, or inserts new rows into the specified table. The `table` parameter provides details about the table to update, including its name, columns, and keys. Defined as: ```go type Table struct { Name string Columns []Column Keys []string } ``` - `Name`: The name of the table. - `Columns`: The columns in the table that need to be updated. It may not include all the columns in the table. - `Keys`: The columns that serve as keys for the table. This typically includes the primary key but does not have to. The columns specified in `table.Keys` are also included in `table.Columns`. The `rows` parameter contains the rows to be updated or inserted. For each `row`, `row[i]` contains the value for the column `table.Columns[i]`. If a column is nullable, the corresponding value in a raw can be `nil`, representing the `NULL` value in the database. Krenalis ensures that `table.Columns` (and consequently each row in `rows`) contains at least one additional column besides the table key values. Furthermore, Krenalis ensures that during the entire execution of an export to the database, to the `Merge` method are never passed two or more duplicate rows, meaning rows that have the same value for the table keys. A database connector can require that the columns in `table.Keys` form the primary key and can return an error if they do not. ### Query method ```go Query(ctx context.Context, query string) (connectors.Rows, []connectors.Column, []string, error) ``` Krenalis invokes the `Query` method when previewing the rows returned by a query while creating or updating a database source pipeline, and to get the data during an import. The query is provided after replacing any placeholders like `${limit}`. The `Query` method runs the query and gives back two things: the rows themselves, which follow the `Rows` interface, and the columns as a slice of `Column` values. Here's what the `Rows` interface look like: ```go type Rows interface { Close() error Err() error Next() bool Scan(dest ...any) error } ``` The standard Go library's `sql.Rows` type implements this interface. So, the connector can just return a `sql.Rows` value. Here's what the `Column` type look like: ```go // Column represents a database table column. type Column struct { Name string // column name Type types.Type // data type of the column Nullable bool // true if the column can contain NULL values Writable bool // true if the column is writable (generally false for columns returned by Query). Issue string // issue message } ``` #### Handling column issues If a column's type is not supported, its name is not a valid property name, or any other issue occurs with the column, leave `Column.Type` unset. Likewise, leave the other fields unset, as they are not relevant in this case, and describe the issue in `Column.Issue`. Such a column will not appear among the available table columns. However, the issue will be brought to the user's attention without preventing the use of the other columns. The following are examples of common issue messages used by database connectors: * _Column "perf" has an unsupported type "INT96"._ * _Column "score:value " does not have a valid property name. Valid names start with a letter or underscore, followed by only letters, numbers, or underscores._ * _Column "amount" has a precision of 100, which exceeds the maximum supported precision of 76._ * _Column "value" has a scale of 50, which exceeds the maximum supported precision of 37._ ### SQLLiteral method ```go SQLLiteral(v any, t types.Type) string ``` The `SQLLiteral` method must return the SQL literal representation of `v` according to the provided Krenalis type `t`. Currently, the values passed to the method are either `nil` or values corresponding to the Krenalis types `string`, `datetime`, `date`, and `json`. The following table illustrates sample inputs to `SQLLiteral` and the corresponding literal returned by the PostgreSQL connector: | Call | PostgreSQL return value | |-----------------------------------------------------------------------------------------|--------------------------------| | `SQLLiteral(nil, types.Type{})` | `NULL` | | `SQLLiteral("foo", types.String())` | `'foo'` | | `SQLLiteral("{\"boo\":5}", types.JSON())` | `'{"boo": 5}'` | | `SQLLiteral(time.Date(2025, 12, 9, 12, 53, 45, 730139838, time.UTC), types.DateTime())` | `'2025-12-09 12:53:45.730139'` | | `SQLLiteral(time.Date(2025, 12, 9, 0, 0, 0, 0, time.UTC), types.Date())` | `'2025-12-09'` | A connector only needs to implement `SQLLiteral` for the Krenalis types it actually supports. For example, if a connector never returns values of type `json`, it will never receive a value of that type and therefore does not need to implement JSON literal handling. ### SQLLiteral with the `updated_at` placeholder Krenalis calls the `SQLLiteral` method when building a SQL query to replace the `updated_at` placeholder. Consider the following query: ```sql SELECT first_name, last_name, phone_number FROM customers WHERE updated_at >= ${updated_at} OR ${updated_at} IS NULL ORDER BY updated_at ``` The call `SQLLiteral(time.Date(2025, 01, 30, 16, 12, 25, 837, time.UTC), types.DateTime())` might return `"'2025-01-30 16:12:25.837'"`, resulting in: ```sql SELECT first_name, last_name, phone_number FROM customers WHERE updated_at >= '2025-01-30 16:12:25.837' OR '2025-01-30 16:12:25.837' IS NULL ORDER BY updated_at ``` The call `SQLLiteral("2025-02-13T16:12:25", types.String())` might return `"'2025-02-13T16:12:25'"`, resulting in: ```sql SELECT first_name, last_name, phone_number FROM customers WHERE updated_at >= '2025-02-13T16:12:25' OR '2025-02-13T16:12:25' IS NULL ORDER BY updated_at ``` The call `SQLLiteral(nil, time.Time{})` might return `"NULL"`, resulting in: ```sql SELECT first_name, last_name, phone_number FROM customers WHERE updated_at >= NULL OR NULL IS NULL ORDER BY updated_at ``` In this case, `updated_at >= NULL OR NULL IS NULL` evaluates to `TRUE`, returning all rows as expected.