user-defined-functions influxdb kapacitor influxdb-python

Creating Kapacitor UDF that wants a STREAM and provides a BATCH (Python)


I'm having trouble writing a UDF that wants a STREAM and provides a BATCH.

The info method looks like this:

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.STREAM
    response.info.provides = udf_pb2.BATCH
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    return response

Does anyone have example code? I searched around the web (forums, documentation), but all the examples are for BATCH-BATCH, STREAM-STREAM or BATCH-STREAM.

I saw in the examples that when writing the response to Kapacitor in the "end_batch(self, end_req)" method, you have to tell Kapacitor that the BATCH has ended. In one example this was done like this:

def end_batch(self, end_req):
    # Send begin batch with count of outliers
    self._begin_response.begin.size = len(self._batch)
    self._agent.write_response(self._begin_response)

    response = udf_pb2.Response()


                                  ...    


    # Send an identical end batch back to Kapacitor
    # HERE
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)

In a STREAM-BATCH UDF I have to send the BATCH from the "point(self, point)" method, but there I can't access an end_req object and I don't know how to create one.

Thanks in advance! Bye bye!


Solution

  • Hope this is still relevant. I'd make a STREAM-STREAM UDF and pipe its output into a window node. You can keep a copy of the window of data, like in their moving average example, and do any batch analysis on that. If you figured out how to write a STREAM-BATCH UDF I'd love to see it though; it would be way less ugly than my answer.

    Edit

    jdv was right: my previous answer was really more of a comment. Here's a STREAM-BATCH UDF in Python that just echoes the data that came in on the stream back out as a batch. It's still a little broken because it can't serialize the point class in the handler's snapshot method, so it crashes whenever it needs to take a snapshot. That might be solvable by using a different serialization method such as pickling, or by writing a JSON encoder/decoder for point. I'll get around to fixing that sometime, but my work week is nearly done. The main thing you need to do to make a STREAM-BATCH UDF is construct the batch begin and end messages yourself, which is done in the createStartBatch and createEndBatch methods respectively.

    Edit 2

    Fixed the serialization by using a combination of protobuf's own serialization methods and JSON.
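
For anyone landing here without the code, here is a minimal sketch of the idea described above: buffer the points that arrive on the stream and, once enough have arrived, write a begin message, the points, and an end message that you build yourself. It is not the code from this answer. Point, BeginBatch, EndBatch and StreamToBatchBuffer are hypothetical plain-Python stand-ins so the sketch runs without Kapacitor installed; their field names (name, group, tags, size, tmax) are assumptions about what the udf_pb2 messages carry, so check them against your udf_pb2 version. It also ignores grouping and keeps a single buffer.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


# Hypothetical stand-ins for the udf_pb2 messages (assumed field names).
@dataclass
class Point:
    time: int
    name: str = ""
    group: str = ""
    tags: Dict[str, str] = field(default_factory=dict)
    fieldsDouble: Dict[str, float] = field(default_factory=dict)


@dataclass
class BeginBatch:
    name: str
    group: str
    tags: Dict[str, str]
    size: int


@dataclass
class EndBatch:
    name: str
    group: str
    tags: Dict[str, str]
    tmax: int


class StreamToBatchBuffer:
    """Collects streamed points and emits them as fixed-size batches."""

    def __init__(self, size: int, write: Callable[[object], None]):
        self._size = size      # number of points per emitted batch
        self._write = write    # stand-in for the agent's write_response
        self._points: List[Point] = []

    def point(self, point: Point) -> None:
        # Called once per incoming stream point.
        self._points.append(point)
        if len(self._points) >= self._size:
            self._flush()

    def _flush(self) -> None:
        first, last = self._points[0], self._points[-1]
        # There is no begin request to copy in a stream, so build one.
        self._write(BeginBatch(first.name, first.group, dict(first.tags),
                               size=len(self._points)))
        for p in self._points:
            self._write(p)
        # Likewise there is no end_req to copy: build the end message,
        # using the time of the last buffered point as tmax.
        self._write(EndBatch(last.name, last.group, dict(last.tags),
                             tmax=last.time))
        self._points = []
```

In a real handler each of these would be wrapped in a udf_pb2.Response (filling response.begin, response.point and response.end, as the snippets in the question do) before being passed to self._agent.write_response.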