Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "flag"
- "fmt"
- "os"
- "path/filepath"
- "time"
- "k8s.io/klog"
- v1 "k8s.io/api/core/v1"
- meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/util/workqueue"
- )
- type Controller struct {
- indexer cache.Indexer
- queue workqueue.RateLimitingInterface
- informer cache.Controller
- }
- func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
- return &Controller{
- informer: informer,
- indexer: indexer,
- queue: queue,
- }
- }
- func (c *Controller) processNextItem() bool {
- // Wait until there is a new item in the working queue
- key, quit := c.queue.Get()
- if quit {
- return false
- }
- // Tell the queue that we are done with processing this key. This unblocks the key for other workers
- // This allows safe parallel processing because two PersistentVolume with the same key are never processed in
- // parallel.
- defer c.queue.Done(key)
- // Invoke the method containing the business logic
- err := c.syncToStdout(key.(string))
- // Handle the error if something went wrong during the execution of the business logic
- c.handleErr(err, key)
- return true
- }
- // syncToStdout is the business logic of the controller. In this controller it simply prints
- // information about the PersistentVolume to stdout. In case an error happened, it has to simply return the error.
- // The retry logic should not be part of the business logic.
- func (c *Controller) syncToStdout(key string) error {
- obj, exists, err := c.indexer.GetByKey(key)
- if err != nil {
- klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
- return err
- }
- if !exists {
- // Below we will warm up our cache with a PersistentVolume, so that we will see a delete for one PersistentVolume
- fmt.Printf("PersistentVolume %s does not exist anymore\n", key)
- //fmt.Println("%s\n", obj.(*v1.PersistentVolume).GetName(), "%s\n", obj.(*v1.PersistentVolume).GetAnnotations(), "PV: ", "With annotations: ")
- //fmt.Println(obj.(*v1.PersistentVolume).GetName())
- //fmt.Printf("Sync/Add/Update for PersistentVolume %s\n", obj.(*v1.PersistentVolume).GetName())
- /*
- GetAnnotations("REGISTRATION_IN_PROCESS") {
- }
- if obj.(*v1.PersistentVolume).GetAnnotations("REGISTRATION_IN_PROCESS") {
- }
- if obj.(*v1.PersistentVolume).GetAnnotations("REGISTRATION_IS_DONE") {
- }
- */
- } else {
- // Note that you also have to check the uid if you have a local controlled resource, which
- // is dependent on the actual instance, to detect that a PersistentVolume was recreated with the same name
- fmt.Printf("Sync/Add/Update for PersistentVolume %s\n", obj.(*v1.PersistentVolume).GetName())
- // Get all annotations of the pvs
- for k, v := range obj.(*v1.PersistentVolume).GetAnnotations() {
- fmt.Println("PV: ", obj.(*v1.PersistentVolume).GetName(), "With annotations: ", "key[%s] value[%s]\n", k, v)
- // ignore event if annotation REGISTRATION_IN_PROCESS job is present on PV
- if k == "REGISTRATION_IN_PROCESS" {
- fmt.Println("REGISTRATION_IN_PROCESS for volume", obj.(*v1.PersistentVolume).GetName())
- // call a job in order to do chmod into the volume, this will run an alpine image, mount the volumeSource, and run chmod
- // check if job already exists
- // save job ID in annotation on PV
- }
- // check if job ID is in annotation on PV and is out of the queue
- if k == "REGISTRATION_IS_DONE" {
- fmt.Println("REGISTRATION_IS_DONE for volume", obj.(*v1.PersistentVolume).GetName())
- } else {
- // if new event, register annotation
- register := map[string]string{"REGISTRATION_IN_PROCESS": ""}
- obj.(*v1.PersistentVolume).SetAnnotations(register)
- fmt.Println("Setting annotation REGISTRATION_IN_PROCESS for volume ", obj.(*v1.PersistentVolume).GetName())
- }
- }
- //fmt.Println("PV: ", obj.(*v1.PersistentVolume).GetName(), "With annotations: ", obj.(*v1.PersistentVolume).GetAnnotations())
- }
- return nil
- }
- // handleErr checks if an error happened and makes sure we will retry later.
- func (c *Controller) handleErr(err error, key interface{}) {
- if err == nil {
- // Forget about the #AddRateLimited history of the key on every successful synchronization.
- // This ensures that future processing of updates for this key is not delayed because of
- // an outdated error history.
- c.queue.Forget(key)
- return
- }
- // This controller retries 5 times if something goes wrong. After that, it stops trying.
- if c.queue.NumRequeues(key) < 5 {
- klog.Infof("Error syncing PersistentVolume %v: %v", key, err)
- // Re-enqueue the key rate limited. Based on the rate limiter on the
- // queue and the re-enqueue history, the key will be processed later again.
- c.queue.AddRateLimited(key)
- return
- }
- c.queue.Forget(key)
- // Report to an external entity that, even after several retries, we could not successfully process this key
- runtime.HandleError(err)
- klog.Infof("Dropping PersistentVolume %q out of the queue: %v", key, err)
- }
- func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
- defer runtime.HandleCrash()
- // Let the workers stop when we are done
- defer c.queue.ShutDown()
- klog.Info("Starting PersistentVolume controller")
- go c.informer.Run(stopCh)
- // Wait for all involved caches to be synced, before processing items from the queue is started
- if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
- runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
- return
- }
- for i := 0; i < threadiness; i++ {
- go wait.Until(c.runWorker, time.Second, stopCh)
- }
- <-stopCh
- klog.Info("Stopping PersistentVolume controller")
- }
- func (c *Controller) runWorker() {
- for c.processNextItem() {
- }
- }
- func main() {
- var master string
- var kubeconfig *string
- if home := homeDir(); home != "" {
- kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
- } else {
- kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
- }
- flag.Parse()
- // use the current context in kubeconfig
- config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
- if err != nil {
- panic(err.Error())
- }
- /* INCLIENT CONFIGURATION
- var kubeconfig string
- var master string
- flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
- flag.StringVar(&master, "master", "", "master url")
- flag.Parse()
- // creates the connection
- config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
- if err != nil {
- klog.Fatal(err)
- }
- */
- // creates the clientset
- clientset, err := kubernetes.NewForConfig(config)
- if err != nil {
- klog.Fatal(err)
- }
- // create the PersistentVolume watcher
- PersistentVolumeListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "persistentvolumes", v1.NamespaceAll, fields.Everything())
- // create the workqueue
- queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
- // Bind the workqueue to a cache with the help of an informer. This way we make sure that
- // whenever the cache is updated, the PersistentVolume key is added to the workqueue.
- // Note that when we finally process the item from the workqueue, we might see a newer version
- // of the PersistentVolume than the version which was responsible for triggering the update.
- indexer, informer := cache.NewIndexerInformer(PersistentVolumeListWatcher, &v1.PersistentVolume{}, 0, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- key, err := cache.MetaNamespaceKeyFunc(obj)
- if err == nil {
- queue.Add(key)
- }
- },
- UpdateFunc: func(old interface{}, new interface{}) {
- key, err := cache.MetaNamespaceKeyFunc(new)
- if err == nil {
- queue.Add(key)
- }
- },
- DeleteFunc: func(obj interface{}) {
- // IndexerInformer uses a delta queue, therefore for deletes we have to use this
- // key function.
- key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
- if err == nil {
- queue.Add(key)
- }
- },
- }, cache.Indexers{})
- controller := NewController(queue, indexer, informer)
- //informer2 := cache.NewSharedIndexInformer(
- // PersistentVolumeListWatcher,
- // &v1.PersistentVolume{},
- // 0, //Skip resyncr
- // cache.Indexers{},
- //)
- // We can now warm up the cache for initial synchronization.
- // Let's suppose that we knew about a PersistentVolume "myPersistentVolume" on our last run, therefore add it to the cache.
- // If this PersistentVolume is not there anymore, the controller will be notified about the removal after the
- // cache has synchronized.
- indexer.Add(&v1.PersistentVolume{
- ObjectMeta: meta_v1.ObjectMeta{
- Name: "myPersistentVolume",
- Namespace: v1.NamespaceAll,
- },
- })
- // Now let's start the controller
- stop := make(chan struct{})
- defer close(stop)
- go controller.Run(1, stop)
- // Wait forever
- select {}
- }
- func homeDir() string {
- if h := os.Getenv("HOME"); h != "" {
- return h
- }
- return os.Getenv("USERPROFILE") // windows
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement