I am trying to collect some metrics and export it to Kafka using OpenTelemetry. For a simple POC, I have a local file in the OpenMetrics format and I wish to parse it and push the metrics on Kafka.
I understand I should use kafkaexporter but I am not able to understand how the end-to-end flow would look like. A code snippet would be really helpful.
I am creating the Kafka exporter using the following code:
func newExporter(ctx context.Context) (exporter.Metrics, error) {
f := kafkaexporter.NewFactory(kafkaexporter.WithMetricsMarshalers())
cfg := f.CreateDefaultConfig()
ts := component.TelemetrySettings{
Logger: logger,
MeterProvider: meterProvider(),
MetricsLevel: configtelemetry.LevelNormal,
}
cs := exporter.CreateSettings{
ID: component.NewID("metrics"),
TelemetrySettings: ts,
BuildInfo: component.NewDefaultBuildInfo(),
}
return f.CreateMetricsExporter(ctx, cs, cfg)
}
func meterProvider() *metric.MeterProvider {
return metric.NewMeterProvider()
}
Now say I have a few counters / gauge meters, how do I push it using the above exporter?
I was able to run the end-to-end flow where I collect some metrics from a downstream dependency, parse it and push to Kafka.
Here's the code snippet:
func collectMetrics() (pmetric.ResourceMetrics, error) {
// collect and return metrics
}
func CollectMetrics(ctx context.Context) error {
me, err := newExporter(ctx)
if err != nil {
return err
}
err = me.Start(ctx, nil)
if err != nil {
return err
}
defer me.Shutdown(ctx)
ms := pmetric.NewMetrics()
rm, err := collectMetrics()
r := ms.ResourceMetrics().AppendEmpty()
rm.CopyTo(r)
err = me.ConsumeMetrics(ctx, ms)
if err != nil {
return err
}
return nil
}
The complete POC can be found here.