Table of contents
- Overview
- Writing data via configuration (DMSDK)
- Writing data via custom code (Bulk Data Services)
- Dead Letter Queue configuration
- Sink connector error handling
- JSON-based Connector Configuration
Overview
By default, the MarkLogic Kafka connector assumes that the app server associated with the port defined by the ml.connection.port property is a REST API app server - that is, the value of its url rewriter property is /MarkLogic/rest-api/rewriter.xml or a variation of that rewriter. This allows the MarkLogic connector to use the MarkLogic Data Movement SDK, also known as DMSDK, to efficiently write batches of documents to MarkLogic without any custom coding required. The connector user can ideally meet all of their ingestion requirements simply by configuring properties that control how data is written. And in the event that some transformation of the data is required, a MarkLogic REST transform can be specified to perform code-driven transformations.
However, if you find that you are not able to meet your requirements via the connector properties, optionally combined with a REST transform, you may instead configure the connector to write data via a custom Bulk Data Services endpoint. Bulk Data Services is intended to integrate with dataflow frameworks like Kafka and to give endpoint developers complete control over how data is processed. This approach requires expertise in implementing a Bulk Data Services endpoint and is thus recommended only for users with experience in writing and deploying custom code to MarkLogic.
Writing data via configuration (DMSDK)
The intent behind using DMSDK with the MarkLogic REST API is that as many aspects of writing data to MarkLogic can be controlled via properties without the need to write any code.
Security requirements
The user that the connector authenticates as must have the rest-reader and rest-writer privileges in order to write data via the MarkLogic REST API, which the connector depends upon.
Configuring document URIs
Documents in a MarkLogic database are uniquely identified by a URI. By default, a UUID is generated and used as the URI of each document written to MarkLogic. The following properties allow for strings to be prepended and appended to the UUID:
- ml.document.uriPrefix = a string to be prepended to each URI
- ml.document.uriSuffix = a string to be appended to each URI
The ml.id.strategy property can be used to select an identifier other than a UUID. The generated identifier will still have the values of ml.document.uriPrefix and ml.document.uriSuffix applied if they are set. The choices for this property are:
- KAFKA_META_WITH_SLASH - an identifier will be generated with the following form: {topic}/{partition}/{offset}.
- KAFKA_META_HASHED - an identifier will be generated by concatenating the topic, partition, and offset together and generating a SHA-512 hash of the concatenated value. If an error occurs while hashing the identifier, a UUID is used instead.
- JSON_PATH - you must also specify a JSON Pointer expression via the ml.id.strategy.paths property. The content of each Kafka message must be a JSON document, and the JSON Pointer expression should reference a value in the JSON document.
- HASH - you must also specify one or more JSON Pointer expressions via the ml.id.strategy.paths property. The content of each Kafka message must be a JSON document. The value of each JSON Pointer expression will be concatenated together, with the result used to generate a SHA-512 hash. If an error occurs while hashing the identifier, a UUID is used instead.
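For example, a hypothetical configuration that derives each URI from a value inside the message payload might look like the following; the /orderId pointer and the prefix and suffix values are placeholders for your own data:

ml.id.strategy=JSON_PATH
ml.id.strategy.paths=/orderId
ml.document.uriPrefix=/orders/
ml.document.uriSuffix=.json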
Configuring document metadata
The following properties control how each document is written:
- ml.document.format = the format of the written document; either JSON, XML, BINARY, TEXT, or UNKNOWN
- ml.document.collections = a comma-separated list of collections that each document should be included in
- ml.document.permissions = a comma-separated list of permissions that will be added to each document; formatted as role1,capability1,role2,capability2,etc
- ml.document.temporalCollection = name of a temporal collection that each document should be included in
- ml.document.addTopicToCollections = true if the name of the topic associated with the connector should be used as a collection for each document
- ml.document.mimeType = the MIME type for each document; this is an alternative to ml.document.format for specifying the format of each document
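As an illustrative example, the following property values (the collection and role names are placeholders) would write each record as a JSON document in two collections with read and update permissions, adding the topic name as a further collection:

ml.document.format=JSON
ml.document.collections=kafka-data,orders
ml.document.permissions=rest-reader,read,rest-writer,update
ml.document.addTopicToCollections=true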
Configuring a REST transform
A MarkLogic REST transform provides a mechanism for writing custom code to transform a document before it is written, which is otherwise not possible to do via the existing set of connector properties.
To use a transform, configure the following properties:
- ml.dmsdk.transform = the name of the REST transform to use
- ml.dmsdk.transformParams = comma-separated list of transform parameter names and values; e.g. param1,value1,param2,value2
- ml.dmsdk.transformParamsDelimiter = the delimiter for ml.dmsdk.transformParams; defaults to a comma
If you are using ml-gradle to manage your MarkLogic project, you can put REST transforms in the src/main/ml-modules/transforms directory in your project.
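As a rough sketch, a server-side JavaScript transform that stamps each incoming JSON document could look like the following; the module name (add-timestamp.sjs) and the added property are purely illustrative, and the sketch assumes JSON content:

// src/main/ml-modules/transforms/add-timestamp.sjs - illustrative only
function transform(context, params, content) {
  // Assumes the incoming document is JSON; convert it to a mutable object
  const doc = content.toObject();
  // Add an illustrative property before the document is written
  doc.ingestedAt = fn.currentDateTime();
  return doc;
}
exports.transform = transform;

The connector would then reference it via ml.dmsdk.transform=add-timestamp.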
Including Kafka metadata
Each Kafka record passed to the MarkLogic connector has several pieces of metadata that can be useful both for including in written documents and for debugging when a record fails to be written. Kafka metadata can be included in each document by configuring the following property:
- ml.dmsdk.includeKafkaMetadata = true to include Kafka metadata
This will result in the following pieces of Kafka record metadata being in each associated document as MarkLogic document metadata values:
- kafka.topic = the Kafka topic associated with the record
- kafka.key = the key of the Kafka record (may be null)
- kafka.offset = the offset of the Kafka record
- kafka.partition = the partition of the Kafka record
- kafka.timestamp = the timestamp of the Kafka record
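If you later need to inspect these values in server-side code, one option (shown here as a sketch with a placeholder URI) is to read the document metadata values directly:

// Illustrative: read the Kafka topic stored as a metadata value on a written document
const topic = xdmp.documentGetMetadataValue('/example/doc.json', 'kafka.topic');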
Including Kafka headers
Each Kafka record passed to the MarkLogic connector also has headers that may contain useful information which can be included in the metadata written with documents. This includes the headers that are included in Kafka records by default as well as any custom headers. Kafka headers can be included in each document by configuring the following property:
- ml.dmsdk.includeKafkaHeaders = true to include Kafka headers
When the headers are added to the document metadata, they are simply given the same name as the key for the header. However, you may also specify a prefix that will be prepended to each header key. To set that prefix, use the following property:
- ml.dmsdk.includeKafkaHeaders.prefix = a prefix to be prepended to header keys in the metadata
The headers that are on the Kafka records will depend on the Kafka distribution you are using and the message producer configuration.
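For example, the following illustrative configuration (the prefix value is arbitrary) copies each header into the document metadata with a kafkaHeader. prefix:

ml.dmsdk.includeKafkaHeaders=true
ml.dmsdk.includeKafkaHeaders.prefix=kafkaHeader.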
Configuring DMSDK performance
The performance of how data is written to MarkLogic can be configured via the following properties:
- ml.dmsdk.batchSize = the number of documents written in one call to MarkLogic; defaults to 100
- ml.dmsdk.threadCount = the number of threads used for making calls to MarkLogic; defaults to 8
The size of each batch will only have an impact if Kafka is sending a number of records greater than the batch size each time it sends data to the MarkLogic connector. For each collection of records that the MarkLogic connector receives, it will perform an asynchronous flush of documents to MarkLogic. Thus, if the connector is using a batch size of 100 and it receives 10 records from Kafka, it will immediately (but asynchronously) write those to MarkLogic. If Kafka never sends at least 100 records to the connector, then increasing the batch size will not have any impact. Information on how Kafka can be configured to control how much data it sends to a connector is specific to a Kafka installation; please see the documentation for your Kafka distribution for more information.
Similar to the batch size, the number of threads used by the connector will only have an impact if the connector is being called frequently enough by Kafka to use multiple threads. This property has some overlap with a generic Kafka connector property named tasks.max, which effectively controls how many instances of the MarkLogic connector are instantiated. Depending on the amount of data you are trying to send from Kafka to MarkLogic, you may want to raise both of these values to maximize performance. The MarkLogic Monitoring dashboard is helpful in understanding MarkLogic resource consumption and whether changing these values has any impact on performance.
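For example, a configuration along these lines (the values are illustrative starting points, not tuning recommendations) raises both the write parallelism within each task and the number of connector tasks:

ml.dmsdk.batchSize=200
ml.dmsdk.threadCount=16
tasks.max=4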
Configuring a DHF flow to be run
The MarkLogic connector includes support for running a Data Hub Framework (DHF) flow after a batch of documents is written to MarkLogic. The following properties can be used to configure this feature:
- ml.datahub.flow.name = name of the flow to run
- ml.datahub.flow.steps = comma-separated list of step numbers in the flow to run
- ml.datahub.flow.logResponse = true if the JSON response generated by DHF should be logged by the connector
Note that only “query” steps should be run. Running a DHF ingestion step typically will not be meaningful, as an ingestion step depends on reading data from a filesystem. You can, however, run an ingestion step by configuring a REST transform as described above. Please see the DHF documentation for information on how to configure the DHF REST transform for running an ingestion step.
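A hypothetical configuration for running a flow (the flow name and step numbers are placeholders for your own DHF project) might look like:

ml.datahub.flow.name=myFlow
ml.datahub.flow.steps=2,3
ml.datahub.flow.logResponse=true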
Writing data via custom code (Bulk Data Services)
MarkLogic’s Bulk Data Services feature is intended to give developers complete control over how data is written to MarkLogic via an external program, which is typically a dataflow framework like Kafka that can support multiple workers writing to MarkLogic at the same time. The MarkLogic Kafka connector utilizes Bulk Data Services to send Kafka record data to a custom endpoint in which a developer can write any code they like to control how the data is processed.
Security requirements
Unlike when using the MarkLogic REST API, no specific privileges or roles are required in order for the connector to invoke a Bulk Data Services endpoint. Instead, the required privileges and/or roles for the MarkLogic user that the connector authenticates as will be entirely determined by the Bulk Data Services endpoint implementation.
Configuring Bulk Data Services usage
Configuring the MarkLogic Kafka connector to use Bulk Data Services involves the following properties:
- ml.sink.bulkds.endpointUri = the URI of the Bulk Data Services endpoint module
- ml.sink.bulkds.batchSize = an optional batch size; defaults to 100. Note that if you include $bulk/inputBatchSize in your API declaration, it will be ignored in favor of this property.
Bulk Data Services requires that your endpoint module have an API declaration. The URI of the API declaration must match that of your endpoint, but with .api as a suffix instead of .sjs or .xqy. The MarkLogic Kafka connector expects the API declaration to have the following configuration:
{
  "endpoint": "/path/to/your/endpoint.sjs",
  "params": [
    {
      "name": "endpointConstants",
      "datatype": "jsonDocument",
      "multiple": false,
      "nullable": false
    },
    {
      "name": "input",
      "datatype": "jsonDocument",
      "multiple": true,
      "nullable": true
    }
  ]
}
The endpoint field should have the same value as the ml.sink.bulkds.endpointUri property.
It is recommended to start your endpoint module implementation with the following code:
'use strict';
declareUpdate();
var input;
var endpointConstants;
const inputSequence = input instanceof Document ? [input] : input;
endpointConstants = fn.head(xdmp.fromJSON(endpointConstants));
for (let item of inputSequence) {
  item = fn.head(xdmp.fromJSON(item));
  // Determine what to do with each item
}
As shown in the code above, input will either be a single document (if the batch size is 1) or a sequence. Normalizing this variable into a sequence typically simplifies coding the rest of the endpoint module.
Additionally, the expression fn.head(xdmp.fromJSON(arg)) is used to convert the arg into a proper JSON object. This is typically desirable for coding the rest of the endpoint module as well.
The input variable then consists of a JSON object for each Kafka record passed to the connector. That JSON object contains the following fields:
- content = a string representation of the data in the Kafka record
- kafkaMetadata = a JSON object containing the following fields: topic, key, offset, partition, and timestamp
The endpointConstants variable is a JSON object. It contains one field for every non-empty connector property whose name starts with ml.document. These properties are included to support an endpoint developer who wishes to make a dynamic endpoint that is driven by the values of these properties instead of them being hardcoded within the endpoint module. This can allow for reusing the same endpoint module across multiple connector instances.
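Building on the skeleton above, the following sketch shows one way the loop body might use these fields. It assumes the Kafka record content is a JSON string and that connector properties appear in endpointConstants under their full names (e.g. ml.document.collections); the URI scheme and fallback collection are purely illustrative:

for (let item of inputSequence) {
  item = fn.head(xdmp.fromJSON(item));
  // Parse the Kafka record content; assumes it is a JSON string
  const doc = fn.head(xdmp.unquote(item.content));
  // Build a URI from the Kafka metadata carried with each record
  const meta = item.kafkaMetadata;
  const uri = '/kafka/' + meta.topic + '/' + meta.partition + '-' + meta.offset + '.json';
  // Use a connector-supplied collection list if present, falling back to an illustrative default
  const collections = (endpointConstants['ml.document.collections'] || 'kafka-data').split(',');
  xdmp.documentInsert(uri, doc, {
    collections: collections,
    permissions: xdmp.defaultPermissions()
  });
}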
Note that when using this approach, none of the properties starting with the following prefixes will have any impact, as they are specific to the approach that uses DMSDK:
- ml.datahub
- ml.dmsdk
- ml.document
- ml.id.strategy
As noted above, the ml.document properties will be included in endpointConstants, but they otherwise have no impact because the endpoint developer chooses whether to apply them or not in their endpoint module.
Configuring Bulk Data Services performance
MarkLogic’s Bulk Data Services feature is designed to leverage the multi-threading and parallelization support provided by existing dataflow frameworks. To achieve parallel writes to MarkLogic and increase performance, you should configure the generic Kafka connector property named tasks.max to control how many Kafka connector tasks will write to MarkLogic in parallel.
A key design feature of Bulk Data Services to understand is that, unlike MarkLogic’s Data Movement SDK, it does not support asynchronous flushing of data. Bulk Data Services will not write any data to MarkLogic until it has a number of documents equal to the ml.sink.bulkds.batchSize property. Importantly, partial batches will not be written until either enough records are received to meet the batch size, or until Kafka invokes the flush operation on the MarkLogic Kafka connector. You can use the Kafka Connect property named offset.flush.interval.ms to control how often the flush operation is invoked. This is a synchronous operation, but you may wish to have it occur fairly regularly, such as every 5 or 10 seconds, to ensure that partial batches of data are not waiting too long to be written to MarkLogic.
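As an illustrative starting point (the values are arbitrary, and the flush interval is typically set in the Kafka Connect worker configuration rather than on the connector itself):

tasks.max=4
offset.flush.interval.ms=5000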
As always with MarkLogic applications, use the MarkLogic Monitoring dashboard to understand resource consumption and server performance while testing various connector settings.
Dead Letter Queue configuration
Starting with version 1.8.0, the sink connector makes use of the dead letter queue (DLQ) if the user has configured Kafka appropriately. Please see the Kafka documentation for more information on configuring the dead letter queue in Kafka.
Note that the DLQ is only supported when using DMSDK to write documents to MarkLogic; DLQ support for Bulk Data Services will be supported in a future release.
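While the exact setup depends on your Kafka distribution, a typical Kafka Connect sink configuration for the DLQ looks something like the following sketch (the topic name and replication factor are placeholders); consult the Kafka documentation for the authoritative property list:

errors.tolerance=all
errors.deadletterqueue.topic.name=marklogic-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true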
When Kafka has been configured to use the DLQ, there are two events in the sink connector that will cause a record to be sent to the DLQ.
- “Record conversion” : If a specific record cannot be converted into the target format to be delivered to MarkLogic, then that record will be sent to the DLQ.
- “Write failure” : If a batch of documents (converted Kafka records from the source topic) fails to be written to MarkLogic then each of the records in the batch will be sent to the DLQ. The entire batch must be sent to the DLQ since the connector is unable to determine the cause of the failure.
When a record is sent to the DLQ, the connector first adds headers to the record, providing information about the cause of the failure in order to assist with troubleshooting and potential routing.
- “marklogic-failure-type” : Either “Write failure” or “Record conversion”
- “marklogic-exception-message” : Information from MarkLogic when there is a write failure
- “marklogic-original-topic” : The name of the topic that this record came from
- “marklogic-target-uri” : For write failures, this contains the target URI for the document
For those headers to be populated properly, the version of this connector must be compatible with the version of Kafka that is being used. The 1.8.0 and 1.9.0 versions of the connector work with Kafka versions before 3.8. Starting with the connector version 1.10.0, the Kafka version must be 3.8 or later.
Sink connector error handling
The most common errors in the MarkLogic Kafka sink connector are those that occur when a batch of documents is written to MarkLogic. The connector provides the following support for these errors:
- The error message and size of the failed batch are logged at the “ERROR” level.
- If the ml.dmsdk.includeKafkaMetadata option is set to “true”, then each failed record in the batch will have its URI and associated Kafka metadata logged at the “ERROR” level.
- If Dead Letter Queue (DLQ) support has been configured as described above and DMSDK is used to write documents to MarkLogic, then each failed record in the batch will be sent to the user-defined DLQ topic.
The connector also provides support for the rare instance where a Kafka sink record cannot be converted into a document to be written to MarkLogic. Such an error will be logged at the “ERROR” level, and if DLQ support is enabled by the user, the sink record will be sent to the DLQ topic.
In addition, always check the MarkLogic server log files, particularly those associated with the MarkLogic app server that the connector connects to, for additional information about the error.
Finally, it is possible for an unexpected error to occur within the connector. Unlike a source connector, which is required to catch any error that occurs, the sink connector allows an unexpected error to be thrown, and it will eventually be caught and logged by Kafka. However, nothing will be sent to the user-configured DLQ topic in this scenario, as the error will not be associated with a particular sink record. Kafka and MarkLogic server logs should be examined to determine the cause of the error.
JSON-based Connector Configuration
Some Kafka environments permit REST-based instantiation of connectors. Confluent is one of those environments. Please see the Confluent documentation to read about this technique. Examples of JSON files to use with the REST service can be found in examples/ConfluentConnectorConfigs.
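For reference, a minimal sketch of such a JSON payload is shown below. The connector class name, topic, and connection values are assumptions to be adapted to your environment; the files in examples/ConfluentConnectorConfigs remain the authoritative examples.

{
  "name": "marklogic-sink-example",
  "config": {
    "connector.class": "com.marklogic.kafka.connect.sink.MarkLogicSinkConnector",
    "tasks.max": "2",
    "topics": "my-topic",
    "ml.connection.port": "8000",
    "ml.document.format": "JSON",
    "ml.document.collections": "kafka-data"
  }
}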