Advertisement
Guest User

Untitled

a guest
Dec 13th, 2019
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 6.94 KB | None | 0 0
  1. package kubernetes
  2.  
  3. import (
  4.     "context"
  5.     "encoding/base64"
  6.     "fmt"
  7.  
  8.     container "cloud.google.com/go/container/apiv1"
  9.     "github.com/pkg/errors"
  10.     "golang.org/x/oauth2"
  11.     containerpb "google.golang.org/genproto/googleapis/container/v1"
  12.     v1core "k8s.io/api/core/v1"
  13.     kerrors "k8s.io/apimachinery/pkg/api/errors"
  14.     v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
  15.     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  16.     "k8s.io/apimachinery/pkg/fields"
  17.     "k8s.io/apimachinery/pkg/runtime"
  18.     "k8s.io/apimachinery/pkg/runtime/schema"
  19.     "k8s.io/apimachinery/pkg/types"
  20.     "k8s.io/apimachinery/pkg/util/jsonmergepatch"
  21.     "k8s.io/apimachinery/pkg/util/mergepatch"
  22.     "k8s.io/client-go/discovery"
  23.     "k8s.io/client-go/dynamic"
  24.     "k8s.io/client-go/kubernetes"
  25.     _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
  26.     "k8s.io/client-go/rest"
  27.     "k8s.io/client-go/tools/clientcmd/api"
  28.  
  29.     "gitlab.messagebird.io/infrastructure-team/tools/tron/pkg/appenv"
  30. )
  31.  
  32. type Clusters struct {
  33.     Clusters       map[string]appenv.ClusterID
  34.     ClusterManager *container.ClusterManagerClient
  35.     TokenSource    oauth2.TokenSource
  36. }
  37.  
  38. func (kCls Clusters) GetClientForEnvironment(env string) (*Client, error) {
  39.     clID, ok := kCls.Clusters[env]
  40.     if !ok {
  41.         return nil, errors.New(fmt.Sprintf("no cluster for environment %s", env))
  42.     }
  43.  
  44.     cluster, err := kCls.ClusterManager.GetCluster(context.TODO(), &containerpb.GetClusterRequest{
  45.         Name: clID.GCloudName(),
  46.     })
  47.     if err != nil {
  48.         return nil, errors.Wrapf(err, "can't get cluster info for %s", env)
  49.     }
  50.  
  51.     return &Client{
  52.         cluster:     cluster,
  53.         tokenSource: kCls.TokenSource,
  54.         ClusterID:   clID,
  55.     }, nil
  56. }
  57.  
  58. type Client struct {
  59.     cluster     *containerpb.Cluster
  60.     tokenSource oauth2.TokenSource
  61.     ClusterID   appenv.ClusterID
  62. }
  63.  
  64. type authorized struct {
  65.     clientset       *kubernetes.Clientset
  66.     dynamicClient   dynamic.Interface
  67.     discoveryClient resourceDiscovery
  68. }
  69.  
  70. func (c Client) newAuthorized() (*authorized, error) {
  71.     config, err := c.getConfig()
  72.     if err != nil {
  73.         return nil, errors.WithStack(err)
  74.     }
  75.     clientset, err := kubernetes.NewForConfig(config)
  76.     if err != nil {
  77.         return nil, errors.WithStack(err)
  78.     }
  79.     dynClient, err := dynamic.NewForConfig(config)
  80.     if err != nil {
  81.         return nil, errors.WithStack(err)
  82.     }
  83.     discClient, err := newResourceDiscovery(config)
  84.     if err != nil {
  85.         return nil, errors.WithStack(err)
  86.     }
  87.     return &authorized{
  88.         clientset:       clientset,
  89.         dynamicClient:   dynClient,
  90.         discoveryClient: discClient,
  91.     }, nil
  92. }
  93.  
  94. func (c Client) getConfig() (*rest.Config, error) {
  95.     token, err := c.tokenSource.Token()
  96.     if err != nil {
  97.         return nil, errors.Wrapf(err, "can't issue authentication token")
  98.     }
  99.     caDecoded, err := base64.StdEncoding.DecodeString(c.cluster.MasterAuth.ClusterCaCertificate)
  100.     if err != nil {
  101.         return nil, errors.Wrapf(err, "invalid cluster CA for %s", c.cluster.Name)
  102.     }
  103.     return &rest.Config{
  104.         Host: c.cluster.Endpoint,
  105.         TLSClientConfig: rest.TLSClientConfig{
  106.             CAData: caDecoded,
  107.         },
  108.         AuthProvider: &api.AuthProviderConfig{
  109.             Name: "gcp",
  110.             Config: map[string]string{
  111.                 "token": token.AccessToken,
  112.             },
  113.         },
  114.     }, nil
  115. }
  116.  
  117. type groupVersionKind struct {
  118.     group   string
  119.     version string
  120.     kind    string
  121. }
  122.  
  123. type resourceDiscovery struct {
  124.     gvkToResource map[groupVersionKind]string
  125.     resourceLists []*v1meta.APIResourceList
  126. }
  127.  
  128. func newResourceDiscovery(config *rest.Config) (resourceDiscovery, error) {
  129.     d := resourceDiscovery{
  130.         gvkToResource: map[groupVersionKind]string{},
  131.     }
  132.     client, err := discovery.NewDiscoveryClientForConfig(config)
  133.     if err != nil {
  134.         return d, errors.WithStack(err)
  135.     }
  136.     d.resourceLists, err = client.ServerPreferredResources()
  137.     if err != nil {
  138.         return d, errors.WithStack(err)
  139.     }
  140.     for _, resourceList := range d.resourceLists {
  141.         for _, r := range resourceList.APIResources {
  142.             gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
  143.             if err != nil {
  144.                 return d, errors.WithStack(err)
  145.             }
  146.             k := groupVersionKind{
  147.                 group:   gv.Group,
  148.                 version: gv.Version,
  149.                 kind:    r.Kind,
  150.             }
  151.             d.gvkToResource[k] = r.Name
  152.         }
  153.     }
  154.     return d, nil
  155. }
  156.  
  157. func (d resourceDiscovery) resourceForObject(o runtime.Object) (string, error) {
  158.     gvk := o.GetObjectKind().GroupVersionKind()
  159.     resourceName, ok := d.gvkToResource[groupVersionKind{
  160.         group:   gvk.Group,
  161.         version: gvk.Version,
  162.         kind:    gvk.Kind,
  163.     }]
  164.     if !ok {
  165.         return "", errors.Errorf("can't find resource for %s/%s/%s", gvk.Version, gvk.Group, gvk.Kind)
  166.     }
  167.     return resourceName, nil
  168. }
  169.  
  170. func (c Client) Apply(manifest Manifest) error {
  171.     auth, err := c.newAuthorized()
  172.     if err != nil {
  173.         return errors.WithStack(err)
  174.     }
  175.     for _, resource := range manifest.resources {
  176.         gvk := resource.GetObjectKind().GroupVersionKind()
  177.         resourceName, err := auth.discoveryClient.resourceForObject(resource)
  178.         if err != nil {
  179.             return errors.WithStack(err)
  180.         }
  181.         namespaceableResource := auth.dynamicClient.Resource(schema.GroupVersionResource{
  182.             Group:    gvk.Group,
  183.             Version:  gvk.Version,
  184.             Resource: resourceName,
  185.         })
  186.         var (
  187.             r        dynamic.ResourceInterface
  188.             unstruct unstructured.Unstructured
  189.         )
  190.         if resource.GetNamespace() == "" {
  191.             r = namespaceableResource
  192.         } else {
  193.             r = namespaceableResource.Namespace(resource.GetNamespace())
  194.         }
  195.  
  196.         if unstruct.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(resource); err != nil {
  197.             return errors.WithStack(err)
  198.         }
  199.         annotations := unstruct.GetAnnotations()
  200.         if annotations == nil {
  201.             annotations = map[string]string{}
  202.         }
  203.         oldConfiguration := []byte(annotations[v1core.LastAppliedConfigAnnotation])
  204.         newConfiguration, err := unstruct.MarshalJSON()
  205.         if err != nil {
  206.             return errors.WithStack(err)
  207.         }
  208.         annotations[v1core.LastAppliedConfigAnnotation] = string(newConfiguration)
  209.         unstruct.SetAnnotations(annotations)
  210.  
  211.         if existing, err := r.Get(resource.GetName(), v1meta.GetOptions{}); err != nil && !kerrors.IsNotFound(err) {
  212.             return errors.WithStack(err)
  213.         } else if kerrors.IsNotFound(err) {
  214.             if _, err := r.Create(&unstruct, v1meta.CreateOptions{}); err != nil {
  215.                 return errors.Wrapf(err, "resource %s/%s/%s %s/%s", gvk.Version, gvk.Group, gvk.Kind, resource.GetNamespace(), resource.GetName())
  216.             }
  217.         } else {
  218.             preconditions := []mergepatch.PreconditionFunc{
  219.                 mergepatch.RequireKeyUnchanged("apiVersion"),
  220.                 mergepatch.RequireKeyUnchanged("kind"),
  221.                 mergepatch.RequireMetadataKeyUnchanged("name"),
  222.             }
  223.             original, err := existing.MarshalJSON()
  224.             if err != nil {
  225.                 return errors.WithStack(err)
  226.             }
  227.             patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(oldConfiguration, newConfiguration, original, preconditions...)
  228.             if err != nil {
  229.                 return errors.WithStack(err)
  230.             }
  231.             if _, err := r.Patch(resource.GetName(), types.MergePatchType, patch, v1meta.PatchOptions{}); err != nil {
  232.                 return errors.Wrapf(err, "resource %s/%s/%s %s/%s", gvk.Version, gvk.Group, gvk.Kind, resource.GetNamespace(), resource.GetName())
  233.             }
  234.         }
  235.     }
  236.     return nil
  237. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement