Advertisement
Guest User

Untitled

a guest
Jul 20th, 2017
145
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 13.18 KB | None | 0 0
  1. package main
  2.  
  3. // Copyright 2016-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  4. // Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
  5. // http://aws.amazon.com/apache2.0/
  6. // or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
  7.  
  8. import (
  9.     "fmt"
  10.     log "github.com/Sirupsen/logrus"
  11.     "github.com/aws/aws-sdk-go/aws"
  12.     "github.com/aws/aws-sdk-go/aws/ec2metadata"
  13.     "github.com/aws/aws-sdk-go/aws/session"
  14.     "github.com/aws/aws-sdk-go/service/route53"
  15.     "github.com/fsouza/go-dockerclient"
  16.     "github.com/aws/aws-sdk-go/service/cloudwatchevents"
  17.     "net/http"
  18.     "io/ioutil"
  19.     "strconv"
  20.     "strings"
  21.     "time"
  22.     "flag"
  23. )
  24.  
  25. const workerTimeout = 180 * time.Second
  26. const defaultTTL = 0
  27. const defaultWeight = 1
  28.  
  29. var DNSName = "alissa.agilent.com"
  30.  
  31. type handler interface {
  32.     Handle(*docker.APIEvents) error
  33. }
  34.  
  35. type dockerRouter struct {
  36.     handlers      map[string][]handler
  37.     dockerClient  *docker.Client
  38.     listener      chan *docker.APIEvents
  39.     workers       chan *worker
  40.     workerTimeout time.Duration
  41. }
  42.  
  43. func dockerEventsRouter(bufferSize int, workerPoolSize int, dockerClient *docker.Client,
  44.     handlers map[string][]handler) (*dockerRouter, error) {
  45.     workers := make(chan *worker, workerPoolSize)
  46.     for i := 0; i < workerPoolSize; i++ {
  47.         workers <- &worker{}
  48.     }
  49.  
  50.     dockerRouter := &dockerRouter{
  51.         handlers:      handlers,
  52.         dockerClient:  dockerClient,
  53.         listener:      make(chan *docker.APIEvents, bufferSize),
  54.         workers:       workers,
  55.         workerTimeout: workerTimeout,
  56.     }
  57.  
  58.     return dockerRouter, nil
  59. }
  60.  
  61. func (e *dockerRouter) start() error {
  62.     go e.manageEvents()
  63.     return e.dockerClient.AddEventListener(e.listener)
  64. }
  65.  
  66. func (e *dockerRouter) stop() error {
  67.     if e.listener == nil {
  68.         return nil
  69.     }
  70.     return e.dockerClient.RemoveEventListener(e.listener)
  71. }
  72.  
  73. func (e *dockerRouter) manageEvents() {
  74.     for {
  75.         event := <-e.listener
  76.         timer := time.NewTimer(e.workerTimeout)
  77.         gotWorker := false
  78.         // Wait until we get a free worker or a timeout
  79.         // there is a limit in the number of concurrent events managed by workers to avoid resource exhaustion
  80.         // so we wait until we have a free worker or a timeout occurs
  81.         for !gotWorker {
  82.             select {
  83.             case w := <-e.workers:
  84.                 if !timer.Stop() {
  85.                     <-timer.C
  86.                 }
  87.                 go w.doWork(event, e)
  88.                 gotWorker = true
  89.             case <-timer.C:
  90.                 log.Infof("Timed out waiting.")
  91.             }
  92.         }
  93.     }
  94. }
  95.  
  96. type worker struct{}
  97.  
  98. func (w *worker) doWork(event *docker.APIEvents, e *dockerRouter) {
  99.     defer func() { e.workers <- w }()
  100.     if handlers, ok := e.handlers[event.Status]; ok {
  101.         log.Infof("Processing event: %#v", event)
  102.         for _, handler := range handlers {
  103.             if err := handler.Handle(event); err != nil {
  104.                 log.Errorf("Error processing event %#v. Error: %v", event, err)
  105.             }
  106.         }
  107.     }
  108. }
  109.  
  110. type dockerHandler struct {
  111.     handlerFunc func(event *docker.APIEvents) error
  112. }
  113.  
  114. func (th *dockerHandler) Handle(event *docker.APIEvents) error {
  115.     return th.handlerFunc(event)
  116. }
  117.  
  118. type config struct {
  119.     HostedZoneId string
  120.     Hostname     string
  121.     Region       string
  122. }
  123.  
  124. var configuration config
  125.  
  126. func logErrorAndFail(err error) {
  127.     if err != nil {
  128.         log.Fatal(err)
  129.     }
  130. }
  131.  
  132. func logErrorNoFatal(err error) {
  133.     if err != nil {
  134.         log.Error(err)
  135.     }
  136. }
  137.  
  138. type topTasks struct {
  139.     Tasks []taskInfo
  140. }
  141.  
  142. type taskInfo struct {
  143.     Arn           string
  144.     DesiredStatus string
  145.     KnownStatus   string
  146.     Family        string
  147.     Version       string
  148.     Containers    []ContainerInfo
  149. }
  150.  
  151. type ContainerInfo struct {
  152.     DockerId   string
  153.     DockerName string
  154.     Name       string
  155. }
  156.  
  157. type ServiceInfo struct {
  158.     Name string
  159.     Port string
  160. }
  161.  
  162. func getDNSHostedZoneId() (string, error) {
  163.     r53 := route53.New(session.New())
  164.     params := &route53.ListHostedZonesByNameInput{
  165.         DNSName: aws.String(DNSName),
  166.     }
  167.  
  168.     zones, err := r53.ListHostedZonesByName(params)
  169.  
  170.     if err == nil {
  171.         if len(zones.HostedZones) > 0 {
  172.             return aws.StringValue(zones.HostedZones[0].Id), nil
  173.         }
  174.     }
  175.  
  176.     return "", err
  177. }
  178.  
  179. func createDNSRecord(serviceName string, dockerId string, port string) error {
  180.     r53 := route53.New(session.New())
  181.     srvRecordName := serviceName + "." + DNSName
  182.     // This API call creates a new DNS record for this service
  183.     params := &route53.ChangeResourceRecordSetsInput{
  184.         ChangeBatch: &route53.ChangeBatch{
  185.             Changes: []*route53.Change{
  186.                 {
  187.                     Action: aws.String(route53.ChangeActionCreate),
  188.                     ResourceRecordSet: &route53.ResourceRecordSet{
  189.                         Name: aws.String(srvRecordName),
  190.                         // It creates a SRV record with the name of the service
  191.                         Type: aws.String(route53.RRTypeSrv),
  192.                         ResourceRecords: []*route53.ResourceRecord{
  193.                             {
  194.                                 // priority: the priority of the target host, lower value means more preferred
  195.                                 // weight: A relative weight for records with the same priority, higher value means more preferred
  196.                                 // port: the TCP or UDP port on which the service is to be found
  197.                                 // target: the canonical hostname of the machine providing the service
  198.                                 Value: aws.String("1 1 " + port + " " + configuration.Hostname),
  199.                             },
  200.                         },
  201.                         SetIdentifier: aws.String(dockerId),
  202.                         // TTL=0 to avoid DNS caches
  203.                         TTL:    aws.Int64(defaultTTL),
  204.                         Weight: aws.Int64(defaultWeight),
  205.                     },
  206.                 },
  207.             },
  208.             Comment: aws.String("Service Discovery Created Record"),
  209.         },
  210.         HostedZoneId: aws.String(configuration.HostedZoneId),
  211.     }
  212.     _, err := r53.ChangeResourceRecordSets(params)
  213.     logErrorNoFatal(err)
  214.     fmt.Println("Record " + srvRecordName + " created (1 1 " + port + " " + configuration.Hostname + ")")
  215.     return err
  216. }
  217.  
  218. func deleteDNSRecord(serviceName string, dockerId string) error {
  219.     var err error
  220.     r53 := route53.New(session.New())
  221.     srvRecordName := serviceName + "." + DNSName
  222.     // This API Call looks for the Route53 DNS record for this service and docker ID to get the values to delete
  223.     paramsList := &route53.ListResourceRecordSetsInput{
  224.         HostedZoneId:          aws.String(configuration.HostedZoneId), // Required
  225.         MaxItems:              aws.String("10"),
  226.         StartRecordIdentifier: aws.String(dockerId),
  227.         StartRecordName:       aws.String(srvRecordName),
  228.         StartRecordType:       aws.String(route53.RRTypeSrv),
  229.     }
  230.     resp, err := r53.ListResourceRecordSets(paramsList)
  231.     logErrorNoFatal(err)
  232.     if err != nil {
  233.         return err
  234.     }
  235.     srvValue := ""
  236.     for _, rrset := range resp.ResourceRecordSets {
  237.         if *rrset.SetIdentifier == dockerId && (*rrset.Name == srvRecordName || *rrset.Name == srvRecordName+".") {
  238.             for _, rrecords := range rrset.ResourceRecords {
  239.                 srvValue = aws.StringValue(rrecords.Value)
  240.                 break
  241.             }
  242.         }
  243.     }
  244.     if srvValue == "" {
  245.         log.Error("Route53 Record doesn't exist")
  246.         return nil
  247.     }
  248.  
  249.     // This API call deletes the DNS record for the service for this docker ID
  250.     params := &route53.ChangeResourceRecordSetsInput{
  251.         ChangeBatch: &route53.ChangeBatch{
  252.             Changes: []*route53.Change{
  253.                 {
  254.                     Action: aws.String(route53.ChangeActionDelete),
  255.                     ResourceRecordSet: &route53.ResourceRecordSet{
  256.                         Name: aws.String(srvRecordName),
  257.                         Type: aws.String(route53.RRTypeSrv),
  258.                         ResourceRecords: []*route53.ResourceRecord{
  259.                             {
  260.                                 Value: aws.String(srvValue),
  261.                             },
  262.                         },
  263.                         SetIdentifier: aws.String(dockerId),
  264.                         TTL:           aws.Int64(defaultTTL),
  265.                         Weight:        aws.Int64(defaultWeight),
  266.                     },
  267.                 },
  268.             },
  269.         },
  270.         HostedZoneId: aws.String(configuration.HostedZoneId),
  271.     }
  272.     _, err = r53.ChangeResourceRecordSets(params)
  273.     logErrorNoFatal(err)
  274.     fmt.Println("Record " + srvRecordName + " deleted ( " + srvValue + ")")
  275.     return err
  276. }
  277.  
  278. var dockerClient *docker.Client
  279.  
  280. func getNetworkPortAndServiceName(container *docker.Container, includePort bool) []ServiceInfo {
  281.     // One of the environment varialbles should be SERVICE_<port>_NAME = <name of the service>
  282.     // We look for this environment variable doing a split in the "=" and another one in the "_"
  283.     // So envEval = [SERVICE_<port>_NAME, <name>]
  284.     // nameEval = [SERVICE, <port>, NAME]
  285.     var svc []ServiceInfo = make([]ServiceInfo, 0)
  286.     for _, env := range container.Config.Env {
  287.         envEval := strings.Split(env, "=")
  288.         nameEval := strings.Split(envEval[0], "_")
  289.         if len(envEval) == 2 && len(nameEval) == 3 && nameEval[0] == "SERVICE" && nameEval[2] == "NAME" {
  290.             if _, err := strconv.Atoi(nameEval[1]); err == nil {
  291.                 if includePort {
  292.                     for srcPort, mapping := range container.NetworkSettings.Ports {
  293.                         portEval := strings.Split(string(srcPort), "/")
  294.                         if len(portEval) > 0 && portEval[0] == nameEval[1] {
  295.                             if len(mapping) > 0 {
  296.                                 svc = append(svc, ServiceInfo{envEval[1], mapping[0].HostPort})
  297.                             }
  298.                         }
  299.                     }
  300.                 } else {
  301.                     svc = append(svc, ServiceInfo{envEval[1], ""})
  302.                 }
  303.             }
  304.         }
  305.     }
  306.     return svc
  307. }
  308.  
  309. func sendToCWEvents (detail string, detailType string, resource string, source string) error {
  310.     config := aws.NewConfig().WithRegion(configuration.Region)
  311.     sess := session.New(config)
  312.     svc := cloudwatchevents.New(sess)
  313.     params := &cloudwatchevents.PutEventsInput{
  314.         Entries: []*cloudwatchevents.PutEventsRequestEntry{
  315.             {
  316.                 Detail: aws.String(detail),
  317.                 DetailType: aws.String(detailType),
  318.                 Resources: []*string{
  319.                     aws.String(resource),
  320.                 },
  321.                 Source: aws.String(source),
  322.                 Time: aws.Time(time.Now()),
  323.             },
  324.         },
  325.     }
  326.     _, err := svc.PutEvents(params)
  327.     logErrorNoFatal(err)
  328.     return err
  329. }
  330.  
  331. func getTaskArn(dockerID string) string {
  332.     resp, err := http.Get("http://127.0.0.1:51678/v1/tasks")
  333.     if err != nil {
  334.         logErrorAndFail(err)
  335.     }
  336.     defer resp.Body.Close()
  337.     body, err := ioutil.ReadAll(resp.Body)
  338.     bodyStr := string(body)
  339.     idIndex := strings.Index(bodyStr, string(dockerID))
  340.     arnStartIndex := strings.LastIndex(bodyStr[:idIndex], "arn:aws:ecs:")
  341.     arnString := bodyStr[arnStartIndex:]
  342.     arnEndIndex := strings.Index(arnString, "\"")
  343.     return arnString[:arnEndIndex]
  344. }
  345.  
  346. func main() {
  347.     var err error
  348.     var sum int
  349.     var zoneId string
  350.  
  351.     var sendEvents = flag.Bool("cw-send-events", false, "Send CloudWatch events when a container is created or terminated")
  352.     var zoneIdStr = flag.String("hosted-zone-id", "none", "Route53 Hosted Zone Id")
  353.  
  354.     flag.Parse()
  355.  
  356.     var DNSNameArg = flag.Arg(0)
  357.     if DNSNameArg != "" {
  358.         DNSName = DNSNameArg
  359.     }
  360.  
  361.     if zoneIdStr != nil {
  362.         zoneId = *zoneIdStr
  363.     } else {
  364.         for {
  365.             // We try to get the Hosted Zone Id using exponential backoff
  366.             zoneId, err = getDNSHostedZoneId()
  367.             if err == nil {
  368.                 break
  369.             }
  370.             if sum > 8 {
  371.                 logErrorAndFail(err)
  372.             }
  373.             time.Sleep(time.Duration(sum) * time.Second)
  374.             sum += 2
  375.         }
  376.     }
  377.     configuration.HostedZoneId = zoneId
  378.     metadataClient := ec2metadata.New(session.New())
  379.     hostname, err := metadataClient.GetMetadata("/hostname")
  380.     configuration.Hostname = hostname
  381.     logErrorAndFail(err)
  382.     region, err := metadataClient.Region()
  383.     configuration.Region = region
  384.     logErrorAndFail(err)
  385.  
  386.     endpoint := "unix:///var/run/docker.sock"
  387.     startFn := func(event *docker.APIEvents) error {
  388.         var err error
  389.         container, err := dockerClient.InspectContainer(event.ID)
  390.         logErrorAndFail(err)
  391.         allService := getNetworkPortAndServiceName(container, true)
  392.         for _, svc := range allService {
  393.             if svc.Name != "" && svc.Port != "" {
  394.                 sum = 1
  395.                 for {
  396.                     if err = createDNSRecord(svc.Name, event.ID, svc.Port); err == nil {
  397.                         break
  398.                     }
  399.                     if sum > 8 {
  400.                         log.Error("Error creating DNS record")
  401.                         break
  402.                     }
  403.                     time.Sleep(time.Duration(sum) * time.Second)
  404.                     sum += 2
  405.                 }
  406.             }
  407.         }
  408.         if *sendEvents {
  409.             taskArn := getTaskArn(event.ID)
  410.             sendToCWEvents(`{ "dockerId": "` + event.ID + `","TaskArn":"` + taskArn + `" }`, "Task Started", configuration.Hostname, "awslabs.ecs.container" )
  411.         }
  412.         fmt.Println("Docker " + event.ID + " started")
  413.         return nil
  414.     }
  415.  
  416.     stopFn := func(event *docker.APIEvents) error {
  417.         var err error
  418.         container, err := dockerClient.InspectContainer(event.ID)
  419.         logErrorAndFail(err)
  420.         allService := getNetworkPortAndServiceName(container, false)
  421.         for _, svc := range allService {
  422.             if svc.Name != "" {
  423.                 sum = 1
  424.                 for {
  425.                     if err = deleteDNSRecord(svc.Name, event.ID); err == nil {
  426.                         break
  427.                     }
  428.                     if sum > 8 {
  429.                         log.Error("Error deleting DNS record")
  430.                         break
  431.                     }
  432.                     time.Sleep(time.Duration(sum) * time.Second)
  433.                     sum += 2
  434.                 }
  435.             }
  436.         }
  437.         if *sendEvents {
  438.             taskArn := getTaskArn(event.ID)
  439.             sendToCWEvents(`{ "dockerId": "` + event.ID + `","TaskArn":"` + taskArn + `" }`, "Task Stopped", configuration.Hostname, "awslabs.ecs.container" )
  440.         }
  441.         fmt.Println("Docker " + event.ID + " stopped")
  442.         return nil
  443.     }
  444.  
  445.     startHandler := &dockerHandler{
  446.         handlerFunc: startFn,
  447.     }
  448.     stopHandler := &dockerHandler{
  449.         handlerFunc: stopFn,
  450.     }
  451.     handlers := map[string][]handler{"start": []handler{startHandler}, "die": []handler{stopHandler}}
  452.  
  453.     dockerClient, _ = docker.NewClient(endpoint)
  454.     router, err := dockerEventsRouter(5, 5, dockerClient, handlers)
  455.     logErrorAndFail(err)
  456.     defer router.stop()
  457.     router.start()
  458.     fmt.Println("Waiting events")
  459.     select {}
  460. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement