I want to create or update a set of records in one batch, with each update operation processed atomically. I have a set of keys and the corresponding data for each of them. I also have a registered UDF that checks whether the record already exists. If it does, the UDF does additional work based on the old and new record data, then updates the record.
According to the documentation and examples, batch_apply applies a function to the provided set of keys. However, the example uses the same data for all keys. I need specific data for each key.
I tried calling the following:
data = { k1: d1, k2: d2, k3: d3}
keys = list(data.keys())
asclient.batch_apply(keys, "my_module", "my_function", data)
with my_function being defined as:
function my_function(rec, args)
    if aerospike:exists(rec) then
        local pk = record.key(rec)
        local data = args[pk]
        -- update record data using new data and old data
        -- (some code for data manipulation)
        -- update record
        aerospike:update(rec)
    else
        -- no record stored yet, store using data for the key
        local pk = record.key(rec) -- returns nil as record doesn't exist
        local data = args[pk] -- fails
        aerospike:create(rec)
    end
end
I then tried asking for the key to be sent with the request, through the policy:
data = { k1: d1, k2: d2, k3: d3}
keys = list(data.keys())
asclient.batch_apply(keys, "my_module", "my_function", data, policy_batch_apply={'key': aerospike.POLICY_KEY_SEND})
to get the key from the record:
function my_function(rec, args)
    if aerospike:exists(rec) then
        ...
    else
        local pk = record.key(rec) -- returns nil as record doesn't exist
        -- create empty record to get the key stored thanks to policy
        aerospike:create(rec)
        -- finally, key is there
        pk = record.key(rec)
        local data = args[pk]
        -- set data to record
        ...
        -- update empty record with actual data
        aerospike:update(rec)
    end
end
This works, but only if keys contains a single key. If I pass 2 keys, I see the following warnings in the Aerospike server log
Oct 01 2024 09:19:37 GMT: WARNING (batch): (batch.c:1015) batch must not repeat key
Oct 01 2024 09:19:37 GMT: WARNING (batch): (batch.c:1188) Batch keys mismatch. Expected 2 Received 1
and my_function is called for the first key only. I looked at the code, and it seems that the REPEAT flag is set implicitly when aerospike.POLICY_KEY_SEND is used, but this is not described anywhere.
As @pgupta mentioned, Python client version 15.0.0 introduced a fix for the REPEAT flag being set for batch writes.
I can explain some basic concepts from the Java client's perspective. I hope that helps with your issue. Other clients should have equivalent APIs. I am also assuming the latest server version.
1 - Regarding sendKey = true: it checks for a hash collision. Aerospike stores and locates a record by a 20-byte RIPEMD-160 digest computed from the user-supplied key, the set name (if a set is used), and the key type (inferred). In addition, on writes you can store the user key with the record by using the sendKey=true policy. When you update or read a record with sendKey=true, the record is located by digest, and the stored user key is also compared against the key in this transaction, which detects a possible hash collision. Most users skip this check because the probability of a collision is next to zero. However, if you read or update with sendKey=true but did not originally create the record with sendKey=true, the check will fail because no user key was stored with the record.
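To make the collision check in point 1 concrete, here is a toy, in-memory sketch. It is not the server's implementation: `toy_digest`, `ToyStore`, `KeyMismatch` and `find_collision` are hypothetical names, SHA-1 truncated to one byte stands in for the 20-byte RIPEMD-160 digest so that a collision is easy to find, and the case where no user key was stored is deliberately not modeled.

```python
import hashlib


def toy_digest(set_name, user_key, size=1):
    """Stand-in digest (NOT RIPEMD-160), truncated so collisions are easy to hit."""
    return hashlib.sha1(f"{set_name}|{user_key}".encode()).digest()[:size]


class KeyMismatch(Exception):
    """Raised when the sent user key differs from the one stored with the record."""


class ToyStore:
    def __init__(self):
        self._by_digest = {}  # digest -> {"key": stored user key or None, "bins": dict}

    def put(self, set_name, user_key, bins, send_key=False):
        digest = toy_digest(set_name, user_key)
        stored = self._by_digest.get(digest)
        if stored is None:
            # The user key is kept with the record only when send_key is set.
            self._by_digest[digest] = {
                "key": user_key if send_key else None,
                "bins": dict(bins),
            }
            return
        self._verify(stored, user_key, send_key)
        stored["bins"].update(bins)

    def get(self, set_name, user_key, send_key=False):
        stored = self._by_digest[toy_digest(set_name, user_key)]
        self._verify(stored, user_key, send_key)
        return stored["bins"]

    @staticmethod
    def _verify(stored, user_key, send_key):
        # Compare only when the request carries a key AND the record has one stored.
        # What a real server does when no key was stored is not modeled here.
        if send_key and stored["key"] is not None and stored["key"] != user_key:
            raise KeyMismatch(user_key)


def find_collision(set_name, user_key):
    """Brute-force a different user key with the same toy digest."""
    target = toy_digest(set_name, user_key)
    i = 0
    while True:
        candidate = f"other-{i}"
        if candidate != user_key and toy_digest(set_name, candidate) == target:
            return candidate
        i += 1


store = ToyStore()
store.put("demo", "k1", {"v": 1}, send_key=True)
twin = find_collision("demo", "k1")

# Without sending the key, the colliding key silently reads k1's record.
silent = store.get("demo", twin)

# With the key sent, the stored user key is compared and the collision is caught.
try:
    store.get("demo", twin, send_key=True)
    detected = False
except KeyMismatch:
    detected = True
```

With a real 20-byte digest such a collision is practically never found, which is why most users skip the check.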
2 - Regarding updating in a batch where each record has its own data: you might as well update each record individually. But if you want to do a batch write with a UDF, you will have to use (in Java) the operate(BatchPolicy policy, List<BatchRecord> records) API, or the equivalent for your client. https://javadoc.io/doc/com.aerospike/aerospike-client-jdk8/latest/index.html . BatchUDF, which extends BatchRecord, lets you set data for each record separately. The Aerospike Java client code is in a public repo; this link has test examples of BatchUDF that may help: https://github.com/aerospike/aerospike-client-java/blob/master/test/src/com/aerospike/test/sync/basic/TestUDF.java#L256
(For the Python client, see: https://aerospike-python-client.readthedocs.io/en/latest/aerospike_helpers.batch.html )
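As a rough Python counterpart to the Java approach above, the sketch below builds one UDF call per key, each with its own argument list, and sends them through `batch_write` using the `Apply` and `BatchRecords` classes from `aerospike_helpers.batch.records`. The helper names `build_udf_calls` and `apply_batch`, the namespace `test`, and the set `demo` are assumptions for illustration. `apply_batch` needs the `aerospike` package and a connected client, so it is defined here but not called.

```python
def build_udf_calls(namespace, set_name, data,
                    module="my_module", function="my_function"):
    """Return one (key_tuple, module, function, args) spec per user key.

    Each record gets its own args list, so the UDF receives only that
    record's payload instead of a shared dictionary.
    """
    return [
        ((namespace, set_name, pk), module, function, [payload])
        for pk, payload in data.items()
    ]


def apply_batch(client, calls):
    """Send all calls in one batch (requires a connected aerospike client)."""
    # Imported lazily so the pure-Python part above works without the package.
    from aerospike_helpers.batch import records as br

    batch = br.BatchRecords([
        br.Apply(key, module, function, args)
        for key, module, function, args in calls
    ])
    client.batch_write(batch)
    # After the call, each entry carries its own result code and record.
    return [(rec.key, rec.result, rec.record) for rec in batch.batch_records]


# Hypothetical namespace, set, and payloads.
calls = build_udf_calls("test", "demo", {"k1": {"n": 1}, "k2": {"n": 2}})
```

With this shape the Lua function can take the payload directly as its second parameter, for example function my_function(rec, data), so the create branch no longer depends on record.key(rec) returning a value.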