scalaakkainfluxdbreactive-streamsalpakka

Influxdb Alpakka connector not writing to database


I'm trying to write into the Influxdb (running in a docker container with version 2.0). I'm using Scala and Reactive Streams. Therefore the Alpakka connector (https://doc.akka.io/docs/alpakka/current/influxdb.html) because the Scala Reactive Client (https://github.com/influxdata/influxdb-client-java/tree/master/client-scala) does not support writing into the database.

No matter how I try to write into the database, data is not written into it.

Source
    .tick(1.seconds, 1.seconds,
      Seq(
        InfluxDbWriteMessage.create(
          Point
            .measurement("cpu_load_short")
            .addField("host", "server01")
            .addField("value", 0.64)
            .tag("region", "us-west")
            .time(DateTime.now.getMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
            .build,
        ).withDatabaseName("database"),
      )
    )
    .toMat(
      InfluxDbSink.create()(
        InfluxDBFactory.connect("http://localhost:9091", "admin", "admin123")
      )
    )(Keep.right)
    .run.andThen { case Success(posts) => print("done") }
  

Also "done" is never printed, so I assume the future is never completed and therefore somewhere is a problem.

The only thing that gets printed is

Pong{version=2.1.1, responseTime=68}

What am I missing, so that writing is not possible. Is it because the Alpakka connector is written for InfluxDB prior version 2 and therefore it does not work?


Solution

  • Although I did not try it myself, the official InfluxDB Alpakka connector may not work to write records to InfluxDB 2.x, so I guess your observation is correct.

    What worked for me was:

    Latest influxdb 2.x Docker image and sbt imports:

      "org.influxdb" % "influxdb-java" % "2.22",
      "com.influxdb" %% "influxdb-client-scala" % "4.3.0",
      "com.influxdb" % "flux-dsl" % "4.3.0",
    

    For writing the sync Java API WriteApiBlocking from influxdb-client-java, because of this discussion about write performance, sth like:

    public CompletionStage<Done> writeTestPoints(int nPoints, String sensorID) {
        List<Integer> range = IntStream.rangeClosed(1, nPoints).boxed().collect(Collectors.toList());
        Source<Integer, NotUsed> source = Source.from(range);
        CompletionStage<Done> done = source
                .groupedWithin(10, Duration.ofMillis(100))
                .mapAsyncUnordered(10, each -> this.eventHandlerPointBatch(each, influxDBClient.getWriteApiBlocking(), nPoints, sensorID))
                .runWith(Sink.ignore(), system);
        return done;
    }
    
    
    private CompletionStage<Done> eventHandlerPointBatch(List<Integer> hrs, WriteApiBlocking writeApi, int nPoints, String sensorID) {
        LOGGER.info("Writing points: {}-{} ", sensorID, hrs);
        List<Point> points = hrs.stream().map(each -> createPoint(nPoints, sensorID, System.nanoTime(), each)).collect(Collectors.toList());
        writeApi.writePoints(points);
        return CompletableFuture.completedFuture(Done.done());
    }
    

    Full example: InfluxdbWriter

    For reading the influxdb-client-scala lib which you mention as "Scala Reactive Client"

    Full example: InfluxdbReader

    The integration test InfluxdbIT bootstraps InfluxDB 2.x Docker image via testcontainers and runs the classes above.