Sync users


An application connector, if the related application allows it, can read, create, and update users within the application, enabling Krenalis to import and export user data.

First, when registering the connector, set Targets to connectors.TargetUser in both the source and the destination specifications:

connectors.RegisterApplication(connectors.ApplicationSpec{
    ...
    AsSource: &connectors.AsApplicationSource{
        ...
        Targets: connectors.TargetUser,
        ...
    },
    AsDestination: &connectors.AsApplicationDestination{
        ...
        Targets: connectors.TargetUser,
        ...
    },
    ...
}, New)

Next, to read, create, and update records in the application, the connector must implement the Records and Upsert methods. Both methods receive the target they operate on as an argument. Currently, the only supported target is TargetUser.

Here, we'll use the term "records" to refer to users.

Let's start by looking at how to read records using the Records method.

Read records

Krenalis calls the connector's Records method to read records from the application:

Records(ctx context.Context, target connectors.Targets, updatedAt time.Time, cursor string, schema types.Type) ([]connectors.Record, string, error)

The parameters for this method are:

  • ctx: The context.
  • target: Specifies the type of entities to return. Currently, only TargetUser is supported.
  • updatedAt: If not the zero time, return only the records that were created or modified at or after this time. The precision of updatedAt is limited to microseconds.
  • cursor: Indicates the starting position for reading records. This is the cursor value from a previous call in a paginated query. For the first call, it is empty.
  • schema: The user source schema containing only the properties that must be returned. This schema is derived from the most recently returned result of RecordSchema.

Typically, the Records method returns at least one record when it does not return an error, but returning no records without an error is also valid. The method may also return duplicate records (that is, records with the same ID). In that case, only the first of them is processed.

Rather than returning all records at once, the Records method can return them incrementally, one page per call. It reads from the position given by the cursor argument and returns, as its second result (next), the cursor from which the following call should continue. First, let's examine the structure of a single returned record.

The Record type is defined as follows:

type Record struct {
    ID         string
    Attributes map[string]any
    UpdatedAt  time.Time
    Err        error
}
  • ID: The unique identifier assigned by the application for this record. For user records, this is the application's User ID: the stable unique identifier used to identify a user within the application connection. It must be a valid, non-empty UTF-8 string. Do not use mutable natural keys (for example, an email address) as the User ID.
  • Attributes: The record's attributes: a map of property names to their values. Properties that were not requested are ignored. The connector may omit a property for a user who does not have it; an omitted property is distinct from a property with a null value.
  • UpdatedAt: The date and time the record was last updated in the application, in any time zone. Its precision is limited to microseconds; anything finer is truncated. It must not be the zero time, its year must be at least 1900, and it must not be too far in the future, as Krenalis rejects such dates.
  • Err: An optional error encountered while reading or validating this specific record. Use it to report per-record failures while still returning the other records of the same request. If Err is not nil, Krenalis treats the record as invalid and may ignore Attributes and UpdatedAt, but ID must still be set (non-empty). For request-level failures (authentication, network, 5xx responses), return an error from the method instead of setting Err.
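As a sketch of how Err and the UpdatedAt constraints work together, the function below converts one user returned by an application's API into a record. The rawUser type, its fields, and the RFC 3339 timestamp format are hypothetical, and Record is a local stand-in for connectors.Record so the code compiles without the Krenalis SDK. A user without an ID cannot be reported through Err, so the function signals it to the caller instead; whether to skip such a user or fail the whole request is left open.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Record is a local stand-in for connectors.Record.
type Record struct {
	ID         string
	Attributes map[string]any
	UpdatedAt  time.Time
	Err        error
}

// rawUser is a hypothetical user as decoded from the application's API.
type rawUser struct {
	ID        string
	Email     string
	UpdatedAt string // RFC 3339 timestamp, e.g. "2024-05-01T10:00:00Z"
}

// toRecord converts a raw user into a Record. A problem that affects only
// this user is reported in Record.Err, so the other users of the same page
// can still be returned. It reports false for a user without an ID, since a
// record must have a non-empty ID even when Err is set.
func toRecord(u rawUser) (Record, bool) {
	if u.ID == "" {
		return Record{}, false
	}
	rec := Record{ID: u.ID}
	t, err := time.Parse(time.RFC3339, u.UpdatedAt)
	if err != nil {
		rec.Err = fmt.Errorf("invalid update time %q: %w", u.UpdatedAt, err)
		return rec, true
	}
	// Only the lower bound is checked; the limit on future dates is not modeled.
	if t.Year() < 1900 {
		rec.Err = errors.New("update time is before 1900")
		return rec, true
	}
	rec.Attributes = map[string]any{"email": u.Email}
	rec.UpdatedAt = t
	return rec, true
}
```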

If an error occurs, Records must return a non-nil error, and it should not be an io.EOF error.

Tip: the connectors.Record type provides the helper methods record.IsCreate() and record.IsUpdate(). Use them to decide whether the current record should be created or updated without directly checking whether record.ID is empty.

Sending requests to the application

When a connector instance is created, an HTTP client is passed to the constructor through the ApplicationEnv.HTTPClient field. The connector's methods should use this client to make HTTP requests to the application's API. The client takes care of:

  • Retrying calls in case of an error, if the request allows for it.
  • Proper resource management.
  • Adding the "Authorization" HTTP header for connectors that use OAuth.
  • Refreshing the access token for connectors that use OAuth.

The client implements the HTTPClient interface:

// HTTPClient is the interface implemented by the HTTP client used by
// connectors.
type HTTPClient interface {

    // Do sends an HTTP request and returns the response.
    //
    // If authentication is needed, it adds an Authorization header automatically.
    // It always closes the request body after the call, even if an error happens.
    // This method does *not* follow redirects.
    //
    // A request is retriable if it is idempotent and either has no body or provides
    // a GetBody function to regenerate the body.
    // A request is considered idempotent if its method is GET, HEAD, OPTIONS, or
    // TRACE, or if it includes the header "Idempotency-Key" or "X-Idempotency-Key".
    // (If the header's value is an empty slice, the request is still treated as
    // idempotent, but the header is not sent over the network.)
    //
    // See also the `http.Transport` documentation:
    // https://pkg.go.dev/net/http#Transport.
    Do(req *http.Request) (res *http.Response, err error)

    // ClientSecret returns the OAuth client secret of the HTTP client.
    ClientSecret() (string, error)

    // AccessToken returns an OAuth access token.
    AccessToken(ctx context.Context) (string, error)
}

If you need to make direct HTTP requests without using the provided client, the ClientSecret and AccessToken methods can be used by OAuth connectors to obtain the client secret and an access token for authentication with the application's API.

If a method from HTTPClient returns an error, connector methods should return that exact error, without any modification or wrapping.

Updating and creating records

To update or create a record, Krenalis uses the connector's Upsert method:

Upsert(ctx context.Context, target connectors.Targets, records connectors.Records, schema types.Type) error

This method is used during the export process to update existing users or create new ones in the application. The target parameter is set to TargetUser to indicate that the operation applies to users.

The records parameter contains the records to update or create. You don't need to process all of them at once: handle only as many as you can send in a single HTTP request to the application's API. If the API accepts only one record per request, that's fine. Krenalis automatically calls the method again for the records that remain unprocessed.

The attributes of each record conform to the provided schema parameter, which in turn conforms to the most recently retrieved destination schema.

As soon as an iteration over the records parameter completes—either when First returns or when the iteration over the sequence returned by All or Same ends—Krenalis can immediately start a new call to Upsert with the remaining unconsumed records, even if previous Upsert calls are still in progress.

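This ensures that only one iteration over a given Records sequence is active at any time, while still allowing several HTTP requests to the application's API to be in flight concurrently, which improves throughput. As a consequence, Upsert calls may overlap, so any state the connector shares across calls must be safe for concurrent use.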

Each call to Upsert must consume at least one record from the provided records sequence (by calling First, All, or Same). If the implementation cannot consume any record, it must return an error. If such an error is returned, the iteration is aborted and no further calls to Upsert will be made for the remaining records.

Key concept: processed records

Krenalis considers a record processed as soon as it has been read from the Records collection. To better understand how this works, let's first explore the methods provided by the Records interface. Afterward, we'll review how to use these methods effectively in various scenarios.

// Records provides access to a non-empty sequence of records to be created or
// updated by Upsert. A record to be created has an empty ID.
//
// To iterate over records, call either All, Same, or First — only one of these
// can be used per Records value:
//   - All returns an iterator over all records.
//   - Same returns an iterator over records with the same operation type
//     (create or update) as the first record.
//   - First returns the first record.
//
// Records are consumed as they are yielded by the iterator. A record is
// considered consumed once produced by the iterator, unless Postpone is called.
//
// Example:
//
//	for record := range records.All() {
//		...
//		// record is now consumed unless Postpone is called here
//		if postpone {
//			records.Postpone()
//			continue
//		}
//		...
//	}
//
// Calling Postpone during iteration marks the current record as not consumed,
// so it will be available in subsequent Upsert calls.
//
// Only one iteration (using All or Same) or call to First may be active on a
// Records value. After an iteration completes or First is called, the Records
// value must not be used again.
type Records interface {

    // All returns an iterator to read all records. Attributes of the records in the
    // sequence may be modified unless the record is subsequently postponed.
    All() iter.Seq[Record]

    // Discard discards the current record in the iteration with the provided error.
    // Discard may only be called during iterations from All or Same.
    // It panics if err is nil, or if the record has already been postponed or
    // discarded.
    Discard(err error)

    // First returns the first record. The record's attributes may be modified.
    // Use it instead of All or Same when the application only needs to create
    // or update one record at a time.
    First() Record

    // Postpone postpones the current record in the iteration and marks it as
    // unread. Postpone may only be called during iterations from All or Same, and
    // only if the record's attributes have not been modified.
    //
    // The first record must always be consumed. Calling Postpone on it will
    // cause a panic. It is safe to call Postpone multiple times on the same record.
    Postpone()

    // Peek retrieves the next record without advancing the iterator. It returns the
    // record and true if a record is available, or false if there are no further
    // records. Can only be called during an iteration with All or Same.
    // The returned record must not be modified.
    Peek() (Record, bool)

    // Same returns an iterator for records: either all records to update
    // (if the first record is for update) or all records to create
    // (if the first record is for creation). Attributes of the records in the
    // sequence may be modified unless the record is subsequently postponed.
    Same() iter.Seq[Record]
}

Sending one record at a time

The most common scenario involves an application's API that can handle only one record (user) per request. In this case, you should use the First method of Records to read only the first record.

Below is an example implementation:

func (my *MyApp) Upsert(ctx context.Context, target connectors.Targets, records connectors.Records, schema types.Type) error {

    // Read the first record.
    record := records.First()

    // Prepare request body.
    var body bytes.Buffer
    body.WriteString(`{"attributes":`)
    json.Encode(&body, record.Attributes)
    body.WriteString(`}`)

    // Prepare the method for update (PUT) or create (POST).
    method := http.MethodPut
    if record.IsCreate() {
        method = http.MethodPost
    }

    // Prepare the path.
    path := "/v1/customers"
    if method == http.MethodPut {
        path += "/" + url.PathEscape(record.ID)
    }

    // Create the HTTP request.
    req, err := http.NewRequestWithContext(ctx, method,
        "https://api.myapp.com"+path, bytes.NewReader(body.Bytes()))
    if err != nil {
        return err
    }

    // Mark the request as idempotent for updates.
    if method == http.MethodPut {
        req.Header["Idempotency-Key"] = nil
        req.GetBody = func() (io.ReadCloser, error) {
            return io.NopCloser(bytes.NewReader(body.Bytes())), nil
        }
    }

    // Send the HTTP request.
    res, err := my.httpClient.Do(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()

    // Check the response status code.
    if res.StatusCode != 200 {
        return fmt.Errorf("application's server response: %s", res.Status)
    }

    return nil
}

Key concepts:

  • Read the first record
    Use records.First() to read the first record that needs to be processed.

  • Determine the type of operation
    If record.IsCreate() is true, the record should be created; otherwise it should be updated.

This approach sends exactly one record per request, in line with the application's API limitations. Krenalis automatically calls the method again for the remaining records.
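The request-building part of the example above can be factored into a pure function and unit-tested without Krenalis or a network. In this sketch, Record is a local stand-in for connectors.Record, the /v1/customers paths are the hypothetical API of the example, and the body is encoded with the standard library's encoding/json rather than the json.Encode helper used above.

```go
package main

import (
	"encoding/json"
	"net/http"
	"net/url"
)

// Record is a local stand-in for connectors.Record.
type Record struct {
	ID         string
	Attributes map[string]any
}

// IsCreate reports whether the record must be created (it has no ID yet).
func (r Record) IsCreate() bool { return r.ID == "" }

// buildRequest returns the HTTP method, path, and JSON body for one record:
// POST /v1/customers to create it, PUT /v1/customers/{id} to update it.
func buildRequest(r Record) (method, path string, body []byte, err error) {
	if r.IsCreate() {
		method, path = http.MethodPost, "/v1/customers"
	} else {
		method, path = http.MethodPut, "/v1/customers/"+url.PathEscape(r.ID)
	}
	body, err = json.Marshal(map[string]any{"attributes": r.Attributes})
	return method, path, body, err
}
```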

Batch of records of the same type (update or create)

If the application's API can process multiple records in a batch but requires them all to be of the same type (all updates or all creates), use the Peek method to inspect the first record without consuming it. This tells you whether the batch is an update or a create operation. Then iterate over records.Same() to read only the records of the same type as the first one.

Below is an example implementation:

func (m *MyApp) Upsert(ctx context.Context, target connectors.Targets, records connectors.Records, schema types.Type) error {

    // Peek at the first record to determine the type of request.
    record, _ := records.Peek()
    method := http.MethodPut
    if record.IsCreate() {
        method = http.MethodPost
    }

    // Prepare request body.
    var body bytes.Buffer
    body.WriteString(`{"customers":[`)

    n := 0
    for record := range records.Same() {
        if n > 0 {
            body.WriteString(`,`)
        }
        body.WriteString(`{`)
        if record.IsUpdate() {
            body.WriteString(`"id":`)
            json.Encode(&body, record.ID)
            body.WriteString(`,`)
        }
        body.WriteString(`"attributes":`)
        json.Encode(&body, record.Attributes)
        body.WriteString(`}`)
        n++
        if n == bodyMaxRecords {
            break
        }
    }
    body.WriteString(`]}`)

    // Create the HTTP request.
    req, err := http.NewRequestWithContext(ctx, method,
        "https://api.myapp.com/v1/customers/batch", bytes.NewReader(body.Bytes()))
    if err != nil {
        return err
    }

    // Mark the request as idempotent for updates.
    if method == http.MethodPut {
        req.Header["Idempotency-Key"] = nil
        req.GetBody = func() (io.ReadCloser, error) {
            return io.NopCloser(bytes.NewReader(body.Bytes())), nil
        }
    }

    // Send the HTTP request.
    res, err := m.httpClient.Do(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()

    // Check the response status code.
    if res.StatusCode != 200 {
        return fmt.Errorf("server responded with status %s", res.Status)
    }

    return nil
}

Key concepts:

  • Determine the type of operation
    Use records.Peek() to examine the first record without consuming it.

    • If record.IsUpdate() is true, it's an update.
    • If record.IsCreate() is true, it's a creation.

    Do not use records.First() in this scenario: it consumes the first record, and no other Records method can be called afterward.

  • Iterate over records
    Use records.Same() to read only records of the same type as the first one. This ensures all records in the batch are valid for the chosen operation.

  • Batch size limitation
    The example demonstrates breaking the loop once the maximum number of records (bodyMaxRecords) is reached. This ensures the request complies with the application's API limits.
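The batch body above can also be built by collecting the records in a slice and encoding them with encoding/json, so that leaving out the id for creates is handled by the omitempty option. Since there is no Records value outside Krenalis, buildBatch in this sketch simulates iterating over records.Same() on a slice: it keeps only the records with the same operation as the first one, up to limit, and returns their indices (the records that would be consumed). Record, customer, and the {"customers":[...]} shape are assumptions carried over from the example.

```go
package main

import (
	"encoding/json"
	"net/http"
)

// Record is a local stand-in for connectors.Record.
type Record struct {
	ID         string
	Attributes map[string]any
}

// IsCreate reports whether the record must be created.
func (r Record) IsCreate() bool { return r.ID == "" }

// customer is one element of the hypothetical batch request body.
type customer struct {
	ID         string         `json:"id,omitempty"` // omitted for creates
	Attributes map[string]any `json:"attributes"`
}

// buildBatch simulates iterating over records.Same() on a non-empty slice:
// it selects, in order, up to limit records with the same operation as the
// first one and returns the HTTP method, the JSON body, and the indices of
// the selected records. The other records are left for later calls.
func buildBatch(records []Record, limit int) (method string, body []byte, used []int, err error) {
	create := records[0].IsCreate()
	method = http.MethodPut
	if create {
		method = http.MethodPost
	}
	var batch []customer
	for i, r := range records {
		if r.IsCreate() != create {
			continue
		}
		batch = append(batch, customer{ID: r.ID, Attributes: r.Attributes})
		used = append(used, i)
		if len(batch) == limit {
			break
		}
	}
	body, err = json.Marshal(map[string][]customer{"customers": batch})
	return method, body, used, err
}
```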

Batch of records of mixed types (create and update)

When the application's API allows sending multiple records of different types (both records to create and records to update) in a single HTTP request, you can iterate over all records using records.All().

Here is an example implementation:

func (m *MyApp) Upsert(ctx context.Context, target connectors.Targets, records connectors.Records, schema types.Type) error {

    // Prepare request body.
    var body bytes.Buffer
    body.WriteString(`{"customers":[`)

    n := 0
    for record := range records.All() {
        if n > 0 {
            body.WriteString(`,`)
        }
        body.WriteString(`{`)
        if record.IsUpdate() {
            body.WriteString(`"id":`)
            json.Encode(&body, record.ID)
            body.WriteString(`,`)
        }
        body.WriteString(`"attributes":`)
        json.Encode(&body, record.Attributes)
        body.WriteString(`}`)
        n++
        if n == bodyMaxRecords {
            break
        }
    }
    body.WriteString(`]}`)

    // Create the HTTP request.
    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        "https://api.myapp.com/v1/customers/batch", &body)
    if err != nil {
        return err
    }

    // Send the HTTP request.
    res, err := m.httpClient.Do(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()

    // Check the response status code.
    if res.StatusCode != 200 {
        return fmt.Errorf("server responded with status %s", res.Status)
    }

    return nil
}

Key concepts:

  • Iterating over all records
    The method records.All() is used to iterate over both types of records—those to be created and those to be updated. This makes it possible to process mixed batches in a single request.

  • Determine the type of operation
    If record.IsUpdate() is true, it's an update; if record.IsCreate() is true, it's a creation.

  • Limit on records
    The loop stops once the maximum number of records per request (bodyMaxRecords) is reached, so the request does not exceed the application's API limit on the number of records.

This approach allows you to efficiently handle mixed record types (create and update) in a single batch request, reducing the number of API requests required.

Handling body size limits

In the previous examples, the loop stops when the number of records reaches the application's API maximum limit. However, if the API imposes a body size limit rather than a record count limit, you can use the Postpone method to postpone a record after it has been read. This ensures that the record remains unprocessed and can be included in a subsequent call to the Upsert method.

Below is an example implementation:

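body.WriteString(`{"records":[`)

n := 0 // number of records written to the body
for record := range records.All() {

    // Track the length before adding the record.
    size := body.Len()

    if n > 0 {
        body.WriteString(`,`)
    }

    // Build the record JSON object.
    body.WriteString(`{`)
    if record.IsUpdate() {
        body.WriteString(`"id":`)
        json.Encode(&body, record.ID)
        body.WriteString(`,`)
    }
    body.WriteString(`"attributes":`)
    json.Encode(&body, record.Attributes)
    body.WriteString(`}`)

    // Stop if the body exceeds the API's size limit.
    if body.Len()+len(`]}`) > bodySizeLimit {
        body.Truncate(size)
        if n == 0 {
            // The record does not fit even in an otherwise empty body, so
            // postponing it would not help (and Postpone panics on the
            // first record). Discard it instead.
            records.Discard(errors.New("record exceeds the request body size limit"))
            continue
        }
        records.Postpone()
        break
    }

    n++
}

// Return early if all records have been discarded.
if n == 0 {
    return nil
}

body.WriteString(`]}`)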

Key concepts:

  • Tracking body size
    Before adding a record to the request body, the current size of the body is tracked using body.Len(). This allows for easy truncation if the body size limit is exceeded.

  • Truncating the body
    To ensure the request is valid, the body.Truncate(size) method removes the last added record from the body. This prevents the body from exceeding the size limit while maintaining a valid JSON structure.

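  • Using Postpone to reprocess records
    When the body size exceeds the limit:

    • The Postpone method is called to notify Krenalis that the record just read has been postponed.
    • The loop does not modify the attributes of the records it reads, which is a requirement for calling Postpone.
    • The postponed record remains unprocessed and is included in the next call to the Upsert method.
    • The first record of a call cannot be postponed (Postpone panics on it). A record that exceeds the limit even on its own can never be sent, so discard it with records.Discard instead.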

Error handling

If a record cannot be processed during the iteration over the records, for example because it fails validation, call the Discard method of records with an error that explains why:

n := 0
for record := range records.All() {
    if !valid(record) {
        records.Discard(errors.New("record is invalid"))
        continue
    }
    // ...
    n++
}

// Return early if all records have been discarded.
if n == 0 {
    return nil
}

Unlike postponed records, discarded records will not be retried in future requests to Upsert.

If the application reports validation errors in its response, after the request has been sent, return a RecordsError. This type of error lets you indicate which records failed and why:

// RecordsError is returned by the Upsert method of an application connector
// when only some records have failed or when the method can distinguish errors
// based on individual records. It maps record indices to their respective
// errors.
type RecordsError map[int]error

If the error affects all records—such as when the entire request fails—you should return a generic error. In that case, all processed records will be marked with the same error.

Key concepts:

  • Discarding records during iteration
    If a record fails validation before sending, you can discard it during iteration using records.Discard(err). Discarded records are removed from processing entirely and will not be retried in future calls to Upsert.

  • Handling individual record errors
    When certain records fail due to validation issues (e.g., returned by the application), you can return a RecordsError that maps each failed record to its specific error, instead of returning a single error for the whole batch.

  • Error index mapping
    Each key in the RecordsError type is the index of a failed record, counted in the order the records were consumed (normally the same order in which they were sent in the HTTP request), and each value holds the corresponding error.

When to validate records

When it comes to record validation, there are a few possible scenarios depending on the target application API:

  • The API never returns validation errors. In this case, your connector should validate records as much as possible before sending them. This allows users to quickly understand why certain records aren't accepted by the API.

  • The API validates records but only returns a single error in the response, typically the first error encountered. In this case, it's important that your connector performs validation ahead of time — otherwise, a validation error on a single record would cause all records in the same request to be marked as invalid. If the API still returns a validation error, you should return a generic error, which will mark all records as invalid.

  • The API validates records and returns a separate error for each invalid record. In this case, you can return a RecordsError that maps each failed record to its corresponding error.