I'm trying to write to InfluxDB (version 2.0, running in a Docker container). I'm using Scala and Reactive Streams, so I chose the Alpakka connector (https://doc.akka.io/docs/alpakka/current/influxdb.html), because the Scala Reactive Client (https://github.com/influxdata/influxdb-client-java/tree/master/client-scala) does not support writing to the database.
No matter how I try to write to the database, no data ends up in it.
    Source
      .tick(1.seconds, 1.seconds,
        Seq(
          InfluxDbWriteMessage.create(
            Point
              .measurement("cpu_load_short")
              .addField("host", "server01")
              .addField("value", 0.64)
              .tag("region", "us-west")
              .time(DateTime.now.getMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
              .build,
          ).withDatabaseName("database"),
        )
      )
      .toMat(
        InfluxDbSink.create()(
          InfluxDBFactory.connect("http://localhost:9091", "admin", "admin123")
        )
      )(Keep.right)
      .run.andThen { case Success(posts) => print("done") }
Also, "done" is never printed, so I assume the future never completes and there is a problem somewhere.
The only thing that gets printed is
Pong{version=2.1.1, responseTime=68}
What am I missing that prevents the write from working? Is it because the Alpakka connector was written for InfluxDB versions before 2 and therefore does not work with 2.x?
Although I did not try it myself, the official Alpakka InfluxDB connector may not be able to write records to InfluxDB 2.x, so I guess your observation is correct.
What worked for me:
The latest InfluxDB 2.x Docker image and these sbt dependencies:
    "org.influxdb" % "influxdb-java" % "2.22",
    "com.influxdb" %% "influxdb-client-scala" % "4.3.0",
    "com.influxdb" % "flux-dsl" % "4.3.0",
For writing, I used the synchronous Java API WriteApiBlocking from influxdb-client-java, because of this discussion about write performance. Something like:
    public CompletionStage<Done> writeTestPoints(int nPoints, String sensorID) {
        List<Integer> range = IntStream.rangeClosed(1, nPoints).boxed().collect(Collectors.toList());
        Source<Integer, NotUsed> source = Source.from(range);
        CompletionStage<Done> done = source
            .groupedWithin(10, Duration.ofMillis(100))
            .mapAsyncUnordered(10, each -> this.eventHandlerPointBatch(each, influxDBClient.getWriteApiBlocking(), nPoints, sensorID))
            .runWith(Sink.ignore(), system);
        return done;
    }

    private CompletionStage<Done> eventHandlerPointBatch(List<Integer> hrs, WriteApiBlocking writeApi, int nPoints, String sensorID) {
        LOGGER.info("Writing points: {}-{} ", sensorID, hrs);
        List<Point> points = hrs.stream().map(each -> createPoint(nPoints, sensorID, System.nanoTime(), each)).collect(Collectors.toList());
        writeApi.writePoints(points);
        return CompletableFuture.completedFuture(Done.done());
    }
Full example: InfluxdbWriter
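As a rough, library-free illustration of the idea in the snippet above (group the points into batches of at most 10, then issue one blocking write per batch), here is a plain-Java sketch. It is not the Akka Streams code: it models only the size-based part of groupedWithin, not the 100 ms time window. The names BatchedWriteSketch, batches, blockingWrite and writeAll are hypothetical, and blockingWrite is just a stand-in for writeApi.writePoints(points). Unlike the snippet above, where the blocking call runs inside the handler before the already-completed future is returned, this sketch hands each blocking call to a separate thread pool, which is one possible way to get real parallelism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchedWriteSketch {

    // Hypothetical stand-in for writeApi.writePoints(points):
    // pretends to write the batch and reports how many points it received.
    static int blockingWrite(List<Integer> batch) {
        return batch.size();
    }

    // Splits the input into consecutive batches of at most `size` elements
    // (the size-based half of groupedWithin; the time window is not modelled).
    static <T> List<List<T>> batches(List<T> items, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            result.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return result;
    }

    // Runs one blocking write per batch on a fixed pool of `parallelism` threads
    // and returns the total number of points written once all writes have finished.
    static int writeAll(List<Integer> items, int batchSize, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<CompletableFuture<Integer>> pending = batches(items, batchSize).stream()
                .map(batch -> CompletableFuture.supplyAsync(() -> blockingWrite(batch), pool))
                .collect(Collectors.toList());
            // join() waits for each write; the sum is the number of points written.
            return pending.stream().mapToInt(CompletableFuture::join).sum();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> range = IntStream.rangeClosed(1, 25).boxed().collect(Collectors.toList());
        System.out.println(batches(range, 10).size() + " batches, "
            + writeAll(range, 10, 10) + " points written");
    }
}
```

With 25 points and a batch size of 10, this yields batches of 10, 10 and 5 points. In the real pipeline the time window can also flush a smaller batch early, so actual batch sizes may differ.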
For reading, I used the influxdb-client-scala library, which you refer to as the "Scala Reactive Client".
Full example: InfluxdbReader
The integration test InfluxdbIT bootstraps an InfluxDB 2.x Docker image via testcontainers and runs the classes above.