I followed the quickstart tutorial for the Neo4j Kafka Connector. The following source instance Cypher query
MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp
writes rows containing name, surname, and timestamp into a Kafka topic whenever the ts.timestamp > $lastCheck condition is satisfied.
If I remove ts.timestamp as timestamp from the RETURN clause, the $lastCheck parameter is no longer updated, so the WHERE condition is always true and the whole graph is queried on every poll. Is there a way to update the $lastCheck parameter without returning a timestamp? This would be especially useful for aggregations, where grouping by the timestamp is not wanted.
EDIT
To illustrate the problem better, I added the $lastCheck parameter to the RETURN clause. So my first query
MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp, $lastCheck as lastCheck
writes the following messages into my topic:
Here, I inserted the example TestSource nodes twice (after a short delay) to show the updated $lastCheck parameter.
CREATE (:TestSource {name: 'john', surname: 'doe', timestamp: datetime().epochMillis});
CREATE (:TestSource {name: 'mary', surname: 'doe', timestamp: datetime().epochMillis});
CREATE (:TestSource {name: 'jack', surname: 'small', timestamp: datetime().epochMillis});
Now I remove ts.timestamp as timestamp from the RETURN clause.
MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, $lastCheck as lastCheck
The nodes in the database were deleted, and I inserted them only once. The messages in the topic look like this:
The screenshot only shows the first seven, but these messages are produced indefinitely. Sometimes the second query also writes $lastCheck into the topic as -1.
You deleted timestamp from the source instance's neo4j.source.query value, but presumably kept neo4j.streaming.property set to "timestamp".
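To catch this mismatch before deploying, here is a minimal Python sketch. It assumes the connector settings are available as a plain dict keyed by the property names used in this thread, and it uses a simple regex heuristic (not a real Cypher parser) to check whether the value of neo4j.streaming.property appears as an alias in the query's final RETURN clause. The helper names returned_aliases and streaming_property_is_returned are hypothetical.

```python
import re


def returned_aliases(query: str) -> set[str]:
    """Collect aliases that follow `AS` in the last RETURN clause.

    Simple heuristic for illustration only; it is not a Cypher parser.
    """
    # Keep only the text after the last RETURN keyword.
    parts = re.split(r"\bRETURN\b", query, flags=re.IGNORECASE)
    if len(parts) < 2:
        return set()
    return set(re.findall(r"\bAS\s+(\w+)", parts[-1], flags=re.IGNORECASE))


def streaming_property_is_returned(config: dict) -> bool:
    """True if neo4j.streaming.property is unset or is a RETURN alias of neo4j.source.query."""
    prop = config.get("neo4j.streaming.property")
    if not prop:
        # No streaming property: per the docs, an internal value is used instead.
        return True
    return prop in returned_aliases(config.get("neo4j.source.query", ""))
```

With the second query from the question and neo4j.streaming.property still set to "timestamp", the check returns False, which matches the symptom of $lastCheck never advancing.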
According to the documentation, the optional neo4j.streaming.property property is:
The name of the property that we need to consider in order to determinate the last queried record; if not defined we use an internal value given from the last performed check. We use this value for injecting it in the provided query defined in neo4j.source.query as $lastCheck parameter.
I fixed a typo in the above quote.
Try deleting the optional neo4j.streaming.property property so that the internal value is used instead.
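As a minimal sketch of that change, the Python below removes neo4j.streaming.property from a connector config and serializes the result as JSON, the body format Kafka Connect's REST endpoint PUT /connectors/<name>/config accepts. The dict is abridged and hypothetical (the topic name is a placeholder and keys such as connector.class and the connection settings are omitted), so merge the change into your full config rather than sending this dict as-is. The helper name drop_streaming_property is also hypothetical.

```python
import json


def drop_streaming_property(config: dict) -> dict:
    """Return a copy of the connector config without neo4j.streaming.property.

    Per the documentation quoted above, the connector then injects an internal
    value from the last performed check as $lastCheck.
    """
    cleaned = dict(config)  # copy so the caller's dict is left unchanged
    cleaned.pop("neo4j.streaming.property", None)
    return cleaned


# Abridged, hypothetical config: only the keys discussed in this thread are shown.
current = {
    "topic": "my-topic",
    "neo4j.source.query": (
        "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck "
        "RETURN ts.name AS name, ts.surname AS surname"
    ),
    "neo4j.streaming.property": "timestamp",
}

updated = drop_streaming_property(current)
# JSON body for PUT /connectors/<name>/config (merge into your full config first).
body = json.dumps(updated, indent=2)
print(body)
```

Copying the dict instead of mutating it keeps the original settings around, which makes it easy to compare the two configurations or roll back.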