goapache-kafkacloudevents

Send data in cloudevents format to Kafka topic


Right now I have this code and it works fine. (It sends some json format data to Kafka topic)

j, err := json.Marshal(data)
if err != nil {
    log.Fatal(err)
}
msg := &sarama.ProducerMessage{
    Topic: tName,
    Value: sarama.StringEncoder(j),
}

_, _, err = producer.SendMessage(msg)

but somebody wishes ho have this data in cloudevents format. -> https://github.com/cloudevents/sdk-go so what should I do because this Event structure can not be directly casted to string.

type Event struct {
    Context     EventContext
    DataEncoded []byte
    // DataBase64 indicates if the event, when serialized, represents
    // the data field using the base64 encoding.
    // In v0.3, this field is superseded by DataContentEncoding
    DataBase64  bool
    FieldErrors map[string]error
}

so this code won't even compile.

j, err := json.Marshal(data)
if err != nil {
    log.Fatal(err)
}

//...

event := cloudevents.NewEvent()
event.SetSource("example/uri") 
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, j)

producerMsg := &sarama.ProducerMessage{
    Topic: s.outputTopic,
    Value: sarama.StringEncoder(event),
}
_, _, err = s.producer.SendMessage(producerMsg)

What should I do to send this Event to Kafka? Try to cast event.DataEncoded to string or something like that? btw. Programming language is golang.


Solution

  • Did you see the section of the docs that serialized the event?

    https://github.com/cloudevents/sdk-go#serializedeserialize-a-cloudevent

    event := cloudevents.NewEvent()
    event.SetSource("example/uri") 
    event.SetType("example.type") 
    // data here is a map[string] interface{}, or some other Struct type representing the "example.type" schema type above 
    event.SetData(cloudevents.ApplicationJSON, data)
    
    bytes, err := json.Marshal(event)
    if err != nil {
      log.Fatal(err)
    }
    producerMsg := &sarama.ProducerMessage{
        Topic: s.outputTopic,
        Value: bytes,  // you've already encoded the event 
    }
    

    Otherwise, be sure to look at the sample code provided that uses the CloudEvent client https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go

    sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }
    
    defer sender.Close(context.Background())
    
    c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }
    
    event := cloudevents.NewEvent() 
    event.Set... 
    
    c.Send(..., event)
    
    ...