SHARE
TWEET

Untitled

a guest May 22nd, 2019 77 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package main
  2.  
  3. import (
  4.     "flag"
  5.     "fmt"
  6.     "os"
  7.     "path/filepath"
  8.     "time"
  9.  
  10.     "k8s.io/klog"
  11.  
  12.     v1 "k8s.io/api/core/v1"
  13.     meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  14.     "k8s.io/apimachinery/pkg/fields"
  15.     "k8s.io/apimachinery/pkg/util/runtime"
  16.     "k8s.io/apimachinery/pkg/util/wait"
  17.     "k8s.io/client-go/kubernetes"
  18.     "k8s.io/client-go/tools/cache"
  19.     "k8s.io/client-go/tools/clientcmd"
  20.     "k8s.io/client-go/util/workqueue"
  21. )
  22.  
  23. type Controller struct {
  24.     indexer  cache.Indexer
  25.     queue    workqueue.RateLimitingInterface
  26.     informer cache.Controller
  27. }
  28.  
  29. func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
  30.     return &Controller{
  31.         informer: informer,
  32.         indexer:  indexer,
  33.         queue:    queue,
  34.     }
  35. }
  36.  
  37. func (c *Controller) processNextItem() bool {
  38.     // Wait until there is a new item in the working queue
  39.     key, quit := c.queue.Get()
  40.     if quit {
  41.         return false
  42.     }
  43.     // Tell the queue that we are done with processing this key. This unblocks the key for other workers
  44.     // This allows safe parallel processing because two PersistentVolume with the same key are never processed in
  45.     // parallel.
  46.     defer c.queue.Done(key)
  47.  
  48.     // Invoke the method containing the business logic
  49.     err := c.syncToStdout(key.(string))
  50.     // Handle the error if something went wrong during the execution of the business logic
  51.     c.handleErr(err, key)
  52.     return true
  53. }
  54.  
  55. // syncToStdout is the business logic of the controller. In this controller it simply prints
  56. // information about the PersistentVolume to stdout. In case an error happened, it has to simply return the error.
  57. // The retry logic should not be part of the business logic.
  58. func (c *Controller) syncToStdout(key string) error {
  59.     obj, exists, err := c.indexer.GetByKey(key)
  60.     if err != nil {
  61.         klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
  62.         return err
  63.     }
  64.  
  65.     if !exists {
  66.         // Below we will warm up our cache with a PersistentVolume, so that we will see a delete for one PersistentVolume
  67.         fmt.Printf("PersistentVolume %s does not exist anymore\n", key)
  68.  
  69.         //fmt.Println("%s\n", obj.(*v1.PersistentVolume).GetName(), "%s\n", obj.(*v1.PersistentVolume).GetAnnotations(), "PV: ", "With annotations: ")
  70.         //fmt.Println(obj.(*v1.PersistentVolume).GetName())
  71.         //fmt.Printf("Sync/Add/Update for PersistentVolume %s\n", obj.(*v1.PersistentVolume).GetName())
  72.         /*
  73.             GetAnnotations("REGISTRATION_IN_PROCESS") {
  74.  
  75.             }
  76.             if obj.(*v1.PersistentVolume).GetAnnotations("REGISTRATION_IN_PROCESS") {
  77.  
  78.             }
  79.             if obj.(*v1.PersistentVolume).GetAnnotations("REGISTRATION_IS_DONE") {
  80.  
  81.             }
  82.         */
  83.     } else {
  84.  
  85.         // Note that you also have to check the uid if you have a local controlled resource, which
  86.         // is dependent on the actual instance, to detect that a PersistentVolume was recreated with the same name
  87.         fmt.Printf("Sync/Add/Update for PersistentVolume %s\n", obj.(*v1.PersistentVolume).GetName())
  88.  
  89.         // Get all annotations of the pvs
  90.         for k, v := range obj.(*v1.PersistentVolume).GetAnnotations() {
  91.             fmt.Println("PV: ", obj.(*v1.PersistentVolume).GetName(), "With annotations: ", "key[%s] value[%s]\n", k, v)
  92.  
  93.             // ignore event if annotation REGISTRATION_IN_PROCESS job is present on PV
  94.             if k == "REGISTRATION_IN_PROCESS" {
  95.                 fmt.Println("REGISTRATION_IN_PROCESS for volume", obj.(*v1.PersistentVolume).GetName())
  96.  
  97.                 // call a job in order to do chmod into the volume, this will run an alpine image, mount the volumeSource, and run chmod
  98.                 // check if job already exists
  99.                 // save job ID in annotation on PV
  100.             }
  101.             // check if job ID is in annotation on PV and is out of the queue
  102.             if k == "REGISTRATION_IS_DONE" {
  103.                 fmt.Println("REGISTRATION_IS_DONE for volume", obj.(*v1.PersistentVolume).GetName())
  104.  
  105.             } else {
  106.                 // if new event, register annotation
  107.                 register := map[string]string{"REGISTRATION_IN_PROCESS": ""}
  108.                 obj.(*v1.PersistentVolume).SetAnnotations(register)
  109.                 fmt.Println("Setting annotation REGISTRATION_IN_PROCESS for volume ", obj.(*v1.PersistentVolume).GetName())
  110.             }
  111.         }
  112.  
  113.         //fmt.Println("PV: ", obj.(*v1.PersistentVolume).GetName(), "With annotations: ", obj.(*v1.PersistentVolume).GetAnnotations())
  114.     }
  115.     return nil
  116. }
  117.  
  118. // handleErr checks if an error happened and makes sure we will retry later.
  119. func (c *Controller) handleErr(err error, key interface{}) {
  120.     if err == nil {
  121.         // Forget about the #AddRateLimited history of the key on every successful synchronization.
  122.         // This ensures that future processing of updates for this key is not delayed because of
  123.         // an outdated error history.
  124.         c.queue.Forget(key)
  125.         return
  126.     }
  127.  
  128.     // This controller retries 5 times if something goes wrong. After that, it stops trying.
  129.     if c.queue.NumRequeues(key) < 5 {
  130.         klog.Infof("Error syncing PersistentVolume %v: %v", key, err)
  131.  
  132.         // Re-enqueue the key rate limited. Based on the rate limiter on the
  133.         // queue and the re-enqueue history, the key will be processed later again.
  134.         c.queue.AddRateLimited(key)
  135.         return
  136.     }
  137.  
  138.     c.queue.Forget(key)
  139.     // Report to an external entity that, even after several retries, we could not successfully process this key
  140.     runtime.HandleError(err)
  141.     klog.Infof("Dropping PersistentVolume %q out of the queue: %v", key, err)
  142. }
  143.  
  144. func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
  145.     defer runtime.HandleCrash()
  146.  
  147.     // Let the workers stop when we are done
  148.     defer c.queue.ShutDown()
  149.     klog.Info("Starting PersistentVolume controller")
  150.  
  151.     go c.informer.Run(stopCh)
  152.  
  153.     // Wait for all involved caches to be synced, before processing items from the queue is started
  154.     if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
  155.         runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
  156.         return
  157.     }
  158.  
  159.     for i := 0; i < threadiness; i++ {
  160.         go wait.Until(c.runWorker, time.Second, stopCh)
  161.     }
  162.  
  163.     <-stopCh
  164.     klog.Info("Stopping PersistentVolume controller")
  165. }
  166.  
  167. func (c *Controller) runWorker() {
  168.     for c.processNextItem() {
  169.     }
  170. }
  171.  
  172. func main() {
  173.  
  174.     var master string
  175.  
  176.     var kubeconfig *string
  177.     if home := homeDir(); home != "" {
  178.         kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
  179.     } else {
  180.         kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
  181.     }
  182.     flag.Parse()
  183.  
  184.     // use the current context in kubeconfig
  185.     config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
  186.     if err != nil {
  187.         panic(err.Error())
  188.     }
  189.  
  190.     /* INCLIENT CONFIGURATION
  191.  
  192.     var kubeconfig string
  193.     var master string
  194.  
  195.     flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
  196.     flag.StringVar(&master, "master", "", "master url")
  197.     flag.Parse()
  198.  
  199.     // creates the connection
  200.     config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
  201.     if err != nil {
  202.         klog.Fatal(err)
  203.     }
  204.  
  205.  
  206.  
  207.  
  208.     */
  209.  
  210.     // creates the clientset
  211.     clientset, err := kubernetes.NewForConfig(config)
  212.     if err != nil {
  213.         klog.Fatal(err)
  214.     }
  215.  
  216.     // create the PersistentVolume watcher
  217.     PersistentVolumeListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "persistentvolumes", v1.NamespaceAll, fields.Everything())
  218.  
  219.     // create the workqueue
  220.     queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
  221.  
  222.     // Bind the workqueue to a cache with the help of an informer. This way we make sure that
  223.     // whenever the cache is updated, the PersistentVolume key is added to the workqueue.
  224.     // Note that when we finally process the item from the workqueue, we might see a newer version
  225.     // of the PersistentVolume than the version which was responsible for triggering the update.
  226.     indexer, informer := cache.NewIndexerInformer(PersistentVolumeListWatcher, &v1.PersistentVolume{}, 0, cache.ResourceEventHandlerFuncs{
  227.         AddFunc: func(obj interface{}) {
  228.             key, err := cache.MetaNamespaceKeyFunc(obj)
  229.             if err == nil {
  230.                 queue.Add(key)
  231.             }
  232.         },
  233.         UpdateFunc: func(old interface{}, new interface{}) {
  234.             key, err := cache.MetaNamespaceKeyFunc(new)
  235.             if err == nil {
  236.                 queue.Add(key)
  237.             }
  238.         },
  239.         DeleteFunc: func(obj interface{}) {
  240.             // IndexerInformer uses a delta queue, therefore for deletes we have to use this
  241.             // key function.
  242.             key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  243.             if err == nil {
  244.                 queue.Add(key)
  245.             }
  246.         },
  247.     }, cache.Indexers{})
  248.  
  249.     controller := NewController(queue, indexer, informer)
  250.  
  251.     //informer2 := cache.NewSharedIndexInformer(
  252.     //  PersistentVolumeListWatcher,
  253.     //  &v1.PersistentVolume{},
  254.     //  0, //Skip resyncr
  255.     //  cache.Indexers{},
  256.     //)
  257.  
  258.     // We can now warm up the cache for initial synchronization.
  259.     // Let's suppose that we knew about a PersistentVolume "myPersistentVolume" on our last run, therefore add it to the cache.
  260.     // If this PersistentVolume is not there anymore, the controller will be notified about the removal after the
  261.     // cache has synchronized.
  262.     indexer.Add(&v1.PersistentVolume{
  263.         ObjectMeta: meta_v1.ObjectMeta{
  264.             Name:      "myPersistentVolume",
  265.             Namespace: v1.NamespaceAll,
  266.         },
  267.     })
  268.  
  269.     // Now let's start the controller
  270.     stop := make(chan struct{})
  271.     defer close(stop)
  272.     go controller.Run(1, stop)
  273.  
  274.     // Wait forever
  275.     select {}
  276. }
  277.  
  // homeDir returns the value of the HOME environment variable. When HOME is
  // empty or unset it falls back to USERPROFILE, which may be empty as well.
  func homeDir() string {
      home := os.Getenv("HOME")
      if home == "" {
          // windows
          home = os.Getenv("USERPROFILE")
      }
      return home
  }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top