Right now I have this code, and it works fine. It sends JSON-formatted data to a Kafka topic:
j, err := json.Marshal(data)
if err != nil {
	log.Fatal(err)
}
msg := &sarama.ProducerMessage{
	Topic: tName,
	Value: sarama.StringEncoder(j),
}
_, _, err = producer.SendMessage(msg)
But somebody wants to have this data in CloudEvents format (https://github.com/cloudevents/sdk-go). What should I do, given that this Event struct cannot be converted directly to a string?
type Event struct {
	Context     EventContext
	DataEncoded []byte
	// DataBase64 indicates if the event, when serialized, represents
	// the data field using the base64 encoding.
	// In v0.3, this field is superseded by DataContentEncoding
	DataBase64  bool
	FieldErrors map[string]error
}
So this code won't even compile:
j, err := json.Marshal(data)
if err != nil {
	log.Fatal(err)
}
//...
event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, j)
producerMsg := &sarama.ProducerMessage{
	Topic: s.outputTopic,
	Value: sarama.StringEncoder(event),
}
_, _, err = s.producer.SendMessage(producerMsg)
What should I do to send this Event to Kafka? Should I try converting event.DataEncoded to a string, or something like that? By the way, the programming language is Go.
Did you see the section of the docs that serializes the event?
https://github.com/cloudevents/sdk-go#serializedeserialize-a-cloudevent
event := cloudevents.NewEvent()
// id is a required CloudEvents attribute; this value is only a placeholder
event.SetID("example-id")
event.SetSource("example/uri")
event.SetType("example.type")
// data here is a map[string]interface{}, or some other struct type representing the "example.type" schema above
if err := event.SetData(cloudevents.ApplicationJSON, data); err != nil {
	log.Fatal(err)
}
bytes, err := json.Marshal(event)
if err != nil {
	log.Fatal(err)
}
producerMsg := &sarama.ProducerMessage{
	Topic: s.outputTopic,
	Value: sarama.ByteEncoder(bytes), // the event is already encoded; ByteEncoder wraps the bytes as a sarama.Encoder
}
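To get a feel for what json.Marshal(event) puts on the topic, here is a rough, stdlib-only sketch of the structured-mode JSON envelope described by the CloudEvents 1.0 spec. The envelope struct and the encodeEnvelope helper are hypothetical names made up for illustration and are not part of the SDK; the real Event type also handles optional attributes, extensions and validation. The point is only that your payload ends up nested under "data", and the result is a plain []byte.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// envelope is a hypothetical stand-in for the JSON shape of a
// structured-mode CloudEvent (spec version 1.0). It is NOT the SDK's
// Event type and only covers a handful of attributes.
type envelope struct {
	SpecVersion     string          `json:"specversion"`
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	Data            json.RawMessage `json:"data,omitempty"`
}

// encodeEnvelope marshals the payload to JSON, nests it under "data",
// and returns the whole envelope as bytes.
func encodeEnvelope(id, source, typ string, payload interface{}) ([]byte, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	return json.Marshal(envelope{
		SpecVersion:     "1.0",
		ID:              id,
		Source:          source,
		Type:            typ,
		DataContentType: "application/json",
		Data:            data,
	})
}

func main() {
	// The id, source and type values are placeholders.
	b, err := encodeEnvelope("example-id", "example/uri", "example.type",
		map[string]string{"hello": "world"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))
}
```

With sarama, bytes like these would go into the message as sarama.ByteEncoder(b). In practice you would let the SDK build the envelope rather than hand-rolling it.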
Otherwise, be sure to look at the provided sample code, which uses the CloudEvents client: https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go
sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
if err != nil {
	log.Fatalf("failed to create protocol: %s", err.Error())
}
defer sender.Close(context.Background())
c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
	log.Fatalf("failed to create client, %v", err)
}
event := cloudevents.NewEvent()
event.Set...
c.Send(..., event)
...