Import users from databases

This type of pipeline imports users from a database into the workspace's data warehouse. It is available only for source database connections.

Create pipeline

Create a source pipeline to import users from a database.

Request

  • name

    string Required

    The pipeline's name.

    Must be a non-empty string with a maximum of 60 characters.
  • connection

    int Required

    The ID of the connection from which to read the users. It must be a source database.

  • target

    string Required

    The entity on which the pipeline operates, which must be "User" in order to create a pipeline that imports users.

    Possible values: "User".
  • enabled

    boolean

    Indicates if the pipeline is enabled once created.

  • query

    string Required

    The SELECT query executed on the database to retrieve the users to import.

    The column names returned must be valid property names. They must:

    • Start with A...Z, a...z, or _,
    • Contain only A...Z, a...z, 0...9, or _.

    Note that you can use a column alias if necessary (e.g., SELECT 1 AS v FROM users).

    The query should include the updated_at placeholder as a condition in the WHERE clause.

    Must be a non-empty string with a maximum of 1000 characters.
  • userIDColumn

    string Required

    The column in the database that uniquely identifies each user in the connection. It serves as the single, unique identifier for each user record, ensuring that each user can be distinctly referenced.

    Only columns with types corresponding to the following Krenalis types can be used as an identity: string, int, uuid, and json.

    Must be a non-empty string with a maximum of 1024 characters.
  • updatedAtColumn

    string

    The column that stores the date when a user record was last updated. It tracks the most recent modification made to the user's data, helping to identify when changes occurred.

    The value of this column is used for incremental imports, where only records that have been modified since the last import need to be processed.

    Only columns with types corresponding to the following Krenalis types can be used as the update time: string, datetime, date, or json.

    It cannot be longer than 1024 characters.
  • updatedAtFormat

    string Conditionally Required

    The format of the value in the update time column. It can be set to "ISO8601" if the column value follows the ISO 8601 format. Otherwise, it should follow a format accepted by the Python strftime function.

    This field is only required if the updatedAtColumn is provided, is not empty, and has a type string or json.

    Must be a non-empty string with a maximum of 64 characters.
  • incremental

    boolean

    Determines whether users are imported incrementally:

    • true: are imported only users whose update time is equal to or later than the last imported user's change time.
    • false: all users are imported again, regardless of their update time. false is the default value.

    If set to true, a column for the update time must be specified (i.e., updatedAtColumn is not null). Additionally, if the update time column type corresponds to the Krenalis string or json types, the values in the column must be sortable.

  • transformation

    object Required

    The mapping or function responsible for transforming database users into user identities linked to the pipeline. Once the identity resolution process is complete, the user identities associated with all pipelines are merged into unified users.

    One of either a mapping or a function must be provided, but not both. The one that is not provided can be either missing or set to null.

    • transformation.mapping

      nullable object with string values Conditionally Required

      The transformation mapping. A key represents a property path in the profile schema, and its corresponding value is an expression. This expression can reference columns of the database table.

    • transformation.function

      nullable object Conditionally Required

      The transformation function. A JavaScript or Python function that given a user in the database, returns an identity.

      • transformation.function.source

        string Required

        The source code of the JavaScript or Python function.

        Must be a non-empty string with a maximum of 50000 characters.
      • transformation.function.language

        string Required

        The language of the function.

        Possible values: "JavaScript" or "Python".
      • transformation.function.preserveJSON

        boolean

        Specifies whether JSON values are passed to and returned from the function as strings, keeping their original format without any encoding or decoding.

      • transformation.function.inPaths

        array of string Required

        The paths of the properties that will be passed to the function. At least one path must be present.

      • transformation.function.outPaths

        array of string Required

        The paths of the properties that may be returned by the function. At least one path must be present.

  • inSchema

    schema Required

    The schema for the identity column, the update time column, and the input properties for the transformation.

    When importing users from databases, this should be a subset of the query schema.

  • outSchema

    schema Required

    The schema for the output properties of the transformation.

    When importing users from databases, this should be a subset of the profile schema.

Response

  • id

    int

    The ID of the pipeline.

POST /v1/pipelines
curl https://example.com/v1/pipelines \
-H "Authorization: Bearer api_xxxxxxx" \
--json '{
"name": "Ingest users from database",
"connection": 230527183,
"target": "User",
"enabled": true,
"query": "SELECT firstname, lastname, email, updated_at FROM customers WHERE updated_at >= ${updated_at}",
"userIDColumn": "email",
"updatedAtColumn": "updated_at",
"updatedAtFormat": "ISO8601",
"incremental": true,
"transformation": {
"mapping": {
"first_name": "firstname",
"last_name": "lastname",
"email": "email"
}
},
"inSchema": {
"kind": "object",
"properties": [
{
"name": "email",
"type": {
"kind": "string",
"maxLength": 120
}
},
{
"name": "firstname",
"type": {
"kind": "string",
"maxLength": 60
}
},
{
"name": "lastname",
"type": {
"kind": "string",
"maxLength": 60
}
},
{
"name": "updated_at",
"type": {
"kind": "string",
"maxLength": 60
}
}
]
},
"outSchema": {
"kind": "object",
"properties": [
{
"name": "first_name",
"type": {
"kind": "string",
"maxLength": 100
},
"readOptional": true,
"description": "First name"
},
{
"name": "last_name",
"type": {
"kind": "string",
"maxLength": 100
},
"readOptional": true,
"description": "Last name"
},
{
"name": "email",
"type": {
"kind": "string",
"maxLength": 254
},
"readOptional": true,
"description": "Email"
}
]
}
}'
Response
{
  "id": 705981339
}
Errors
404
workspace does not exist
422
connection does not exist
422
connector does not exist
422
transformation language is not supported

Update pipeline

Update a source pipeline that import users from a database.

Request

  • :id

    int Required

    The ID of the source database pipeline to update.

  • name

    string Required

    The pipeline's name.

    Must be a non-empty string with a maximum of 60 characters.
  • enabled

    boolean

    Indicates if the pipeline is enabled. Use the Set status endpoint to change only the pipeline's status.

  • query

    string Required

    The SELECT query executed on the database to retrieve the users to import.

    The column names returned must be valid property names. They must:

    • Start with A...Z, a...z, or _,
    • Contain only A...Z, a...z, 0...9, or _.

    Note that you can use a column alias if necessary (e.g., SELECT 1 AS v FROM users).

    The query should include the updated_at placeholder as a condition in the WHERE clause.

    Must be a non-empty string with a maximum of 1000 characters.
  • userIDColumn

    string Required

    The column in the database that uniquely identifies each user in the connection. It serves as the single, unique identifier for each user record, ensuring that each user can be distinctly referenced.

    Only columns with types corresponding to the following Krenalis types can be used as an identity: string, int, uuid, and json.

    Must be a non-empty string with a maximum of 1024 characters.
  • updatedAtColumn

    string

    The column that stores the date when a user record was last updated. It tracks the most recent modification made to the user's data, helping to identify when changes occurred.

    The value of this column is used for incremental imports, where only records that have been modified since the last import need to be processed.

    Only columns with types corresponding to the following Krenalis types can be used as the update time: string, datetime, date, or json.

    It cannot be longer than 1024 characters.
  • updatedAtFormat

    string Conditionally Required

    The format of the value in the update time column. It can be set to "ISO8601" if the column value follows the ISO 8601 format. Otherwise, it should follow a format accepted by the Python strftime function.

    This field is only required if the updatedAtColumn is provided, is not empty, and has a type string or json.

    Must be a non-empty string with a maximum of 64 characters.
  • incremental

    boolean

    Determines whether users are imported incrementally:

    • true: are imported only users whose update time is equal to or later than the last imported user's change time.
    • false: all users are imported again, regardless of their update time. false is the default value.

    If set to true, a column for the update time must be specified (i.e., updatedAtColumn is not null). Additionally, if the update time column type corresponds to the Krenalis string or json types, the values in the column must be sortable.

  • transformation

    object Required

    The mapping or function responsible for transforming database users into user identities linked to the pipeline. Once the identity resolution process is complete, the user identities associated with all pipelines are merged into unified users.

    One of either a mapping or a function must be provided, but not both. The one that is not provided can be either missing or set to null.

    • transformation.mapping

      nullable object with string values Conditionally Required

      The transformation mapping. A key represents a property path in the profile schema, and its corresponding value is an expression. This expression can reference columns of the database table.

    • transformation.function

      nullable object Conditionally Required

      The transformation function. A JavaScript or Python function that given a user in the database, returns an identity.

      • transformation.function.source

        string Required

        The source code of the JavaScript or Python function.

        Must be a non-empty string with a maximum of 50000 characters.
      • transformation.function.language

        string Required

        The language of the function.

        Possible values: "JavaScript" or "Python".
      • transformation.function.preserveJSON

        boolean

        Specifies whether JSON values are passed to and returned from the function as strings, keeping their original format without any encoding or decoding.

      • transformation.function.inPaths

        array of string Required

        The paths of the properties that will be passed to the function. At least one path must be present.

      • transformation.function.outPaths

        array of string Required

        The paths of the properties that may be returned by the function. At least one path must be present.

  • inSchema

    schema Required

    The schema for the identity column, the update time column, and the input properties for the transformation.

    When importing users from databases, this should be a subset of the query schema.

  • outSchema

    schema Required

    The schema for the output properties of the transformation.

    When importing users from databases, this should be a subset of the profile schema.

Response

No response.
PUT /v1/pipelines/:id
curl -X PUT https://example.com/v1/pipelines/705981339 \
-H "Authorization: Bearer api_xxxxxxx" \
--json '{
"name": "Ingest users from database",
"enabled": true,
"query": "SELECT firstname, lastname, email, updated_at FROM customers WHERE updated_at >= ${updated_at}",
"userIDColumn": "email",
"updatedAtColumn": "updated_at",
"updatedAtFormat": "ISO8601",
"incremental": true,
"transformation": {
"mapping": {
"first_name": "firstname",
"last_name": "lastname",
"email": "email"
}
},
"inSchema": {
"kind": "object",
"properties": [
{
"name": "email",
"type": {
"kind": "string",
"maxLength": 120
}
},
{
"name": "firstname",
"type": {
"kind": "string",
"maxLength": 60
}
},
{
"name": "lastname",
"type": {
"kind": "string",
"maxLength": 60
}
},
{
"name": "updated_at",
"type": {
"kind": "string",
"maxLength": 60
}
}
]
},
"outSchema": {
"kind": "object",
"properties": [
{
"name": "first_name",
"type": {
"kind": "string",
"maxLength": 100
},
"readOptional": true,
"description": "First name"
},
{
"name": "last_name",
"type": {
"kind": "string",
"maxLength": 100
},
"readOptional": true,
"description": "Last name"
},
{
"name": "email",
"type": {
"kind": "string",
"maxLength": 254
},
"readOptional": true,
"description": "Email"
}
]
}
}'
Errors
404
workspace does not exist
404
pipeline does not exist
422
transformation language is not supported

Get pipeline

Get a source pipeline that imports users from a database.

Request

  • :id

    int Required

    The ID of the source database pipeline.

Response

  • id

    int

    The ID of the source database pipeline.

  • name

    string

    The pipeline's name.

    It is not longer than 60 characters.
  • connector

    string

    The code of the connection's connector.

  • connectorType

    string

    The type of the connection's connector. It is always "Database" when the pipeline imports users from a database.

    Possible values: "Application", "Database", "FileStorage", "MessageBroker", "SDK" or "Webhook".
  • connection

    int

    The ID of the connection from which users are read. It is a source database.

  • connectionRole

    string

    The role of the pipeline's connection. It is always "Source" when the pipeline imports users from a database.

    Possible values: "Source" or "Destination".
  • target

    string

    The entity on which the pipeline operates. It is always "User" when the pipeline imports users from a database.

    Possible values: "User" or "Event".
  • enabled

    boolean

    Indicates if the pipeline is enabled.

  • query

    string

    The SELECT query executed on the database to retrieve the users to import.

    It is not longer than 1000 characters.
  • userIDColumn

    string

    The column in the database that uniquely identifies each user in the connection.

  • updatedAtColumn

    nullable string

    The column that stores the timestamp of the last update to a user record. It is null if no such column exists.

  • updatedAtFormat

    nullable string

    The format of the value in the update time column. It is null if no such column exists or if the corresponding Krenalis type is datetime or date.

    It is "ISO8601" if the column value follows the ISO 8601 format. Otherwise, it follows the format accepted by the Python strftime function.

  • incremental

    boolean

    Indicates whether users are imported incrementally:

    • true: are imported only users whose update time is equal to or later than the last imported user's change time.
    • false: all users are imported again, regardless of their update time.
  • transformation

    object

    The mapping or function responsible for transforming database users into user identities linked to the pipeline. Once identity resolution is completed, the user identities associated to all pipelines are merged into unified users.

    One of either a mapping or a function is present, but not both. The one that is not present is null.

    • transformation.mapping

      nullable object with string values

      The transformation mapping. A key represents a property path in the profile schema, and its corresponding value is an expression. This expression can reference columns of the database table.

    • transformation.function

      nullable object

      The transformation function. A JavaScript or Python function that given a user in the database, returns an identity.

      • transformation.function.source

        string

        The source code of the JavaScript or Python function.

        It is not longer than 50000 characters.
      • transformation.function.language

        string

        The language of the function.

        Possible values: "JavaScript" or "Python".
      • transformation.function.preserveJSON

        boolean

        Specifies whether JSON values are passed to and returned from the function as strings, keeping their original format without any encoding or decoding.

      • transformation.function.inPaths

        array of string

        The paths of the properties that will be passed to the function. It contains at least one property path.

      • transformation.function.outPaths

        array of string

        The paths of the properties that may be returned by the function. It contains at least one property path.

  • inSchema

    schema

    The schema for the identity column, the update time column, and the input properties for the transformation.

  • outSchema

    schema

    The schema for the output properties of the transformation.

  • running

    boolean

    Indicates if the pipeline is running.

  • scheduleStart

    nullable int

    The start time of the schedule in minutes, counting from 00:00. It specifies the minute when the first scheduled run of the day begins. Subsequent runs occur based on the interval defined by the scheduler period. If the scheduler is disabled, this value is null.

  • schedulePeriod

    nullable string

    The schedule period, which determines how often the import runs automatically. If it is null, the scheduler is disabled, and no automatic run will occur.

    To change the schedule period, use the Set schedule period endpoint.

    Possible values: "5m", "15m", "30m", "1h", "2h", "3h", "6h", "8h", "12h" or "24h".
GET /v1/pipelines/:id
curl https://example.com/v1/pipelines/705981339 \
-H "Authorization: Bearer api_xxxxxxx"
Response
{
  "id": 705981339,
  "name": "Ingest users from database",
  "connector": "mysql",
  "connectorType": "Database",
  "connection": 1371036433,
  "connectionRole": "Source",
  "target": "User",
  "enabled": true,
  "query": "SELECT firstname, lastname, email, updated_at FROM customers WHERE updated_at >= ${updated_at}",
  "userIDColumn": "email",
  "updatedAtColumn": "updated_at",
  "updatedAtFormat": "ISO8601",
  "incremental": true,
  "transformation": {
    "function": {
      "source": "const transform = (user) => { ... }",
      "language": "JavaScript",
      "preserveJSON": false,
      "inPaths": [
        "email",
        "firstname",
        "lastname"
      ],
      "outPaths": [
        "email",
        "first_name",
        "last_name"
      ]
    }
  },
  "inSchema": {
    "kind": "object",
    "properties": [
      {
        "name": "email",
        "type": {
          "kind": "string",
          "maxLength": 120
        }
      },
      {
        "name": "firstname",
        "type": {
          "kind": "string",
          "maxLength": 60
        }
      },
      {
        "name": "lastname",
        "type": {
          "kind": "string",
          "maxLength": 60
        }
      },
      {
        "name": "updated_at",
        "type": {
          "kind": "string",
          "maxLength": 60
        }
      }
    ]
  },
  "outSchema": {
    "kind": "object",
    "properties": [
      {
        "name": "first_name",
        "type": {
          "kind": "string",
          "maxLength": 100
        },
        "readOptional": true,
        "description": "First name"
      },
      {
        "name": "last_name",
        "type": {
          "kind": "string",
          "maxLength": 100
        },
        "readOptional": true,
        "description": "Last name"
      },
      {
        "name": "email",
        "type": {
          "kind": "string",
          "maxLength": 254
        },
        "readOptional": true,
        "description": "Email"
      }
    ]
  },
  "running": false,
  "scheduleStart": 15,
  "schedulePeriod": "1h"
}
Errors
404
workspace does not exist
404
pipeline does not exist