apache-kafka neo4j confluent-schema-registry

Kafka-Neo4J-Connector: $lastCheck not updating if no timestamp is returned


I followed the quickstart tutorial for the Neo4j Kafka connector. The following Cypher query, configured on the source instance,

MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp

inserts name, surname, and timestamp rows into a Kafka topic whenever the ts.timestamp > $lastCheck condition is satisfied.

If I remove ts.timestamp AS timestamp from the RETURN clause, the $lastCheck parameter is no longer updated, so the WHERE condition is always true and the whole graph is queried on every poll. Is there a way to update the $lastCheck parameter without returning a timestamp? This would be especially useful for aggregations, where grouping by the timestamp is not wanted.

EDIT

To illustrate the problem a bit better, I added the $lastCheck parameter to the return clause. So my first query

MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp, $lastCheck as lastCheck

writes the following messages into my topic: [screenshot: topic after first query]

Here, I inserted the example TestSource nodes twice (after a short delay), to show the updated $lastCheck parameter.

CREATE (:TestSource {name: 'john', surname: 'doe', timestamp: datetime().epochMillis});
CREATE (:TestSource {name: 'mary', surname: 'doe', timestamp: datetime().epochMillis});
CREATE (:TestSource {name: 'jack', surname: 'small', timestamp: datetime().epochMillis});

Now I remove the ts.timestamp as timestamp from the return clause.

MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, $lastCheck as lastCheck

The nodes in the database were deleted, and I inserted them only once. The messages in the topic look like this:

[screenshot: topic after second query]

The screenshot shows only the first seven messages, but the same messages keep being written to the topic indefinitely. Sometimes the second query also returns $lastCheck as -1.


Solution

  • You removed timestamp from the source instance's neo4j.source.query value, but presumably left neo4j.streaming.property set to "timestamp".

    According to the documentation, the optional neo4j.streaming.property property is:

    The name of the property that we need to consider in order to determinate the last queried record; if not defined we use an internal value given from the last performed check. We use this value for injecting it in the provided query defined in neo4j.source.query as $lastCheck parameter.

    I fixed a typo in the above quote.

    Try deleting the optional neo4j.streaming.property property so that the internal value can be used instead.
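As a rough sketch of that change, the relevant part of the source connector configuration might look like the fragment below. It shows only the two properties discussed here, in .properties form. The assumption is that every other setting (connection, topic, converters) stays exactly as in your existing quickstart configuration. If you register the connector as JSON over the REST API, the same keys apply.

```properties
# Sketch only: the two properties discussed above; all other settings are
# assumed to stay as in your existing configuration.

# Removed (commented out) so that the connector falls back to its internal
# last-check value instead of reading a "timestamp" field from each result row.
# neo4j.streaming.property=timestamp

# The query no longer returns ts.timestamp, but still filters on $lastCheck.
neo4j.source.query=MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname
```

After changing the configuration, the connector has to be restarted or re-registered for the new settings to take effect.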