Advertisement
Guest User

Untitled

a guest
May 22nd, 2019
352
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.23 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "flag"
  5. "fmt"
  6. "os"
  7. "path/filepath"
  8. "time"
  9.  
  10. "k8s.io/klog"
  11.  
  12. v1 "k8s.io/api/core/v1"
  13. meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  14. "k8s.io/apimachinery/pkg/fields"
  15. "k8s.io/apimachinery/pkg/util/runtime"
  16. "k8s.io/apimachinery/pkg/util/wait"
  17. "k8s.io/client-go/kubernetes"
  18. "k8s.io/client-go/tools/cache"
  19. "k8s.io/client-go/tools/clientcmd"
  20. "k8s.io/client-go/util/workqueue"
  21. )
  22.  
// Controller watches PersistentVolumes and processes change events from a
// rate-limited workqueue. It couples an informer (event source), an indexer
// (local cache) and a workqueue (retry-aware work buffer).
type Controller struct {
	// indexer is the informer's local cache, keyed by namespace/name.
	indexer cache.Indexer
	// queue holds the keys of objects that still need to be processed;
	// it rate-limits retries after failed syncs.
	queue workqueue.RateLimitingInterface
	// informer watches the API server and keeps indexer up to date.
	informer cache.Controller
}
  28.  
  29. func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
  30. return &Controller{
  31. informer: informer,
  32. indexer: indexer,
  33. queue: queue,
  34. }
  35. }
  36.  
  37. func (c *Controller) processNextItem() bool {
  38. // Wait until there is a new item in the working queue
  39. key, quit := c.queue.Get()
  40. if quit {
  41. return false
  42. }
  43. // Tell the queue that we are done with processing this key. This unblocks the key for other workers
  44. // This allows safe parallel processing because two PersistentVolume with the same key are never processed in
  45. // parallel.
  46. defer c.queue.Done(key)
  47.  
  48. // Invoke the method containing the business logic
  49. err := c.syncToStdout(key.(string))
  50. // Handle the error if something went wrong during the execution of the business logic
  51. c.handleErr(err, key)
  52. return true
  53. }
  54.  
  55. // syncToStdout is the business logic of the controller. In this controller it simply prints
  56. // information about the PersistentVolume to stdout. In case an error happened, it has to simply return the error.
  57. // The retry logic should not be part of the business logic.
  58. func (c *Controller) syncToStdout(key string) error {
  59. obj, exists, err := c.indexer.GetByKey(key)
  60. if err != nil {
  61. klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
  62. return err
  63. }
  64.  
  65. if !exists {
  66. // Below we will warm up our cache with a PersistentVolume, so that we will see a delete for one PersistentVolume
  67. fmt.Printf("PersistentVolume %s does not exist anymore\n", key)
  68.  
  69. //fmt.Println("%s\n", obj.(*v1.PersistentVolume).GetName(), "%s\n", obj.(*v1.PersistentVolume).GetAnnotations(), "PV: ", "With annotations: ")
  70. //fmt.Println(obj.(*v1.PersistentVolume).GetName())
  71. //fmt.Printf("Sync/Add/Update for PersistentVolume %s\n", obj.(*v1.PersistentVolume).GetName())
  72. /*
  73. GetAnnotations("REGISTRATION_IN_PROCESS") {
  74.  
  75. }
  76. if obj.(*v1.PersistentVolume).GetAnnotations("REGISTRATION_IN_PROCESS") {
  77.  
  78. }
  79. if obj.(*v1.PersistentVolume).GetAnnotations("REGISTRATION_IS_DONE") {
  80.  
  81. }
  82. */
  83. } else {
  84.  
  85. // Note that you also have to check the uid if you have a local controlled resource, which
  86. // is dependent on the actual instance, to detect that a PersistentVolume was recreated with the same name
  87. fmt.Printf("Sync/Add/Update for PersistentVolume %s\n", obj.(*v1.PersistentVolume).GetName())
  88.  
  89. // Get all annotations of the pvs
  90. for k, v := range obj.(*v1.PersistentVolume).GetAnnotations() {
  91. fmt.Println("PV: ", obj.(*v1.PersistentVolume).GetName(), "With annotations: ", "key[%s] value[%s]\n", k, v)
  92.  
  93. // ignore event if annotation REGISTRATION_IN_PROCESS job is present on PV
  94. if k == "REGISTRATION_IN_PROCESS" {
  95. fmt.Println("REGISTRATION_IN_PROCESS for volume", obj.(*v1.PersistentVolume).GetName())
  96.  
  97. // call a job in order to do chmod into the volume, this will run an alpine image, mount the volumeSource, and run chmod
  98. // check if job already exists
  99. // save job ID in annotation on PV
  100. }
  101. // check if job ID is in annotation on PV and is out of the queue
  102. if k == "REGISTRATION_IS_DONE" {
  103. fmt.Println("REGISTRATION_IS_DONE for volume", obj.(*v1.PersistentVolume).GetName())
  104.  
  105. } else {
  106. // if new event, register annotation
  107. register := map[string]string{"REGISTRATION_IN_PROCESS": ""}
  108. obj.(*v1.PersistentVolume).SetAnnotations(register)
  109. fmt.Println("Setting annotation REGISTRATION_IN_PROCESS for volume ", obj.(*v1.PersistentVolume).GetName())
  110. }
  111. }
  112.  
  113. //fmt.Println("PV: ", obj.(*v1.PersistentVolume).GetName(), "With annotations: ", obj.(*v1.PersistentVolume).GetAnnotations())
  114. }
  115. return nil
  116. }
  117.  
  118. // handleErr checks if an error happened and makes sure we will retry later.
  119. func (c *Controller) handleErr(err error, key interface{}) {
  120. if err == nil {
  121. // Forget about the #AddRateLimited history of the key on every successful synchronization.
  122. // This ensures that future processing of updates for this key is not delayed because of
  123. // an outdated error history.
  124. c.queue.Forget(key)
  125. return
  126. }
  127.  
  128. // This controller retries 5 times if something goes wrong. After that, it stops trying.
  129. if c.queue.NumRequeues(key) < 5 {
  130. klog.Infof("Error syncing PersistentVolume %v: %v", key, err)
  131.  
  132. // Re-enqueue the key rate limited. Based on the rate limiter on the
  133. // queue and the re-enqueue history, the key will be processed later again.
  134. c.queue.AddRateLimited(key)
  135. return
  136. }
  137.  
  138. c.queue.Forget(key)
  139. // Report to an external entity that, even after several retries, we could not successfully process this key
  140. runtime.HandleError(err)
  141. klog.Infof("Dropping PersistentVolume %q out of the queue: %v", key, err)
  142. }
  143.  
  144. func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
  145. defer runtime.HandleCrash()
  146.  
  147. // Let the workers stop when we are done
  148. defer c.queue.ShutDown()
  149. klog.Info("Starting PersistentVolume controller")
  150.  
  151. go c.informer.Run(stopCh)
  152.  
  153. // Wait for all involved caches to be synced, before processing items from the queue is started
  154. if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
  155. runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
  156. return
  157. }
  158.  
  159. for i := 0; i < threadiness; i++ {
  160. go wait.Until(c.runWorker, time.Second, stopCh)
  161. }
  162.  
  163. <-stopCh
  164. klog.Info("Stopping PersistentVolume controller")
  165. }
  166.  
  167. func (c *Controller) runWorker() {
  168. for c.processNextItem() {
  169. }
  170. }
  171.  
  172. func main() {
  173.  
  174. var master string
  175.  
  176. var kubeconfig *string
  177. if home := homeDir(); home != "" {
  178. kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
  179. } else {
  180. kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
  181. }
  182. flag.Parse()
  183.  
  184. // use the current context in kubeconfig
  185. config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
  186. if err != nil {
  187. panic(err.Error())
  188. }
  189.  
  190. /* INCLIENT CONFIGURATION
  191.  
  192. var kubeconfig string
  193. var master string
  194.  
  195. flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
  196. flag.StringVar(&master, "master", "", "master url")
  197. flag.Parse()
  198.  
  199. // creates the connection
  200. config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
  201. if err != nil {
  202. klog.Fatal(err)
  203. }
  204.  
  205.  
  206.  
  207.  
  208. */
  209.  
  210. // creates the clientset
  211. clientset, err := kubernetes.NewForConfig(config)
  212. if err != nil {
  213. klog.Fatal(err)
  214. }
  215.  
  216. // create the PersistentVolume watcher
  217. PersistentVolumeListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "persistentvolumes", v1.NamespaceAll, fields.Everything())
  218.  
  219. // create the workqueue
  220. queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
  221.  
  222. // Bind the workqueue to a cache with the help of an informer. This way we make sure that
  223. // whenever the cache is updated, the PersistentVolume key is added to the workqueue.
  224. // Note that when we finally process the item from the workqueue, we might see a newer version
  225. // of the PersistentVolume than the version which was responsible for triggering the update.
  226. indexer, informer := cache.NewIndexerInformer(PersistentVolumeListWatcher, &v1.PersistentVolume{}, 0, cache.ResourceEventHandlerFuncs{
  227. AddFunc: func(obj interface{}) {
  228. key, err := cache.MetaNamespaceKeyFunc(obj)
  229. if err == nil {
  230. queue.Add(key)
  231. }
  232. },
  233. UpdateFunc: func(old interface{}, new interface{}) {
  234. key, err := cache.MetaNamespaceKeyFunc(new)
  235. if err == nil {
  236. queue.Add(key)
  237. }
  238. },
  239. DeleteFunc: func(obj interface{}) {
  240. // IndexerInformer uses a delta queue, therefore for deletes we have to use this
  241. // key function.
  242. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  243. if err == nil {
  244. queue.Add(key)
  245. }
  246. },
  247. }, cache.Indexers{})
  248.  
  249. controller := NewController(queue, indexer, informer)
  250.  
  251. //informer2 := cache.NewSharedIndexInformer(
  252. // PersistentVolumeListWatcher,
  253. // &v1.PersistentVolume{},
  254. // 0, //Skip resyncr
  255. // cache.Indexers{},
  256. //)
  257.  
  258. // We can now warm up the cache for initial synchronization.
  259. // Let's suppose that we knew about a PersistentVolume "myPersistentVolume" on our last run, therefore add it to the cache.
  260. // If this PersistentVolume is not there anymore, the controller will be notified about the removal after the
  261. // cache has synchronized.
  262. indexer.Add(&v1.PersistentVolume{
  263. ObjectMeta: meta_v1.ObjectMeta{
  264. Name: "myPersistentVolume",
  265. Namespace: v1.NamespaceAll,
  266. },
  267. })
  268.  
  269. // Now let's start the controller
  270. stop := make(chan struct{})
  271. defer close(stop)
  272. go controller.Run(1, stop)
  273.  
  274. // Wait forever
  275. select {}
  276. }
  277.  
  278. func homeDir() string {
  279. if h := os.Getenv("HOME"); h != "" {
  280. return h
  281. }
  282. return os.Getenv("USERPROFILE") // windows
  283. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement