Advertisement
diegogaulke

golang elasticsearch bulk insert

Feb 1st, 2019
152
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 3.74 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     "context"
  5.     "encoding/csv"
  6.     "fmt"
  7.     "log"
  8.     "math"
  9.     "os"
  10.     "strconv"
  11.     "strings"
  12.  
  13.     "github.com/olivere/elastic"
  14. )
  15.  
  16. func main() {
  17.     // check if csv file were informed
  18.     args := os.Args
  19.     if len(args) < 2 {
  20.         log.Fatal("CSV file path not informed.")
  21.     }
  22.  
  23.     // check if all elasticsearch env vars were exported
  24.     url := os.Getenv("ELASTIC_URL")
  25.     index := os.Getenv("ELASTIC_INDEX")
  26.     _type := os.Getenv("ELASTIC_TYPE")
  27.  
  28.     if url == "" || index == "" || _type == "" {
  29.         log.Fatal("Please set all elastic search env vars.")
  30.     }
  31.  
  32.     // open file
  33.     file, err := os.Open(args[1])
  34.     if err != nil {
  35.         log.Fatal(err)
  36.     }
  37.     defer file.Close()
  38.  
  39.     reader := csv.NewReader(file)
  40.     reader.Comma = ';'
  41.  
  42.     lines, err := reader.ReadAll()
  43.     if err != nil {
  44.         log.Fatal(err)
  45.     }
  46.  
  47.     fmt.Println(url)
  48.  
  49.     //  get elasticsearch client
  50.     client, err := elastic.NewClient(elastic.SetURL(url), elastic.SetSniff(false), elastic.SetHealthcheck(false))
  51.     if err != nil {
  52.         log.Fatal(err)
  53.     }
  54.  
  55.     ctx := context.Background()
  56.     p, _ := client.BulkProcessor().Name("worker").BulkActions(250).Workers(2).Stats(true).Do(ctx)
  57.  
  58.     for i, line := range lines {
  59.         // skip header
  60.         if i == 1 {
  61.             continue
  62.         }
  63.  
  64.         // get data from csv line
  65.         data := get(line)
  66.         p.Add(elastic.NewBulkIndexRequest().Index(index).Type(_type).Doc(data))
  67.         if err != nil {
  68.             fmt.Println(err.Error())
  69.         }
  70.  
  71.         if math.Mod(float64(i), float64(1000)) == 0 {
  72.             printStats(p)
  73.         }
  74.     }
  75.  
  76.     err = p.Flush()
  77.     if err != nil {
  78.         fmt.Println("Error while flushing remaing requests: " + err.Error())
  79.     }
  80.  
  81.     printStats(p)
  82.  
  83. }
  84.  
  85. func get(arr []string) *sus {
  86.     return &sus{arr[0], arr[1], arr[2], arr[3], arr[4], arr[5], arr[6], arr[7], arr[8], arr[9], arr[10], arr[11], arr[12], arr[13], arr[14], arr[15], arr[16], arr[17], arr[18], arr[19], arr[20], arr[21], parseFloat(arr[22]), parseFloat(arr[23])}
  87. }
  88.  
  89. type sus struct {
  90.     Month            string  `json:"month"`
  91.     Abi              string  `json:"abi"`
  92.     AbiYear          string  `json:"abiYear"`
  93.     Attendance       string  `json:"attendance"`
  94.     Operator         string  `json:"operator"`
  95.     AttendanceType   string  `json:"attendanceType"`
  96.     State            string  `json:"state"`
  97.     SelectedPlan     string  `json:"selectedPlan"`
  98.     PlanReach        string  `json:"planReach"`
  99.     SelectedType     string  `json:"selectedType"`
  100.     PlanSegment      string  `json:"planSegment"`
  101.     Sex              string  `json:"sex"`
  102.     AgeGroup         string  `json:"ageGroup"`
  103.     Specialty        string  `json:"specialty"`
  104.     ProcedureCode    string  `json:"procedureCode"`
  105.     Icd              string  `json:"icd"`
  106.     DaysHospitalised string  `json:"hospitalised"`
  107.     CnesCode         string  `json:"cnesCode"`
  108.     InstituteState   string  `json:"instituteState"`
  109.     InstituteCity    string  `json:"instituteCity"`
  110.     InstituteReach   string  `json:"instituteReach"`
  111.     InstituteNature  string  `json:"instituteNature"`
  112.     AttendanceValue  float64 `json:"attendanceValue"`
  113.     RefundValue      float64 `json:"refundValue"`
  114. }
  115.  
  116. func parseFloat(s string) float64 {
  117.     f, _ := strconv.ParseFloat(strings.Replace(s, ",", ".", 1), 64)
  118.     return f
  119. }
  120.  
  121. func printStats(p *elastic.BulkProcessor) {
  122.     stats := p.Stats()
  123.     fmt.Println("-------------------------------------------")
  124.     fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
  125.     fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
  126.     fmt.Printf("Number of requests indexed            : %d\n", stats.Indexed)
  127.     fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
  128.     fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
  129.     fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
  130.     fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
  131. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement