Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "context"
- "encoding/csv"
- "fmt"
- "log"
- "math"
- "os"
- "strconv"
- "strings"
- "github.com/olivere/elastic"
- )
- func main() {
- // check if csv file were informed
- args := os.Args
- if len(args) < 2 {
- log.Fatal("CSV file path not informed.")
- }
- // check if all elasticsearch env vars were exported
- url := os.Getenv("ELASTIC_URL")
- index := os.Getenv("ELASTIC_INDEX")
- _type := os.Getenv("ELASTIC_TYPE")
- if url == "" || index == "" || _type == "" {
- log.Fatal("Please set all elastic search env vars.")
- }
- // open file
- file, err := os.Open(args[1])
- if err != nil {
- log.Fatal(err)
- }
- defer file.Close()
- reader := csv.NewReader(file)
- reader.Comma = ';'
- lines, err := reader.ReadAll()
- if err != nil {
- log.Fatal(err)
- }
- fmt.Println(url)
- // get elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(url), elastic.SetSniff(false), elastic.SetHealthcheck(false))
- if err != nil {
- log.Fatal(err)
- }
- ctx := context.Background()
- p, _ := client.BulkProcessor().Name("worker").BulkActions(250).Workers(2).Stats(true).Do(ctx)
- for i, line := range lines {
- // skip header
- if i == 1 {
- continue
- }
- // get data from csv line
- data := get(line)
- p.Add(elastic.NewBulkIndexRequest().Index(index).Type(_type).Doc(data))
- if err != nil {
- fmt.Println(err.Error())
- }
- if math.Mod(float64(i), float64(1000)) == 0 {
- printStats(p)
- }
- }
- err = p.Flush()
- if err != nil {
- fmt.Println("Error while flushing remaing requests: " + err.Error())
- }
- printStats(p)
- }
- func get(arr []string) *sus {
- return &sus{arr[0], arr[1], arr[2], arr[3], arr[4], arr[5], arr[6], arr[7], arr[8], arr[9], arr[10], arr[11], arr[12], arr[13], arr[14], arr[15], arr[16], arr[17], arr[18], arr[19], arr[20], arr[21], parseFloat(arr[22]), parseFloat(arr[23])}
- }
// sus is the document built from one CSV record (see get) and handed to the
// Elasticsearch bulk index request as its body.
//
// The field order is significant: it mirrors the CSV column order, and code
// that builds a sus positionally relies on it. The json tags set the key
// names used when the document is serialized.
type sus struct {
	// Columns 0-21: copied from the record as plain strings.
	Month            string `json:"month"`
	Abi              string `json:"abi"`
	AbiYear          string `json:"abiYear"`
	Attendance       string `json:"attendance"`
	Operator         string `json:"operator"`
	AttendanceType   string `json:"attendanceType"`
	State            string `json:"state"`
	SelectedPlan     string `json:"selectedPlan"`
	PlanReach        string `json:"planReach"`
	SelectedType     string `json:"selectedType"`
	PlanSegment      string `json:"planSegment"`
	Sex              string `json:"sex"`
	AgeGroup         string `json:"ageGroup"`
	Specialty        string `json:"specialty"`
	ProcedureCode    string `json:"procedureCode"`
	Icd              string `json:"icd"`
	DaysHospitalised string `json:"hospitalised"` // note: the JSON key differs from the field name
	CnesCode         string `json:"cnesCode"`
	InstituteState   string `json:"instituteState"`
	InstituteCity    string `json:"instituteCity"`
	InstituteReach   string `json:"instituteReach"`
	InstituteNature  string `json:"instituteNature"`

	// Columns 22-23: parsed into numbers by parseFloat.
	AttendanceValue float64 `json:"attendanceValue"`
	RefundValue     float64 `json:"refundValue"`
}
// parseFloat converts a decimal string that may use a comma as its decimal
// separator into a float64.
//
// Only the first comma is turned into a dot before parsing. The parse error
// is deliberately discarded: whatever strconv.ParseFloat returns alongside it
// is passed through, which is 0 for text that is not a valid number.
func parseFloat(s string) float64 {
	normalized := s
	if comma := strings.IndexByte(s, ','); comma >= 0 {
		normalized = s[:comma] + "." + s[comma+1:]
	}
	value, _ := strconv.ParseFloat(normalized, 64) // best effort, see doc comment
	return value
}
- func printStats(p *elastic.BulkProcessor) {
- stats := p.Stats()
- fmt.Println("-------------------------------------------")
- fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
- fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
- fmt.Printf("Number of requests indexed : %d\n", stats.Indexed)
- fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
- fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
- fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
- fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement