Tags: go, kubernetes, client-go

Kubernetes client-go: using Informers to watch Deployments


I'm trying to use client-go informers to get the replica count of a Deployment. Whenever autoscaling changes the number of replicas, I need to pick up the new value so that I can run some other logic. I was previously using the Watch() function, but it behaved inconsistently around timeouts and dropped connections.

The code below shows my implementation:

labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
    opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()

// Using the channels and goroutines below didn't show changes:
stopper := make(chan struct{})
defer close(stopper)
//go func() {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        mObj, ok := obj.(*appsv1.Deployment)
        if !ok {
            panic(spew.Sdump("informer returned invalid type", mObj))
        }

        replicas := int(*mObj.Spec.Replicas)
        logger.Infof("updating replicas to %d", replicas)

        sendUpdates() // use updates elsewhere
    },

    UpdateFunc: func(oldObj, newObj interface{}) {
        old, ok := oldObj.(*appsv1.Deployment)
        if !ok {
            panic(spew.Sdump("informer returned invalid type", old))
        }
        newDeployment, ok := newObj.(*appsv1.Deployment)
        if !ok {
            panic(spew.Sdump("informer returned invalid type", newDeployment))
        }
        oldReplicas := int(*old.Spec.Replicas)
        newReplicas := int(*newDeployment.Spec.Replicas)
        if oldReplicas != newReplicas {
            sendUpdates()
        }
    },
})

//factory.Start(wait.NeverStop)
//factory.WaitForCacheSync(wait.NeverStop)
informer.Run(stopper)

When Kubernetes autoscales, or when I change the Deployment's replica count manually, I get the message "deployment.apps/app scaled", but the Informer never picks up the change. Nothing is printed in the logs, and it enters a crash loop with no error message.


Solution

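  • A few things to note: call Informer() on the factory before factory.Start(), because Start() only launches informers that have already been requested. Start() does not block; it runs each requested informer in its own goroutine, so wrapping it in another goroutine is unnecessary. WaitForCacheSync() then blocks until the initial list has been loaded into the cache.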

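    // Assumed imports: "time", v1 "k8s.io/apimachinery/pkg/apis/meta/v1", appsv1 "k8s.io/api/apps/v1",
    // "k8s.io/apimachinery/pkg/util/wait", "k8s.io/client-go/informers", "k8s.io/client-go/tools/cache".
    labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
        // Restrict the list/watch to the single Deployment called name.
        opts.FieldSelector = "metadata.name=" + name
    })
    factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
    // Request the informer before Start() so that the factory knows about it.
    informer := factory.Apps().V1().Deployments().Informer()

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            mObj, ok := obj.(*appsv1.Deployment)
            if !ok {
                doSomething()
                return // mObj is nil here; dereferencing it below would panic
            }
            replicas := int(*mObj.Spec.Replicas)
            _ = replicas // hand the replica count to your own logic
            doSomething()
        },

        UpdateFunc: func(oldObj, newObj interface{}) {
            old, ok := oldObj.(*appsv1.Deployment)
            if !ok {
                doSomething()
                return
            }
            newDeployment, ok := newObj.(*appsv1.Deployment)
            if !ok {
                doSomething()
                return
            }
            // UpdateFunc also fires on every resync (every 2s here), so compare before acting.
            oldReplicas := int(*old.Spec.Replicas)
            newReplicas := int(*newDeployment.Spec.Replicas)
            if oldReplicas != newReplicas {
                doSomething()
            }
        },
    })

    // Initializes all requested informers; each one runs in its own goroutine.
    factory.Start(wait.NeverStop)
    // Blocks until the initial list has been loaded into the cache.
    factory.WaitForCacheSync(wait.NeverStop)
    // The caller has to keep running after this point; if main returns, the informers stop with it.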
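
Two details of the replica comparison are easy to trip over. Spec.Replicas is a *int32, so dereferencing it panics if it is ever nil. UpdateFunc is also called on every resync with unchanged objects, so the comparison is what keeps the handler quiet. The sketch below isolates that logic in plain Go so it can be tried without a cluster. The helper names replicaCount, replicasChanged and int32Ptr are hypothetical and not part of client-go, and treating nil as 1 is an assumption based on the documented API default for the field.

```go
package main

import "fmt"

// replicaCount converts a Deployment-style *int32 replica field to an int.
// Assumption: a nil pointer is treated as 1, the documented API default.
func replicaCount(p *int32) int {
	if p == nil {
		return 1
	}
	return int(*p)
}

// replicasChanged reports whether the desired replica count differs between
// the old and the new object. Resync events deliver objects with identical
// specs, so it returns false for them.
func replicasChanged(oldReplicas, newReplicas *int32) bool {
	return replicaCount(oldReplicas) != replicaCount(newReplicas)
}

// int32Ptr is a small helper for building *int32 values in the example.
func int32Ptr(v int32) *int32 { return &v }

func main() {
	fmt.Println(replicaCount(nil))                         // nil falls back to the default
	fmt.Println(replicasChanged(int32Ptr(2), int32Ptr(2))) // resync: same count
	fmt.Println(replicasChanged(int32Ptr(2), int32Ptr(5))) // scale-up
}
```

Inside UpdateFunc this would be called as replicasChanged(old.Spec.Replicas, newDeployment.Spec.Replicas), which replaces the two dereferences and the explicit comparison.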