Advertisement
magheru_san

Untitled

May 10th, 2021
821
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 5.95 KB | None | 0 0
  1. diff --git a/core/autoscaling.go b/core/autoscaling.go
  2. index f08f705..bd68227 100644
  3. --- a/core/autoscaling.go
  4. +++ b/core/autoscaling.go
  5. @@ -378,7 +378,7 @@ func (a *autoScalingGroup) replaceOnDemandInstanceWithSpot(spotInstanceID string
  6.  
  7.     } else {
  8.  
  9. -       if err := a.region.sqsSendMessageSpotInstanceLaunch(&a.name, &spotInstanceID, spotInst.State.Name); err != nil {
  10. +       if err := a.region.sqsSendMessageOnInstanceLaunch(&a.name, &spotInstanceID, spotInst.State.Name, "spot"); err != nil {
  11.             return err
  12.         }
  13.         // add to FinalRecap
  14. diff --git a/core/main.go b/core/main.go
  15. index 6d7f8ca..7b06eab 100644
  16. --- a/core/main.go
  17. +++ b/core/main.go
  18. @@ -195,6 +195,7 @@ func (a *AutoSpotting) convertRawEventToCloudwatchEvent(event *json.RawMessage)
  19.     if sqsEvent.Records != nil {
  20.         sqsRecord := sqsEvent.Records[0]
  21.         parseEvent = []byte(sqsRecord.Body)
  22. +       // this will tell us later if the current run was triggered from SQS events
  23.         a.config.sqsReceiptHandle = sqsRecord.ReceiptHandle
  24.     } else {
  25.         a.config.sqsReceiptHandle = ""
  26. @@ -256,7 +257,7 @@ func (a *AutoSpotting) processEvent(event *json.RawMessage) error {
  27.     if (eventType == InstanceStateChangeNotificationCode ||
  28.         eventType == SpotInstanceInterruptionWarningCode ||
  29.         eventType == InstanceRebalanceRecommendationCode) && instanceID != nil {
  30. -       // Hanlde Instance Events
  31. +       // Handle Instance Events
  32.         log.SetPrefix(fmt.Sprintf("%s:%s ", eventType, *instanceID))
  33.         a.processEventInstance(eventType, cloudwatchEvent.Region, instanceID, instanceState)
  34.     } else if eventType == AWSAPICallCloudTrailCode {
  35. @@ -357,7 +358,7 @@ func (a *AutoSpotting) handleLifecycleHookEvent(event events.CloudWatchEvent) er
  36.         "attempting to swap it against a running on-demand instance",
  37.         i.region.name, *i.InstanceId)
  38.  
  39. -   i.region.sqsSendMessageSpotInstanceLaunch(asgName, i.InstanceId, i.State.Name)
  40. +   i.region.sqsSendMessageOnInstanceLaunch(asgName, i.InstanceId, i.State.Name, "spot")
  41.  
  42.     return nil
  43.  }
  44. @@ -411,6 +412,19 @@ func (a *AutoSpotting) handleNewInstanceLaunch(regionName string, instanceID str
  45.  
  46.  func (a *AutoSpotting) handleNewOnDemandInstanceLaunch(r *region, i *instance) error {
  47.     if i.shouldBeReplacedWithSpot(false) {
  48. +
  49. +       // If this run was not triggered by an SQS event, we generate such an event and send it to the queue.
  50. +       // We want to postpone the code below until the event is processed through the SQS queue,
  51. +       // in order to avoid launching Spot instances too early and having them run outside their ASG
  52. +       // for too long.
  53. +       if len(a.config.sqsReceiptHandle) == 0 {
  54. +           if i.asg.isEnabledForEventBasedInstanceReplacement() {
  55. +               return i.region.sqsSendMessageOnInstanceLaunch(&i.asg.name, i.InstanceId, i.State.Name, "on-demand")
  56. +           }
  57. +           return nil
  58. +       }
  59. +       defer i.region.sqsDeleteMessage(i.InstanceId, "on-demand")
  60. +
  61.         log.Printf("%s instance %s belongs to an enabled ASG and should be "+
  62.             "replaced with spot, attempting to launch spot replacement",
  63.             i.region.name, *i.InstanceId)
  64. @@ -447,13 +461,14 @@ func (a *AutoSpotting) handleNewSpotInstanceLaunch(r *region, i *instance) error
  65.         return fmt.Errorf("region %s is missing asg data", i.region.name)
  66.     }
  67.  
  68. +   // in case we're not triggered by an SQS event
  69.     if len(a.config.sqsReceiptHandle) == 0 {
  70.         if asg.isEnabledForEventBasedInstanceReplacement() {
  71. -           i.region.sqsSendMessageSpotInstanceLaunch(asgName, i.InstanceId, i.State.Name)
  72. +           return i.region.sqsSendMessageOnInstanceLaunch(asgName, i.InstanceId, i.State.Name, "spot")
  73.         }
  74.         return nil
  75.     }
  76. -   defer i.region.sqsDeleteMessage(i.InstanceId)
  77. +   defer i.region.sqsDeleteMessage(i.InstanceId, "spot")
  78.  
  79.     log.Printf("%s Found instance %s is not yet attached to its ASG, "+
  80.         "attempting to swap it against a running on-demand instance",
  81. diff --git a/core/region.go b/core/region.go
  82. index a3eed6e..3c1c762 100644
  83. --- a/core/region.go
  84. +++ b/core/region.go
  85. @@ -478,7 +478,7 @@ func (r *region) findEnabledASGByName(name string) *autoScalingGroup {
  86.     return nil
  87.  }
  88.  
  89. -func (r *region) sqsSendMessageSpotInstanceLaunch(asgName *string, instanceID *string, instanceState *string) error {
  90. +func (r *region) sqsSendMessageOnInstanceLaunch(asgName, instanceID, instanceState *string, instanceLifecycle string) error {
  91.     inputJSON := "{\"version\":\"0\",\"id\":\"890abcde-f123-4567-890a-bcdef1234567\"," +
  92.         "\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.events\"," +
  93.         "\"account\":\"\",\"time\":\"" + time.Now().Format(time.RFC3339) + "\"," +
  94. @@ -497,18 +497,18 @@ func (r *region) sqsSendMessageSpotInstanceLaunch(asgName *string, instanceID *s
  95.         })
  96.  
  97.     if err != nil {
  98. -       log.Printf("%s Error sending spot instance %s launch event message "+
  99. -           "to the SQS Queue %s: %s", r.name, *instanceID, r.conf.SQSQueueURL, err)
  100. +       log.Printf("%s Error sending %s instance %s launch event message "+
  101. +           "to the SQS Queue %s: %s", r.name, instanceLifecycle, *instanceID, r.conf.SQSQueueURL, err)
  102.         return err
  103.     }
  104.  
  105. -   log.Printf("%s Successfully sent spot instance %s launch event message"+
  106. -       "to the SQS Queue %s", r.name, *instanceID, r.conf.SQSQueueURL)
  107. +   log.Printf("%s Successfully sent %s instance %s launch event message"+
  108. +       "to the SQS Queue %s", r.name, instanceLifecycle, *instanceID, r.conf.SQSQueueURL)
  109.  
  110.     return nil
  111.  }
  112.  
  113. -func (r *region) sqsDeleteMessage(instanceID *string) error {
  114. +func (r *region) sqsDeleteMessage(instanceID *string, instanceLifecycle string) error {
  115.     svc := r.services.sqs
  116.  
  117.     _, err := svc.DeleteMessage(
  118. @@ -517,8 +517,8 @@ func (r *region) sqsDeleteMessage(instanceID *string) error {
  119.             ReceiptHandle: &r.conf.sqsReceiptHandle,
  120.         })
  121.     if err != nil {
  122. -       log.Printf("%s Error deleting spot instance %s launch event message "+
  123. -           "from the SQS Queue %s: %s", r.name, *instanceID, r.conf.SQSQueueURL, err)
  124. +       log.Printf("%s Error deleting %s instance %s launch event message "+
  125. +           "from the SQS Queue %s: %s", r.name, instanceLifecycle, *instanceID, r.conf.SQSQueueURL, err)
  126.         return err
  127.     }
  128.  
  129.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement