I want to continuously update a document's field named value
{'_id': 'count', 'value':0}
by a certain number.
My MongoSinkConnector has
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
I'm using a python script to produce messages to the appropiate topic
self._aio_producer.produce(
topic='mongo',
value=json.dumps(
{
"_id":"count",
"$inc":{"value":len(task['payload'].split(','))}
}
)
)
But I get this error on the Kafka Connect standalone process:
Failed to put into the sink the following records: [SinkRecord{kafkaOffset=8174, timestampType=CreateTime} ConnectRecord{topic='mongo', kafkaPartition=1, key=null, keySchema=null, value={_id=count, $inc={value=1}}, valueSchema=null, timestamp=1697153679938, headers=ConnectHeaders(headers=)}]
(com.mongodb.kafka.connect.sink.MongoSinkTask:244)
com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=52, message=The dollar ($) prefixed field '$inc' in '$inc' is not allowed in the context of an update's replacement document. Consider using an aggregation pipeline with $replaceWith., details={}
I've tried dropping the $inc
part, but it seems to just be replacing the document over and over without incrementing the value. Is there any way to increment a value or do I have to write my own custom Class?
Currently the way is to create your own Custom WriteModel Strategy.
BsonDocument setOnInsertFields =
new BsonDocument().append(A_FIELD_NAME, aValue).append(B_FIELD_NAME, bValue);
BsonDocument incFields =
new BsonDocument()
.append(C_COUNT_FIELD_NAME, cType)
.append(D_COUNT_FIELD_NAME, DType);
// Create new document with specific update operations
// such as set or setOnInsert, inc, etc
BsonDocument newDocument =
new BsonDocument().append("$setOnInsert", setOnInsertFields).append("$inc", incFields);