Import users from databases
This type of pipeline imports users from a database into the workspace's data warehouse. It is available only for source database connections.
Create pipeline
Create a source pipeline to import users from a database.
Request
-
name
string RequiredThe pipeline's name.
Must be a non-empty string with a maximum of 60 characters. -
connection
int RequiredThe ID of the connection from which to read the users. It must be a source database.
-
target
string RequiredThe entity on which the pipeline operates, which must be
Possible values:"User"in order to create a pipeline that imports users."User". -
enabled
booleanIndicates if the pipeline is enabled once created.
-
query
string RequiredThe SELECT query executed on the database to retrieve the users to import.
The column names returned must be valid property names. They must:
- Start with
A...Z,a...z, or_, - Contain only
A...Z,a...z,0...9, or_.
Note that you can use a column alias if necessary (e.g.,
SELECT 1 AS v FROM users).The query should include the
Must be a non-empty string with a maximum of 1000 characters.updated_atplaceholder as a condition in the WHERE clause. - Start with
-
userIDColumn
string RequiredThe column in the database that uniquely identifies each user in the connection. It serves as the single, unique identifier for each user record, ensuring that each user can be distinctly referenced.
Only columns with types corresponding to the following Krenalis types can be used as an identity:
Must be a non-empty string with a maximum of 1024 characters.string,int,uuid, andjson. -
updatedAtColumn
stringThe column that stores the date when a user record was last updated. It tracks the most recent modification made to the user's data, helping to identify when changes occurred.
The value of this column is used for incremental imports, where only records that have been modified since the last import need to be processed.
Only columns with types corresponding to the following Krenalis types can be used as the update time:
It cannot be longer than 1024 characters.string,datetime,date, orjson. -
updatedAtFormat
string Conditionally RequiredThe format of the value in the update time column. It can be set to
"ISO8601"if the column value follows the ISO 8601 format. Otherwise, it should follow a format accepted by the Python strftime function.This field is only required if the
Must be a non-empty string with a maximum of 64 characters.updatedAtColumnis provided, is not empty, and has a typestringorjson. -
incremental
booleanDetermines whether users are imported incrementally:
true: are imported only users whose update time is equal to or later than the last imported user's change time.false: all users are imported again, regardless of their update time.falseis the default value.
If set to
true, a column for the update time must be specified (i.e.,updatedAtColumnis not null). Additionally, if the update time column type corresponds to the Krenalisstringorjsontypes, the values in the column must be sortable. -
transformation
object RequiredThe mapping or function responsible for transforming database users into user identities linked to the pipeline. Once the identity resolution process is complete, the user identities associated with all pipelines are merged into unified users.
One of either a mapping or a function must be provided, but not both. The one that is not provided can be either missing or set to null.
-
transformation.mapping
nullable object with string values Conditionally RequiredThe transformation mapping. A key represents a property path in the profile schema, and its corresponding value is an expression. This expression can reference columns of the database table.
-
transformation.function
nullable object Conditionally RequiredThe transformation function. A JavaScript or Python function that given a user in the database, returns an identity.
-
transformation.function.source
string RequiredThe source code of the JavaScript or Python function.
Must be a non-empty string with a maximum of 50000 characters. -
transformation.function.language
string RequiredThe language of the function.
Possible values:"JavaScript"or"Python". -
transformation.function.preserveJSON
booleanSpecifies whether JSON values are passed to and returned from the function as strings, keeping their original format without any encoding or decoding.
-
transformation.function.inPaths
array of string RequiredThe paths of the properties that will be passed to the function. At least one path must be present.
-
transformation.function.outPaths
array of string RequiredThe paths of the properties that may be returned by the function. At least one path must be present.
-
-
-
inSchema
schema RequiredThe schema for the identity column, the update time column, and the input properties for the transformation.
When importing users from databases, this should be a subset of the query schema.
-
outSchema
schema RequiredThe schema for the output properties of the transformation.
When importing users from databases, this should be a subset of the profile schema.
Response
-
id
intThe ID of the pipeline.
curl https://example.com/v1/pipelines \ -H "Authorization: Bearer api_xxxxxxx" \ --json '{ "name": "Ingest users from database", "connection": 230527183, "target": "User", "enabled": true, "query": "SELECT firstname, lastname, email, updated_at FROM customers WHERE updated_at >= ${updated_at}", "userIDColumn": "email", "updatedAtColumn": "updated_at", "updatedAtFormat": "ISO8601", "incremental": true, "transformation": { "mapping": { "first_name": "firstname", "last_name": "lastname", "email": "email" } }, "inSchema": { "kind": "object", "properties": [ { "name": "email", "type": { "kind": "string", "maxLength": 120 } }, { "name": "firstname", "type": { "kind": "string", "maxLength": 60 } }, { "name": "lastname", "type": { "kind": "string", "maxLength": 60 } }, { "name": "updated_at", "type": { "kind": "string", "maxLength": 60 } } ] }, "outSchema": { "kind": "object", "properties": [ { "name": "first_name", "type": { "kind": "string", "maxLength": 100 }, "readOptional": true, "description": "First name" }, { "name": "last_name", "type": { "kind": "string", "maxLength": 100 }, "readOptional": true, "description": "Last name" }, { "name": "email", "type": { "kind": "string", "maxLength": 254 }, "readOptional": true, "description": "Email" } ] } }' {
"id": 705981339
}Update pipeline
Update a source pipeline that import users from a database.
Request
-
:id
int RequiredThe ID of the source database pipeline to update.
-
name
string RequiredThe pipeline's name.
Must be a non-empty string with a maximum of 60 characters. -
enabled
booleanIndicates if the pipeline is enabled. Use the Set status endpoint to change only the pipeline's status.
-
query
string RequiredThe SELECT query executed on the database to retrieve the users to import.
The column names returned must be valid property names. They must:
- Start with
A...Z,a...z, or_, - Contain only
A...Z,a...z,0...9, or_.
Note that you can use a column alias if necessary (e.g.,
SELECT 1 AS v FROM users).The query should include the
Must be a non-empty string with a maximum of 1000 characters.updated_atplaceholder as a condition in the WHERE clause. - Start with
-
userIDColumn
string RequiredThe column in the database that uniquely identifies each user in the connection. It serves as the single, unique identifier for each user record, ensuring that each user can be distinctly referenced.
Only columns with types corresponding to the following Krenalis types can be used as an identity:
Must be a non-empty string with a maximum of 1024 characters.string,int,uuid, andjson. -
updatedAtColumn
stringThe column that stores the date when a user record was last updated. It tracks the most recent modification made to the user's data, helping to identify when changes occurred.
The value of this column is used for incremental imports, where only records that have been modified since the last import need to be processed.
Only columns with types corresponding to the following Krenalis types can be used as the update time:
It cannot be longer than 1024 characters.string,datetime,date, orjson. -
updatedAtFormat
string Conditionally RequiredThe format of the value in the update time column. It can be set to
"ISO8601"if the column value follows the ISO 8601 format. Otherwise, it should follow a format accepted by the Python strftime function.This field is only required if the
Must be a non-empty string with a maximum of 64 characters.updatedAtColumnis provided, is not empty, and has a typestringorjson. -
incremental
booleanDetermines whether users are imported incrementally:
true: are imported only users whose update time is equal to or later than the last imported user's change time.false: all users are imported again, regardless of their update time.falseis the default value.
If set to
true, a column for the update time must be specified (i.e.,updatedAtColumnis not null). Additionally, if the update time column type corresponds to the Krenalisstringorjsontypes, the values in the column must be sortable. -
transformation
object RequiredThe mapping or function responsible for transforming database users into user identities linked to the pipeline. Once the identity resolution process is complete, the user identities associated with all pipelines are merged into unified users.
One of either a mapping or a function must be provided, but not both. The one that is not provided can be either missing or set to null.
-
transformation.mapping
nullable object with string values Conditionally RequiredThe transformation mapping. A key represents a property path in the profile schema, and its corresponding value is an expression. This expression can reference columns of the database table.
-
transformation.function
nullable object Conditionally RequiredThe transformation function. A JavaScript or Python function that given a user in the database, returns an identity.
-
transformation.function.source
string RequiredThe source code of the JavaScript or Python function.
Must be a non-empty string with a maximum of 50000 characters. -
transformation.function.language
string RequiredThe language of the function.
Possible values:"JavaScript"or"Python". -
transformation.function.preserveJSON
booleanSpecifies whether JSON values are passed to and returned from the function as strings, keeping their original format without any encoding or decoding.
-
transformation.function.inPaths
array of string RequiredThe paths of the properties that will be passed to the function. At least one path must be present.
-
transformation.function.outPaths
array of string RequiredThe paths of the properties that may be returned by the function. At least one path must be present.
-
-
-
inSchema
schema RequiredThe schema for the identity column, the update time column, and the input properties for the transformation.
When importing users from databases, this should be a subset of the query schema.
-
outSchema
schema RequiredThe schema for the output properties of the transformation.
When importing users from databases, this should be a subset of the profile schema.
Response
No response.curl -X PUT https://example.com/v1/pipelines/705981339 \ -H "Authorization: Bearer api_xxxxxxx" \ --json '{ "name": "Ingest users from database", "enabled": true, "query": "SELECT firstname, lastname, email, updated_at FROM customers WHERE updated_at >= ${updated_at}", "userIDColumn": "email", "updatedAtColumn": "updated_at", "updatedAtFormat": "ISO8601", "incremental": true, "transformation": { "mapping": { "first_name": "firstname", "last_name": "lastname", "email": "email" } }, "inSchema": { "kind": "object", "properties": [ { "name": "email", "type": { "kind": "string", "maxLength": 120 } }, { "name": "firstname", "type": { "kind": "string", "maxLength": 60 } }, { "name": "lastname", "type": { "kind": "string", "maxLength": 60 } }, { "name": "updated_at", "type": { "kind": "string", "maxLength": 60 } } ] }, "outSchema": { "kind": "object", "properties": [ { "name": "first_name", "type": { "kind": "string", "maxLength": 100 }, "readOptional": true, "description": "First name" }, { "name": "last_name", "type": { "kind": "string", "maxLength": 100 }, "readOptional": true, "description": "Last name" }, { "name": "email", "type": { "kind": "string", "maxLength": 254 }, "readOptional": true, "description": "Email" } ] } }' Get pipeline
Get a source pipeline that imports users from a database.
Request
-
:id
int RequiredThe ID of the source database pipeline.
Response
-
id
intThe ID of the source database pipeline.
-
name
stringThe pipeline's name.
It is not longer than 60 characters. -
connector
stringThe code of the connection's connector.
-
connectorType
stringThe type of the connection's connector. It is always
Possible values:"Database"when the pipeline imports users from a database."Application","Database","FileStorage","MessageBroker","SDK"or"Webhook". -
connection
intThe ID of the connection from which users are read. It is a source database.
-
connectionRole
stringThe role of the pipeline's connection. It is always
Possible values:"Source"when the pipeline imports users from a database."Source"or"Destination". -
target
stringThe entity on which the pipeline operates. It is always
Possible values:"User"when the pipeline imports users from a database."User"or"Event". -
enabled
booleanIndicates if the pipeline is enabled.
-
query
stringThe SELECT query executed on the database to retrieve the users to import.
It is not longer than 1000 characters. -
userIDColumn
stringThe column in the database that uniquely identifies each user in the connection.
-
updatedAtColumn
nullable stringThe column that stores the timestamp of the last update to a user record. It is null if no such column exists.
-
updatedAtFormat
nullable stringThe format of the value in the update time column. It is null if no such column exists or if the corresponding Krenalis type is
datetimeordate.It is
"ISO8601"if the column value follows the ISO 8601 format. Otherwise, it follows the format accepted by the Python strftime function. -
incremental
booleanIndicates whether users are imported incrementally:
true: are imported only users whose update time is equal to or later than the last imported user's change time.false: all users are imported again, regardless of their update time.
-
transformation
objectThe mapping or function responsible for transforming database users into user identities linked to the pipeline. Once identity resolution is completed, the user identities associated to all pipelines are merged into unified users.
One of either a mapping or a function is present, but not both. The one that is not present is null.
-
transformation.mapping
nullable object with string valuesThe transformation mapping. A key represents a property path in the profile schema, and its corresponding value is an expression. This expression can reference columns of the database table.
-
transformation.function
nullable objectThe transformation function. A JavaScript or Python function that given a user in the database, returns an identity.
-
transformation.function.source
stringThe source code of the JavaScript or Python function.
It is not longer than 50000 characters. -
transformation.function.language
stringThe language of the function.
Possible values:"JavaScript"or"Python". -
transformation.function.preserveJSON
booleanSpecifies whether JSON values are passed to and returned from the function as strings, keeping their original format without any encoding or decoding.
-
transformation.function.inPaths
array of stringThe paths of the properties that will be passed to the function. It contains at least one property path.
-
transformation.function.outPaths
array of stringThe paths of the properties that may be returned by the function. It contains at least one property path.
-
-
-
inSchema
schemaThe schema for the identity column, the update time column, and the input properties for the transformation.
-
outSchema
schemaThe schema for the output properties of the transformation.
-
running
booleanIndicates if the pipeline is running.
-
scheduleStart
nullable intThe start time of the schedule in minutes, counting from 00:00. It specifies the minute when the first scheduled run of the day begins. Subsequent runs occur based on the interval defined by the scheduler period. If the scheduler is disabled, this value is null.
-
schedulePeriod
nullable stringThe schedule period, which determines how often the import runs automatically. If it is null, the scheduler is disabled, and no automatic run will occur.
To change the schedule period, use the Set schedule period endpoint.
Possible values:"5m","15m","30m","1h","2h","3h","6h","8h","12h"or"24h".
curl https://example.com/v1/pipelines/705981339 \ -H "Authorization: Bearer api_xxxxxxx" {
"id": 705981339,
"name": "Ingest users from database",
"connector": "mysql",
"connectorType": "Database",
"connection": 1371036433,
"connectionRole": "Source",
"target": "User",
"enabled": true,
"query": "SELECT firstname, lastname, email, updated_at FROM customers WHERE updated_at >= ${updated_at}",
"userIDColumn": "email",
"updatedAtColumn": "updated_at",
"updatedAtFormat": "ISO8601",
"incremental": true,
"transformation": {
"function": {
"source": "const transform = (user) => { ... }",
"language": "JavaScript",
"preserveJSON": false,
"inPaths": [
"email",
"firstname",
"lastname"
],
"outPaths": [
"email",
"first_name",
"last_name"
]
}
},
"inSchema": {
"kind": "object",
"properties": [
{
"name": "email",
"type": {
"kind": "string",
"maxLength": 120
}
},
{
"name": "firstname",
"type": {
"kind": "string",
"maxLength": 60
}
},
{
"name": "lastname",
"type": {
"kind": "string",
"maxLength": 60
}
},
{
"name": "updated_at",
"type": {
"kind": "string",
"maxLength": 60
}
}
]
},
"outSchema": {
"kind": "object",
"properties": [
{
"name": "first_name",
"type": {
"kind": "string",
"maxLength": 100
},
"readOptional": true,
"description": "First name"
},
{
"name": "last_name",
"type": {
"kind": "string",
"maxLength": 100
},
"readOptional": true,
"description": "Last name"
},
{
"name": "email",
"type": {
"kind": "string",
"maxLength": 254
},
"readOptional": true,
"description": "Email"
}
]
},
"running": false,
"scheduleStart": 15,
"schedulePeriod": "1h"
}