I have a requirement to create a csv file from all the metric values from Kairosdb.
The kairosdb UI already has a save as feature but it doesn't have a metric name in the exported file. Also we can't export multiple metrics into a single file.
The problem I am facing is aligning the timestamps of multiple metrics. For example, one metric might return 5 data points while another returns 10, and the timestamps of the second metric may or may not match those of the first.
So I need to generate a csv like below:
timestamp,metric1,metric2,metric3\n
0,1,,2\n
1,,2,\n
2,1,3,6\n
3,5,5,\n
4,,,5\n
A query might return more than 10000 data points. How can I approach this problem? Can I run this program on a Spark cluster?
The code that I tried:
package com.example;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.kairosdb.client.builder.DataPoint;
/**
 * Merges the data points of several metrics into one CSV file.
 *
 * <p>The file has a header row, then one row per distinct timestamp (in
 * ascending order) and one column per metric. A cell is left empty when the
 * metric has no data point at that timestamp.
 */
public class Test {

    /** Destination of the generated CSV file. */
    private static final String OUTPUT_PATH = "/home/lr/Desktop/csv1.csv";

    /**
     * Pivot table: timestamp -> (metric name -> value rendered as text).
     * A TreeMap is used so that iterating it yields each timestamp exactly
     * once and in ascending order.
     */
    private static Map<Long, Map<String, String>> metricMaps = new TreeMap<>();

    /**
     * Builds three sample series, pivots them by timestamp and writes the CSV.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        Map<String, List<DataPoint>> metriDps = new HashMap<>();
        // Column order of the CSV; every name listed here gets a column.
        String[] metricNames = new String[] { "m1", "m2", "m3" };

        List<DataPoint> dataPoints1 = new ArrayList<DataPoint>();
        dataPoints1.add(new DataPoint(0, 1));
        dataPoints1.add(new DataPoint(2, 1));
        dataPoints1.add(new DataPoint(3, 5));
        metriDps.put("m1", dataPoints1);

        List<DataPoint> dataPoints2 = new ArrayList<DataPoint>();
        dataPoints2.add(new DataPoint(1, 2));
        dataPoints2.add(new DataPoint(2, 3));
        dataPoints2.add(new DataPoint(3, 5));
        metriDps.put("m2", dataPoints2);

        List<DataPoint> dataPoints3 = new ArrayList<DataPoint>();
        dataPoints3.add(new DataPoint(0, 2));
        dataPoints3.add(new DataPoint(2, 6));
        dataPoints3.add(new DataPoint(4, 5));
        metriDps.put("m3", dataPoints3);

        // try-with-resources flushes and closes the writer even if a write fails.
        try (FileWriter writer = new FileWriter(OUTPUT_PATH)) {
            metriDps.forEach((name, points) -> createMap(points, name));

            // Header row.
            writer.append("timestamp,").append(String.join(",", metricNames)).append('\n');

            // One row per distinct timestamp; columns follow metricNames.
            for (Map.Entry<Long, Map<String, String>> row : metricMaps.entrySet()) {
                writer.append(String.valueOf(row.getKey()));
                Map<String, String> valuesByMetric = row.getValue();
                for (String metricName : metricNames) {
                    writer.append(',');
                    String value = valuesByMetric.get(metricName);
                    if (value != null) {
                        writer.append(value);
                    }
                    // A missing value leaves the cell empty.
                }
                writer.append('\n');
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Records every data point of one metric in the pivot table.
     *
     * @param list data points of the metric
     * @param key  name of the metric the points belong to
     */
    private static void createMap(List<DataPoint> list, String key) {
        for (DataPoint dp : list) {
            metricMaps
                    .computeIfAbsent(dp.getTimestamp(), time -> new HashMap<>())
                    .put(key, String.valueOf(dp.getValue()));
        }
    }
}
I really appreciate your help.
To make your algorithm work, you need a map that has the timestamp as key and, as value, the (metric name, value) pairs recorded at that timestamp. The following does that:
// Sample input: three series whose timestamps only partly overlap.
String[] metricNames = new String[] { "m1", "m2", "m3" };
Map<String, List<DataPoint>> metriDps = new HashMap<>();

List<DataPoint> firstSeries = new ArrayList<DataPoint>();
firstSeries.add(new DataPoint(0, 1));
firstSeries.add(new DataPoint(2, 1));
firstSeries.add(new DataPoint(3, 5));
metriDps.put("m1", firstSeries);

List<DataPoint> secondSeries = new ArrayList<DataPoint>();
secondSeries.add(new DataPoint(1, 2));
secondSeries.add(new DataPoint(2, 3));
secondSeries.add(new DataPoint(3, 5));
metriDps.put("m2", secondSeries);

List<DataPoint> thirdSeries = new ArrayList<DataPoint>();
thirdSeries.add(new DataPoint(0, 2));
thirdSeries.add(new DataPoint(2, 6));
thirdSeries.add(new DataPoint(4, 5));
metriDps.put("m3", thirdSeries);

// Pivot table, ordered by time:
//   time1 -> {metricName=value, metricName=value, ..}
//   time2 -> {metricName=value, metricName=value, ..}
SortedMap<Long, Map<String, String>> rows = new TreeMap<>();
for (Map.Entry<String, List<DataPoint>> series : metriDps.entrySet()) {
    String metric = series.getKey();
    for (DataPoint point : series.getValue()) {
        Long time = point.getTimestamp();
        Object value = point.getValue();
        if (value == null) {
            continue; // points without a value do not create a row
        }
        Map<String, String> row = rows.get(time);
        if (row == null) {
            row = new HashMap<>();
            rows.put(time, row);
        }
        row.put(metric, value.toString());
    }
}

StringWriter writer = new StringWriter();

// Header line.
writer.append("timestamp,").append(String.join(",", metricNames)).append('\n');

// Body: the sorted map already yields the timestamps in ascending order.
for (Map.Entry<Long, Map<String, String>> row : rows.entrySet()) {
    Map<String, String> valuesByMetric = row.getValue();
    writer.append(String.valueOf(row.getKey())).append(',');
    // Visit every known metric so that absent values still produce an empty cell.
    for (int column = 0; column < metricNames.length; column++) {
        if (column > 0) {
            writer.append(',');
        }
        String cell = valuesByMetric.get(metricNames[column]);
        if (cell != null) {
            writer.append(cell);
        }
    }
    writer.append('\n');
}
System.out.println(writer);
Prints
timestamp,m1,m2,m3
0,1,,2
1,,2,
2,1,3,6
3,5,5,
4,,,5
With sorted input lists, you can improve the algorithm by keeping 3 iterators, then advancing the one(s) that point to the earliest value. You can thereby iterate all series in parallel / side-by-side. That way you can save some memory because you don't have to build maps and process the lists one by one.
Using the following utility class
/**
 * Iterator wrapper that carries a name and remembers the element most
 * recently returned by {@link #next()} until {@link #consume()} is called.
 */
static class NamedKeeparator implements Iterator<DataPoint> {

    private final String name;
    private final Iterator<DataPoint> delegate;
    /** Last element fetched from the delegate, or null if none is pending. */
    private DataPoint current;

    /**
     * @param name     label attached to this iterator
     * @param delegate iterator that supplies the elements
     */
    public NamedKeeparator(String name, Iterator<DataPoint> delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    /** @return the label given at construction time */
    String getName() {
        return this.name;
    }

    /** @return the remembered element, or null if it was consumed or never fetched */
    public DataPoint current() {
        return this.current;
    }

    /** Forgets the remembered element. */
    public void consume() {
        this.current = null;
    }

    /** @return whether the delegate has more elements */
    @Override
    public boolean hasNext() {
        return this.delegate.hasNext();
    }

    /** Fetches the next element from the delegate, remembers it and returns it. */
    @Override
    public DataPoint next() {
        DataPoint fetched = this.delegate.next();
        this.current = fetched;
        return fetched;
    }
}
A potential implementation could be
// One named cursor per metric series.
List<NamedKeeparator> cursors = new ArrayList<>();
for (Map.Entry<String, List<DataPoint>> series : metriDps.entrySet()) {
    cursors.add(new NamedKeeparator(series.getKey(), series.getValue().iterator()));
}

StringWriter writer = new StringWriter();

// Header line.
writer.append("timestamp,").append(String.join(",", metricNames)).append('\n');

// Cursors whose pending point carries the smallest timestamp of this round.
List<NamedKeeparator> earliest = new ArrayList<>();
while (true) {
    earliest.clear();
    long smallest = Long.MAX_VALUE;

    for (NamedKeeparator cursor : cursors) {
        // Fetch a point if the previous one has been consumed.
        while (cursor.current() == null && cursor.hasNext()) {
            cursor.next();
        }
        DataPoint pending = cursor.current();
        if (pending == null) {
            continue; // this series is exhausted
        }
        long timestamp = pending.getTimestamp();
        if (timestamp > smallest) {
            continue; // belongs to a later row
        }
        if (timestamp < smallest) {
            // New minimum: the cursors collected so far belong to a later row.
            smallest = timestamp;
            earliest.clear();
        }
        earliest.add(cursor);
    }

    // No cursor has a pending point, so every series is done.
    if (earliest.isEmpty()) {
        break;
    }

    // All collected cursors share one timestamp; read it from the first.
    long time = earliest.get(0).current().getTimestamp();
    writer.append(String.valueOf(time)).append(',');

    // One cell per metric; empty when that metric has no point in this row.
    for (int column = 0; column < metricNames.length; column++) {
        if (column > 0) {
            writer.append(',');
        }
        String metric = metricNames[column];
        DataPoint point = null;
        for (NamedKeeparator cursor : earliest) {
            if (cursor.getName().equals(metric)) {
                point = cursor.current();
                break;
            }
        }
        if (point != null) {
            writer.append(String.valueOf(point.getValue()));
        }
    }
    writer.append('\n');

    // Mark the written points as used so the next round fetches new ones.
    for (NamedKeeparator cursor : earliest) {
        cursor.consume();
    }
}
System.out.println(writer);