Create a Salesforce sink connector for Aiven for Apache Kafka®
Early availability
The Salesforce sink connector writes records from Apache Kafka® topics to Salesforce objects, such as Account or Contact.
It consumes Kafka records and submits them as Salesforce Bulk API 2.0 insert jobs.
The connector provides at-least-once delivery. If records are retried before Kafka offsets are committed, Salesforce can receive duplicate records.
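To see why duplicates can occur, the following Python sketch is a toy model, not the connector's code. Each batch is inserted into Salesforce before its Kafka offsets are committed, and an optional simulated crash happens between those two steps. The function name and its parameters are hypothetical.

```python
# Toy model, not connector code: each batch is written to Salesforce first
# and its Kafka offsets are committed afterwards (at-least-once delivery).


def simulate(records, batch_size, crash_before_commit_at=None):
    """Return every row "inserted into Salesforce", including replayed ones."""
    salesforce_rows = []  # stands in for rows created by insert jobs
    committed = 0         # next offset to read after a (re)start
    crashed = False
    while committed < len(records):
        batch = records[committed:committed + batch_size]
        salesforce_rows.extend(batch)  # the insert job succeeds
        if crash_before_commit_at == committed and not crashed:
            crashed = True  # the task stops before committing offsets
            continue        # the restarted task resumes from `committed`
        committed += len(batch)  # offsets committed for this batch
    return salesforce_rows
```

Without a crash, `simulate(["a", "b", "c", "d"], batch_size=2)` returns each record once. With `crash_before_commit_at=2`, the second batch is inserted, the task stops before committing, and the restarted task inserts `c` and `d` a second time.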
Prerequisites
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service.
- A Salesforce account with the following:
  - A connected app configured for the OAuth 2.0 client credentials flow, with a client ID and client secret. For setup details, see Configure the client credentials flow for external client apps in the Salesforce documentation.
  - Bulk API 2.0 access enabled.
  - The target Salesforce object, for example Account or Contact, already exists.
- A Kafka topic that contains the records to send to Salesforce.
- Kafka records that use one of the following value formats:
  - Struct values with a schema
  - Map values with a schema
  - Schemaless Map values, such as JSON records produced when value.converter.schemas.enable is set to false
- Field names in Struct values, or keys in Map values, that match the Salesforce field API names on the target object.
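Before you configure the connector, you can confirm that the connected app's client ID and secret work by requesting a token yourself. The following Python sketch uses only the standard library and assumes the standard OAuth 2.0 client credentials form parameters (grant_type, client_id, client_secret) posted to the same endpoint you plan to use for salesforce.oauth.uri. The function names are hypothetical, and this is not how the connector authenticates internally.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(oauth_uri, client_id, client_secret):
    """Build, without sending, an OAuth 2.0 client credentials token request."""
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("ascii")
    return urllib.request.Request(
        oauth_uri,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def fetch_access_token(oauth_uri, client_id, client_secret):
    """Send the token request and return the access token from the JSON reply."""
    request = build_token_request(oauth_uri, client_id, client_secret)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)["access_token"]
```

If `fetch_access_token` raises an HTTP error instead of returning a token, check the connected app settings before you create the connector.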
Create a Salesforce sink connector configuration file
Create a file named salesforce_sink_connector.json with the following configuration:
{
  "name": "salesforce_sink_connector",
  "connector.class": "io.aiven.kafka.connect.salesforce.sink.SalesforceSinkConnector",
  "tasks.max": "1",
  "topics": "test_topic",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "salesforce.bulk.api.sink.object": "Contact",
  "salesforce.client.id": "YOUR_OAUTH_CLIENT_ID",
  "salesforce.client.secret": "YOUR_OAUTH_CLIENT_SECRET",
  "salesforce.oauth.uri": "https://YOUR_INSTANCE.salesforce.com/services/oauth2/token",
  "salesforce.uri": "https://YOUR_INSTANCE.salesforce.com",
  "salesforce.api.version": "v65.0",
  "salesforce.max.retries": "3",
  "offset.flush.interval.ms": "60000",
  "offset.flush.timeout.ms": "30000"
}
The example uses schema-based JSON values. To use schemaless JSON values, set
value.converter.schemas.enable to false.
Parameters:
- name: Name of the connector.
- connector.class: Use io.aiven.kafka.connect.salesforce.sink.SalesforceSinkConnector.
- tasks.max: Maximum number of connector tasks.
- topics: Kafka topics to read from.
- value.converter: Converter for Kafka record values. Use org.apache.kafka.connect.json.JsonConverter for JSON. For Avro, use the appropriate converter and set value.converter.schema.registry.url.
- value.converter.schemas.enable: Set to true to use schema-based JSON values, or to false to use schemaless JSON values. The connector supports both schema-based Struct values and schemaless Map values.
- salesforce.bulk.api.sink.object: Salesforce object to write records to, such as Account or Contact.
- salesforce.client.id: OAuth 2.0 client ID from your Salesforce connected app.
- salesforce.client.secret: OAuth 2.0 client secret from your Salesforce connected app.
- salesforce.oauth.uri: OAuth token endpoint for your Salesforce org. Set it together with salesforce.uri so that both point to the same Salesforce deployment.
- salesforce.uri: Base URL of your Salesforce instance.
- salesforce.api.version: Salesforce REST API version supported by your org, for example v65.0.
- salesforce.max.retries: Maximum number of retries for Salesforce API and authentication requests. See the configuration reference.
- offset.flush.interval.ms: How often, in milliseconds, Kafka Connect flushes records to Salesforce. Default: 60000. Each flush is submitted as one Bulk API 2.0 insert job. A larger interval reduces the number of Salesforce API calls, but produces larger batches that take longer to process and can require a higher offset.flush.timeout.ms.
- offset.flush.timeout.ms: Maximum time, in milliseconds, that a flush can take before Kafka Connect marks it as failed. Default: 5000. The example configuration uses 30000 to allow for flushes that contain many records or for Salesforce Bulk API responses that are slower than the default allows. Increase this value if flush operations time out.
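The trade-off for offset.flush.interval.ms can be estimated with simple arithmetic. This Python sketch assumes tasks.max is 1, a steady input rate, and one insert job per flush whenever records are buffered. The function names are illustrative.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms in a day


def max_jobs_per_day(flush_interval_ms):
    """Upper bound on insert jobs per day if every flush has records to send."""
    return MS_PER_DAY // flush_interval_ms


def records_per_job(records_per_second, flush_interval_ms):
    """Approximate number of records buffered into one job at a steady rate."""
    return records_per_second * flush_interval_ms // 1000
```

With the default 60000 ms, the connector submits at most 1440 jobs per day, and at 50 records per second each job carries about 3000 records. Raising the interval to 300000 ms lowers the bound to 288 jobs per day, but each job then carries about 15000 records. That is when a higher offset.flush.timeout.ms becomes relevant.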
For all available configuration options, see the Salesforce sink connector configuration reference.
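Before you create the connector, a quick local check can catch leftover placeholders such as YOUR_OAUTH_CLIENT_ID. This Python sketch is an assumption-based helper, not part of the connector. The keys it treats as required are taken from the example above, and it expects salesforce.oauth.uri and salesforce.uri to share a host, as they do in the example.

```python
import json
from urllib.parse import urlparse

# Assumption: keys treated as required, taken from the example configuration.
REQUIRED_KEYS = [
    "name",
    "connector.class",
    "topics",
    "salesforce.bulk.api.sink.object",
    "salesforce.client.id",
    "salesforce.client.secret",
    "salesforce.oauth.uri",
    "salesforce.uri",
]


def check_config(config):
    """Return a list of likely problems in a connector configuration dict."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not config.get(key)]
    problems += [
        f"placeholder left in {key}"
        for key, value in config.items()
        if isinstance(value, str) and "YOUR_" in value
    ]
    # Assumption: both URIs point at the same Salesforce host, as in the example.
    oauth_host = urlparse(config.get("salesforce.oauth.uri", "")).hostname
    base_host = urlparse(config.get("salesforce.uri", "")).hostname
    if oauth_host and base_host and oauth_host != base_host:
        problems.append("salesforce.oauth.uri and salesforce.uri use different hosts")
    return problems


def check_config_file(path="salesforce_sink_connector.json"):
    """Load the JSON configuration file and run check_config on it."""
    with open(path) as f:
        return check_config(json.load(f))
```

An empty list from check_config_file() means none of these checks found a problem. It does not guarantee that Salesforce will accept the credentials.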
Create the connector
To create the connector in the Aiven Console:

1. Access the Aiven Console.
2. Select your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.
3. Click Connectors.
4. Click Create connector if Apache Kafka Connect is already enabled on the service. If not, click Enable connector on this service.
   Alternatively, to enable connectors:
   - Click Service settings in the sidebar.
   - In the Service management section, click Actions > Enable Kafka connect.
5. In the sink connectors list, select Salesforce, and click Get started.
6. On the Salesforce connector page, go to the Common tab.
7. Locate the Connector configuration text box and click Edit.
8. Paste the configuration from your salesforce_sink_connector.json file into the text box.
9. Click Create connector.
10. Verify the connector status on the Connectors page.
To create the Salesforce sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @salesforce_sink_connector.json
To check the connector status, run:
avn service connector status SERVICE_NAME CONNECTOR_NAME
Parameters:
- SERVICE_NAME: Name of your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.
- @salesforce_sink_connector.json: Path to your JSON configuration file.
- CONNECTOR_NAME: Value of the name field in the connector configuration.
Verify that records are written to the expected Salesforce object.
Verify the connector
After you create the connector, confirm that records are flowing to Salesforce:
- Open the target Salesforce object, for example Contact.
- Confirm that new records appear.
- Optionally, verify that field names and values match the records in the Kafka topic.
For a sample record shape when writing to Contact, see
Example record for Salesforce.
Example record for Salesforce
If the connector writes to the Salesforce Contact object, a Kafka record value can use
schema-based or schemaless JSON. For schemaless JSON, the value can look like this:
{
"FirstName": "Alice",
"LastName": "Example",
"Email": "alice@example.com"
}
The connector uses the JSON keys as Salesforce field API names: FirstName,
LastName, and Email.
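With schema-based JSON (value.converter.schemas.enable set to true), JsonConverter expects each record value as an envelope that contains a schema object and a payload object. This Python sketch wraps the record above in that envelope. It assumes every field is an optional string, and the function name and schema name are illustrative.

```python
import json


def to_connect_json(value, schema_name="Contact"):
    """Wrap a flat dict in the envelope JsonConverter reads when schemas are enabled.

    Assumes every field is an optional string; adjust types for other fields.
    """
    fields = [{"field": name, "type": "string", "optional": True} for name in value]
    return json.dumps({
        "schema": {
            "type": "struct",
            "name": schema_name,
            "optional": False,
            "fields": fields,
        },
        "payload": value,
    })


record = {"FirstName": "Alice", "LastName": "Example", "Email": "alice@example.com"}
envelope = to_connect_json(record)
```

The converter turns the payload into a Struct, so the connector still sees the field names FirstName, LastName, and Email.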
Data format
The connector accepts Kafka record values as Struct or Map values.
For Struct values, use field names that match Salesforce object field API names.
For Map values, use keys that match Salesforce object field API names.
For example, the following Kafka record value:
{
"Name": "Alice",
"Email": "alice@example.com",
"ExternalId__c": "EXT001"
}
is written to Salesforce using the following field names:
Name,Email,ExternalId__c
For custom Salesforce fields, use the Salesforce API field name, such as
ExternalId__c.
Schema behavior
The connector detects Salesforce field names dynamically from the records buffered
during each flush. For Struct values, it uses schema field names. For Map values,
it uses map keys.
If records in the same flush contain different fields or keys, the connector creates one CSV header that includes all detected Salesforce field names.
For example, if one record contains Name and Email, and another record contains
Name and ExternalId__c, the generated CSV header includes all three fields:
Email,ExternalId__c,Name
Records that do not contain a detected field or key are sent with an empty value in that column.
To reduce failures, keep records in a topic consistent with the target Salesforce object schema.
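The header-merging behavior described above can be sketched in Python for Map values represented as dicts. The alphabetical column order is an assumption inferred from the Email,ExternalId__c,Name example, and build_csv is a hypothetical name, not connector code.

```python
import csv
import io


def build_csv(records):
    """Build CSV from dict records, using the union of their keys as the header."""
    header = sorted({key for record in records for key in record})  # assumed order
    out = io.StringIO()
    # restval="" writes an empty value where a record lacks a column.
    writer = csv.DictWriter(out, fieldnames=header, restval="", lineterminator="\n")
    writer.writeheader()
    writer.writerows(records)
    return out.getvalue()


csv_data = build_csv([
    {"Name": "Alice", "Email": "alice@example.com"},
    {"Name": "Bob", "ExternalId__c": "EXT002"},
])
```

Here csv_data contains the header Email,ExternalId__c,Name, followed by the rows alice@example.com,,Alice and ,EXT002,Bob. Each row is left empty in the column its record lacks.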
How it works
When Kafka Connect flushes records, the connector:
- Collects buffered records for that flush interval.
- Detects field names from the buffered records.
- Uses schema field names for Struct values and map keys for Map values.
- Creates CSV data with a header row that contains the detected field names.
- Sends the CSV data to Salesforce as a Bulk API 2.0 multipart insert job.
- Polls Salesforce until the job reaches a final state.
- Commits Kafka offsets after the Salesforce job completes successfully.
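The last three steps (submit, poll, and commit) are what make delivery at-least-once. In this Python sketch, a hypothetical FakeBulkClient stands in for real Salesforce calls, and its method names are invented. Only the Bulk API 2.0 final job state names (JobComplete, Failed, Aborted) come from Salesforce.

```python
import time

# Bulk API 2.0 job states after which a job no longer changes.
FINAL_STATES = {"JobComplete", "Failed", "Aborted"}


class FakeBulkClient:
    """Hypothetical stand-in for a Salesforce Bulk API 2.0 client."""

    def __init__(self, poll_states):
        self._poll_states = iter(poll_states)  # state returned by each poll
        self.uploaded = None

    def create_insert_job(self, sobject, csv_data):
        # A real client would send the CSV in a multipart insert job request.
        self.uploaded = (sobject, csv_data)
        return "job-1"

    def job_state(self, job_id):
        return next(self._poll_states)


def submit_and_commit(client, sobject, csv_data, commit_offsets, poll_interval_s=1.0):
    """Submit one insert job, poll until it is final, then commit offsets."""
    job_id = client.create_insert_job(sobject, csv_data)
    state = client.job_state(job_id)
    while state not in FINAL_STATES:
        time.sleep(poll_interval_s)
        state = client.job_state(job_id)
    if state != "JobComplete":
        # Offsets stay uncommitted, so these records can be delivered again.
        raise RuntimeError(f"insert job {job_id} ended in state {state}")
    commit_offsets()
    return state
```

Committing only after JobComplete means a failure at any earlier point leaves the offsets where they were, which is why retries can produce duplicates but do not lose records.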
Limitations
The Salesforce sink connector has the following limitations:
- Insert only: Only insert operations are supported. Update and upsert operations are not supported.
- At-least-once delivery: The connector can write the same record more than once if a task restarts or Kafka Connect replays records before offsets are committed.
- Supported value types: The connector processes Struct and Map values. Records with other value types are skipped. If the errant record reporter is configured, skipped records are reported as errant records. If a flush contains no supported values (Struct or Map), the batch can fail.
- No automatic data mapping: The connector does not transform field names, keys, or values. Use Kafka record field names or keys that match Salesforce field API names.
- Dynamic fields: Field names and keys are discovered dynamically from buffered records. Inconsistent fields or keys across records in the same flush interval produce sparse rows with empty values where a field is missing.
- Single batch per flush: All records buffered within one flush interval are submitted as a single Salesforce Bulk API 2.0 job. Large batches may approach Salesforce API limits or exceed the Kafka Connect flush timeout.
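The supported-value-types rule can be illustrated with a small Python filter. Dicts model Map values only, because Struct has no direct Python equivalent here. The report_errant callback stands in for the errant record reporter, and raising an error when nothing is supported is an assumption, since the connector only states that such a batch can fail.

```python
def supported_values(values, report_errant=None):
    """Keep dict (Map-like) values, skipping and optionally reporting the rest."""
    supported = []
    for value in values:
        if isinstance(value, dict):
            supported.append(value)
        elif report_errant is not None:
            report_errant(value)  # stands in for the errant record reporter
    if values and not supported:
        # Assumption: a flush with no Map or Struct values is treated as failed.
        raise ValueError("flush contains no Struct or Map values")
    return supported
```

For example, supported_values([{"LastName": "Example"}, "plain text", 42], errors.append) keeps only the dict and passes the other two values to errors.append.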