I'm in the process of migrating from azure-event-hubs-go/v3 to the newer azeventhubs Go SDK. In the older SDK, there was a ReceiveOption argument that allowed me to specify from where to start consuming events.
In the new SDK, I'm using the following code to initialize the processor:
    processor, err := azeventhubs.NewProcessor(
        e.ConsumerClient,
        checkpointStore,
        &azeventhubs.ProcessorOptions{
            UpdateInterval: time.Second,
            Prefetch:       0,
            StartPositions: azeventhubs.StartPositions{
                Default: azeventhubs.StartPosition{
                    Latest:       to.Ptr(true),
                    EnqueuedTime: to.Ptr(time.Now()),
                    Inclusive:    true,
                },
            },
        },
    )
However, I've noticed that the events are being consumed from the last checkpoint rather than from the most recently sent events.
What I've Tried: I've experimented with both ConsumingEventsUsingConsumerClient and ConsumingEventsWithCheckpoints examples, but they both behave the same way, consuming events from the last checkpoint rather than the most recent events.
What I'm Expecting: I want the processor to start consuming the latest events sent from the device, which sends a message every second. How can I achieve this behavior using azeventhubs Go SDK?
I initially struggled to grasp the underlying mechanisms of AMQP, but the issue is now resolved. Instead of going through the Processor, I create a PartitionClient for each partition and receive from it directly:
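    // p is assumed to come from consumerClient.GetEventHubProperties.
    var wg sync.WaitGroup
    for _, partition := range p.PartitionIDs {
        wg.Add(1) // one Add per goroutine, so wg.Wait() covers every partition
        go func(partition string) {
            defer wg.Done()
            // With nil options the client starts at the latest position
            // and does not consult any checkpoint store.
            partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
            if err != nil {
                panic(err)
            }
            defer partitionClient.Close(context.TODO())
            for {
                // Use a fresh timeout per call: a single shared context would
                // expire after 30 seconds and fail every later call at once.
                receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
                events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
                cancel()
                if err != nil && !errors.Is(err, context.DeadlineExceeded) {
                    panic(err)
                }
                for _, evt := range events {
                    fmt.Printf("partition: %s\n", partition)
                    fmt.Printf("Body: %s\n", string(evt.Body))
                }
            }
        }(partition)
    }
    wg.Wait()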
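As far as I understand it, the Processor behaved the way described in the question because StartPositions only applies to partitions that have no checkpoint in the checkpoint store; once a checkpoint exists, it takes precedence. The sketch below is a simplified model of that rule, not the SDK's code: the `checkpoint` and `startPosition` types and the `resolveStart` and `describe` helpers are hypothetical names made up for illustration.

```go
package main

import "fmt"

// checkpoint and startPosition are hypothetical stand-ins for illustration;
// they are not the azeventhubs types.
type checkpoint struct {
	SequenceNumber int64
}

type startPosition struct {
	Latest         bool
	SequenceNumber *int64
}

// resolveStart models the assumed rule: a stored checkpoint for the partition
// wins, and the configured default is used only when no checkpoint exists.
func resolveStart(partitionID string, checkpoints map[string]checkpoint, def startPosition) startPosition {
	if cp, ok := checkpoints[partitionID]; ok {
		seq := cp.SequenceNumber
		return startPosition{SequenceNumber: &seq}
	}
	return def
}

// describe renders a start position as text for printing.
func describe(p startPosition) string {
	if p.SequenceNumber != nil {
		return fmt.Sprintf("after sequence number %d", *p.SequenceNumber)
	}
	if p.Latest {
		return "latest"
	}
	return "unspecified"
}

func main() {
	// Partition "0" has a checkpoint; partition "1" does not.
	checkpoints := map[string]checkpoint{"0": {SequenceNumber: 41}}
	def := startPosition{Latest: true}
	for _, id := range []string{"0", "1"} {
		fmt.Printf("partition %s starts at: %s\n", id, describe(resolveStart(id, checkpoints, def)))
	}
}
```

In this model only partition "1" honors the Latest default, which matches what I saw: with existing checkpoints, the Processor resumed from them. Receiving through a PartitionClient avoids the checkpoint lookup entirely.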
I extend my gratitude to the Azure customer support services team for their invaluable assistance.