I want to aggregate the last minute of data from Telegraf with Kapacitor before writing it to InfluxDB, and I also need to calculate a few percentiles. So I wrote a simple TICKscript as a test:
var firstPerc = stream
    |from()
        .measurement('my_tmp_measurement_from_telegraf')

var secondPerc = stream
    |from()
        .measurement('my_tmp_measurement_from_telegraf')

firstPerc
    |join(secondPerc)
        .as('fp', 'sp')
    |percentile('fp.myAggVal', 50.0)
    |eval(lambda: "percentile")
        .as('50p')
    |percentile('sp.myAggVal', 90.0)
    |eval(lambda: "percentile")
        .as('90p')
    |window()
        .period(60s)
        .every(60s)
        .align()
    |influxDBOut()
        .database('myDBInInflux')
        .retentionPolicy('autogen')
In my database I only have values for the 50th percentile. I am not surprised by that, since I use "percentile" in both eval calls, but I still cannot find any clue in the Kapacitor documentation about how to get the result I need.
Here is the "visual" result I am after:
time 50p 90p someOtherP's otherDataICanProbablyHandle
Halp!
You are reading the same measurement stream (and the same data in it) twice, so the points get consumed. First, save the measurement stream in a variable:
var myStream = stream
    |from()
        .measurement('my_tmp_measurement_from_telegraf')
Next, define the two streams from the saved measurement. This is also where you should set up the proper grouping, evaluations, and so on:
// Window first, so that percentile() aggregates one minute of points at a time.
var firstPerc = myStream
    |window()
        .period(60s)
        .every(60s)
        .align()
    |percentile('myAggVal', 50.0)
    |eval(lambda: "percentile")
        .as('percentile')

// Same window settings, different percentile.
var secondPerc = myStream
    |window()
        .period(60s)
        .every(60s)
        .align()
    |percentile('myAggVal', 90.0)
    |eval(lambda: "percentile")
        .as('percentile')
Finally, define the joined stream and write it out:
var joinedStreams = firstPerc
    |join(secondPerc)
        .as('50', '90')
        .tolerance(1s)
        .streamName('measurementName')
    |influxDBOut()
        .database('myDBInInflux')
        .retentionPolicy('autogen')
        .create()
The output:
time 50.percentile 90.percentile
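I strongly suggest using .tolerance(): points from the two streams whose timestamps differ by less than the tolerance are treated as having the same time, so they still get joined even if they are slightly out of step.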
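
For reference, here is how the whole task could look when put together. Treat it as a sketch rather than a tested script. It windows the stream once and shares that window between the percentile nodes, names each result with .as() on the percentile node so the extra eval is not needed, and flattens the prefixed field names with one final eval. The third percentile (99), the join prefixes a, b and c, the field names p50, p90 and p99, and the stream name my_percentiles are placeholders I made up for illustration.

```
// Window the incoming points once; every percentile node reuses this window.
var windowed = stream
    |from()
        .measurement('my_tmp_measurement_from_telegraf')
    |window()
        .period(60s)
        .every(60s)
        .align()

// One aggregation node per percentile, each with its own field name.
var p50 = windowed
    |percentile('myAggVal', 50.0)
        .as('p50')

var p90 = windowed
    |percentile('myAggVal', 90.0)
        .as('p90')

// Hypothetical extra percentile, to show how more of them would be added.
var p99 = windowed
    |percentile('myAggVal', 99.0)
        .as('p99')

p50
    |join(p90, p99)
        // After the join the fields are named a.p50, b.p90 and c.p99.
        .as('a', 'b', 'c')
        .tolerance(1s)
        .streamName('my_percentiles')
    // Without .keep(), eval keeps only the newly named fields.
    |eval(lambda: "a.p50", lambda: "b.p90", lambda: "c.p99")
        .as('p50', 'p90', 'p99')
    |influxDBOut()
        .database('myDBInInflux')
        .retentionPolicy('autogen')
        .create()
```

If this behaves as intended, each point written to InfluxDB should carry the fields p50, p90 and p99 rather than the prefixed names.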