avro, flume, flume-ng

Apache Avro schema validation in Apache Flume


After reading about Apache Flume and the benefits it provides for handling client events, I decided it was time to start looking into it in more detail. Another great benefit appears to be that it can handle Apache Avro objects :-) However, I am struggling to understand how the Avro schema is used to validate the Flume events that are received.

To help explain my problem in more detail I have provided code snippets below:

Avro schema

For the purpose of this post I am using a sample schema defining a nested Object1 record with 2 fields.

{
  "namespace": "com.example.avro",
  "name": "Example",
  "type": "record",
  "fields": [
    {
      "name": "object1",
      "type": {
        "name": "Object1",
        "type": "record",
        "fields": [
          {
            "name": "value1",
            "type": "string"
          },
          {
            "name": "value2",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Embedded Flume agent

Within my Java project I am currently using the Apache Flume embedded agent as detailed below:

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.agent.embedded.EmbeddedAgent;
import org.apache.flume.event.EventBuilder;

public class EmbeddedAgentExample {

    public static void main(String[] args) {
        // Build a simple event with "Test" as the body.
        final Event event = EventBuilder.withBody("Test", Charset.forName("UTF-8"));

        // Configure an in-memory channel and a single Avro sink pointing at the remote agent.
        final Map<String, String> properties = new HashMap<>();
        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "100");
        properties.put("sinks", "sink1");
        properties.put("sink1.type", "avro");
        properties.put("sink1.hostname", "192.168.99.101");
        properties.put("sink1.port", "11111");
        properties.put("sink1.batch-size", "1");
        properties.put("processor.type", "failover");

        final EmbeddedAgent embeddedAgent = new EmbeddedAgent("TestAgent");
        embeddedAgent.configure(properties);
        embeddedAgent.start();

        try {
            embeddedAgent.put(event);
        } catch (EventDeliveryException e) {
            e.printStackTrace();
        } finally {
            // Shut the embedded agent down once the event has been handed over.
            embeddedAgent.stop();
        }
    }
}

In the above example I am creating a new Flume event with "Test" as the event body, and sending it to a separate Apache Flume agent running inside a VM (192.168.99.101).

Remote Flume agent

As described above, I have configured this agent to receive events from the embedded Flume agent. The Flume configuration for this agent looks like this:

# Name the components on this agent
hello.sources = avroSource
hello.channels = memoryChannel
hello.sinks = loggerSink

# Describe/configure the source
hello.sources.avroSource.type = avro
hello.sources.avroSource.bind = 0.0.0.0
hello.sources.avroSource.port = 11111
hello.sources.avroSource.channels = memoryChannel

# Describe the sink
hello.sinks.loggerSink.type = logger

# Use a channel which buffers events in memory
hello.channels.memoryChannel.type = memory
hello.channels.memoryChannel.capacity = 1000
hello.channels.memoryChannel.transactionCapacity = 1000

# Bind the source and sink to the channel
hello.sources.avroSource.channels = memoryChannel
hello.sinks.loggerSink.channel = memoryChannel

I am executing the following command to launch the agent:

./bin/flume-ng agent --conf conf --conf-file ../sample-flume.conf --name hello -Dflume.root.logger=TRACE,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

When I execute the Java project's main method I see that the "Test" event is passed through to my logger sink, with the following output:

2019-02-18 14:15:09,998 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 65 73 74                                     Test }

However, it is unclear to me exactly where I should configure the Avro schema to ensure that only valid events are received and processed by Flume. Can someone please help me understand where I am going wrong, or whether I have misunderstood how Flume is intended to convert Flume events into Avro events?

In addition to the above I have also tried using the Avro RPC client, after changing the Avro schema to specify a protocol, to talk directly to my remote Flume agent. However, when I attempt to send events I see the following error (a rough sketch of that attempt follows after the stack trace):

Exception in thread "main" org.apache.avro.AvroRuntimeException: Not a remote message: test
    at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:532)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:613)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:595)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
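
For reference, my Avro RPC attempt looks roughly like the following. This is a simplified reconstruction rather than my exact code; the protocol definition and the "test" message below are placeholders standing in for my modified schema.

import java.net.InetSocketAddress;

import org.apache.avro.Protocol;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.generic.GenericRequestor;

public class AvroRpcAttempt {

    // Placeholder protocol with a single "test" message; my real protocol wraps the Example record.
    private static final String PROTOCOL_JSON = "{"
            + "\"protocol\": \"ExampleProtocol\","
            + "\"namespace\": \"com.example.avro\","
            + "\"types\": [],"
            + "\"messages\": {"
            + "  \"test\": {"
            + "    \"request\": [{\"name\": \"value1\", \"type\": \"string\"}],"
            + "    \"response\": \"string\""
            + "  }"
            + "}}";

    public static void main(String[] args) throws Exception {
        Protocol protocol = Protocol.parse(PROTOCOL_JSON);
        NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.99.101", 11111));
        try {
            GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
            // Build the request record for the "test" message and send it.
            GenericRecord request = new GenericData.Record(protocol.getMessages().get("test").getRequest());
            request.put("value1", "Test");
            Object response = requestor.request("test", request);
            System.out.println("Response: " + response);
        } finally {
            transceiver.close();
        }
    }
}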

My goal is to ensure that the events produced by my application conform to the generated Avro schema, so that invalid events are never published. I would prefer to achieve this using the embedded Flume agent, but if that is not possible I would consider the Avro RPC approach, talking directly to my remote Flume agent.

Any help or guidance would be greatly appreciated. Thanks in advance.

UPDATE

After further reading I wonder if I have misunderstood the purpose of Apache Flume. I originally thought it could be used to automatically create Avro events based on the data and schema, but I am now wondering whether the application should assume responsibility for producing Avro events, which are then stored in Flume according to the channel configuration and sent on as a batch via the sink (in my case to a Spark Streaming cluster).
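
To illustrate what I mean, this is roughly the kind of application-side code I now have in mind, sketched with the Avro generic API against the schema above (the class name and the example.avsc resource location are placeholders, and I have not verified this end to end):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class AvroEventFactory {

    // The schema shown earlier, loaded here from a classpath resource (placeholder location).
    private static final Schema SCHEMA = loadSchema();

    private static Schema loadSchema() {
        try {
            return new Schema.Parser().parse(AvroEventFactory.class.getResourceAsStream("/example.avsc"));
        } catch (IOException e) {
            throw new IllegalStateException("Unable to load Avro schema", e);
        }
    }

    // Builds a Flume event whose body is the Avro binary encoding of the record, so anything
    // that does not match the schema fails here, before it ever reaches Flume.
    public static Event buildEvent(String value1, String value2) throws IOException {
        GenericRecord object1 = new GenericData.Record(SCHEMA.getField("object1").schema());
        object1.put("value1", value1);
        object1.put("value2", value2);

        GenericRecord example = new GenericData.Record(SCHEMA);
        example.put("object1", object1);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(example, encoder);
        encoder.flush();

        return EventBuilder.withBody(out.toByteArray());
    }
}

The returned event would then be handed to embeddedAgent.put(...) as in the earlier snippet, with the matching schema made available to whatever eventually deserialises the body (in my case the Spark Streaming job).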

If the above is correct, I would like to know whether Flume needs to know about the schema, or only my Spark Streaming cluster, which will eventually process this data. If Flume does need to know about the schema, please can you provide details of how this can be achieved?

Thanks in advance.


Solution

  • Since your goal is to process the data with a Spark Streaming cluster, you can approach this with two solutions:

    1) Using the Flume client (tested with flume-ng-sdk 1.9.0) and Spark Streaming (tested with spark-streaming_2.11 2.4.0 and spark-streaming-flume_2.11 2.3.0), with no Flume server in between in the network topology.

    The client class below sends a Flume JSON event to port 41416:

    import java.nio.charset.Charset;

    import org.apache.flume.Event;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;

    public class JSONFlumeClient {
      public static void main(String[] args) {
        // Connect to the Spark Streaming Flume receiver listening on port 41416.
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41416);
        // Sample JSON payload (here the Avro schema itself is simply used as a test body).
        String jsonData = "{\r\n" + "  \"namespace\": \"com.example.avro\",\r\n" + "  \"name\": \"Example\",\r\n"
                + "  \"type\": \"record\",\r\n" + "  \"fields\": [\r\n" + "    {\r\n"
                + "      \"name\": \"object1\",\r\n" + "      \"type\": {\r\n" + "        \"name\": \"Object1\",\r\n"
                + "        \"type\": \"record\",\r\n" + "        \"fields\": [\r\n" + "          {\r\n"
                + "            \"name\": \"value1\",\r\n" + "            \"type\": \"string\"\r\n" + "          },\r\n"
                + "          {\r\n" + "            \"name\": \"value2\",\r\n" + "            \"type\": \"string\"\r\n"
                + "          }\r\n" + "        ]\r\n" + "      }\r\n" + "    }\r\n" + "  ]\r\n" + "}";
        Event event = EventBuilder.withBody(jsonData, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (Throwable t) {
            System.err.println(t.getMessage());
            t.printStackTrace();
        } finally {
            client.close();
        }
      }
    }
    

    The Spark Streaming server class below listens on port 41416:

    import java.util.Date;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    public class SparkStreamingToySample {
      public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingToySample");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
        // Push-based receiver: the Flume client (or a Flume avro sink) connects to this host/port.
        JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils.createStream(ssc, "localhost", 41416);
        // Print the bodies of the events received in each 30-second batch.
        lines.map(sfe -> new String(sfe.event().getBody().array(), "UTF-8"))
             .foreachRDD((data, time) ->
                 System.out.println("***" + new Date(time.milliseconds()) + "=" + data.collect().toString()));
        ssc.start();
        ssc.awaitTermination();
      }
    }
    

    2) Using a Flume client + a Flume server in between + Spark Streaming (as a Flume sink) in the network topology.

    For this option the code is the same, but Spark Streaming must now specify the fully qualified hostname instead of localhost when starting its server on the same port, 41416, if you're running this locally for testing. The Flume client connects to the Flume server on port 41415. The tricky part now is how to define your Flume topology: you need to specify both a source and a sink for this to work.

    See the Flume configuration below (the corresponding Spark-side change follows it):

    agent1.channels.ch1.type = memory
    
    agent1.sources.avroSource1.channels = ch1
    agent1.sources.avroSource1.type = avro
    agent1.sources.avroSource1.bind = 0.0.0.0
    agent1.sources.avroSource1.port = 41415
    
    agent1.sinks.avroSink.channel = ch1
    agent1.sinks.avroSink.type = avro
    agent1.sinks.avroSink.hostname = <full dns qualified hostname>
    agent1.sinks.avroSink.port = 41416
    
    agent1.channels = ch1
    agent1.sources = avroSource1
    agent1.sinks = avroSink
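
    For completeness, the only Spark-side change for this option is the host passed to FlumeUtils.createStream in SparkStreamingToySample above (the hostname below is a placeholder):

    // Bind the receiver on the machine's fully qualified hostname instead of localhost,
    // so that the Flume avro sink on the server can reach it.
    JavaReceiverInputDStream<SparkFlumeEvent> lines =
            FlumeUtils.createStream(ssc, "spark-host.example.com", 41416);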
    

    You should get the same results with both solutions. Returning to your question of whether Flume is really needed to feed Spark Streaming with content from a JSON stream, the answer is that it depends: Flume supports interceptors, so in this case it could be used to cleanse or filter invalid data before it reaches your Spark project. However, since you are adding an extra component to the topology, it may impact performance and require more resources (CPU/memory) than a setup without Flume.
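
    If you do keep Flume in the topology, such an interceptor is where the schema check would live. Below is a minimal sketch, assuming the event body carries a JSON-encoded record of your Example schema; the class name, the example.avsc resource location, and the registration details are illustrative rather than a tested implementation.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    // Drops events whose body does not decode as a JSON-encoded record of the Example schema.
    public class AvroValidatingInterceptor implements Interceptor {

      private Schema schema;

      @Override
      public void initialize() {
        try {
          // Load the schema shown in the question (placeholder classpath location).
          schema = new Schema.Parser().parse(getClass().getResourceAsStream("/example.avsc"));
        } catch (IOException e) {
          throw new IllegalStateException("Unable to load Avro schema", e);
        }
      }

      @Override
      public Event intercept(Event event) {
        try {
          String json = new String(event.getBody(), StandardCharsets.UTF_8);
          // Decoding throws if the body does not conform to the schema.
          new GenericDatumReader<GenericRecord>(schema)
              .read(null, DecoderFactory.get().jsonDecoder(schema, json));
          return event;      // body conforms to the schema, pass it through
        } catch (IOException e) {
          return null;       // returning null drops the invalid event
        }
      }

      @Override
      public List<Event> intercept(List<Event> events) {
        List<Event> valid = new ArrayList<>();
        for (Event event : events) {
          Event result = intercept(event);
          if (result != null) {
            valid.add(result);
          }
        }
        return valid;
      }

      @Override
      public void close() {
      }

      // Required by Flume to construct the interceptor from configuration.
      public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
          return new AvroValidatingInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
      }
    }

    The interceptor would then be registered on the source with agent1.sources.avroSource1.interceptors = i1 and agent1.sources.avroSource1.interceptors.i1.type = <your package>.AvroValidatingInterceptor$Builder, with the jar on the Flume agent's classpath. Keep in mind that silently dropping events hides data problems, so counting or logging rejected events is usually worth adding.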