Advertisement
ddsKrtm

Untitled

Sep 17th, 2019
144
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 7.60 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     "bufio"
  5.     "database/sql"
  6.     "flag"
  7.     "fmt"
  8.     "github.com/satori/go.uuid"
  9.     "sync"
  10.  
  11.     //"go/scanner"
  12.     "log"
  13.     "os"
  14.     "pds-sample/client"
  15.     "strings"
  16.     "time"
  17.  
  18.     _ "github.com/lib/pq"
  19.  
  20. )
  21.  
  22. type PhoneNumber struct {
  23.     CustomerId  string
  24.     PhoneNumber string
  25.     StartTime   string
  26.     EndTime     string
  27.     sipId       string
  28. }
  29.  
  30. const sleepTime time.Duration = 10
  31.  
  32. func main() {
  33.  
  34.     //Settings vars
  35.     //var projectId int32 = 27
  36.     //var selectCount int32 = 500
  37.     //var queueId int32 = 26673
  38.  
  39.     projectIdFlag := flag.Int("project", 0, "int projectId")
  40.     queueIdFlag := flag.Int("queue", 0, "int queueId")
  41.     selectCountFlag := flag.Int("select-count", 100, "int Buffercount")
  42.     flag.Parse()
  43.  
  44.     projectId := int32(*projectIdFlag)
  45.     queueId := int32(*queueIdFlag)
  46.     selectCount := int32(*selectCountFlag)
  47.  
  48.     fmt.Println("projectId", projectId)
  49.     fmt.Println("queueId", queueId)
  50.     fmt.Println("selectCount", selectCount)
  51.  
  52.     if projectId == 0 {
  53.         fmt.Println("project-id cannot be " + " set project-id")
  54.         panic("wrong params")
  55.     }
  56.  
  57.     if queueId == 0 {
  58.         fmt.Println("queue-id cannot be " + " set queue-id")
  59.         panic("wrong params")
  60.     }
  61.  
  62.     prop, err := client.NewAgentProperties(client.NewAuth(141354, "e91caeab-b563-4265-be0b-bf636579a65a"))
  63.     if err != nil {
  64.         panic(err)
  65.     }
  66.  
  67.     conn, err := client.NewConn(prop.Host)
  68.     if err != nil {
  69.         panic(err)
  70.     }
  71.     defer conn.Close()
  72.  
  73.     agent, err := client.NewAgent(conn, prop.Auth, &client.PDSConf {
  74.         RuleID:            1456819, //1869031,
  75.         QueueID:           queueId, //26205,26205
  76.         ReferenceIP:       "127.0.0.1",
  77.         AvgTimeTalkSec:    80.0,
  78.         PercentSuccessful: 0.4,
  79.         MaximumErrorRate:  0.15,
  80.         SessionID:         uuid.NewV4().String(),
  81.     })
  82.     if err != nil {
  83.         panic(err)
  84.     }
  85.  
  86.     log.Println(" PROJECT ID: ", projectId)
  87.     log.Println(" PROJECT START queueId: ", queueId)
  88.  
  89.     // START SQL
  90.  
  91.     connStr := "postgres://user:host@127.0.0.1/"
  92.     db, err := sql.Open("postgres", connStr)
  93.     if err != nil {
  94.         log.Fatal(err)
  95.     }
  96.  
  97.     numbersArray, e := readFromDbToStructArrWithSleep(db, projectId, selectCount)
  98.     // end SQL
  99.  
  100.     if e != nil {
  101.         log.Println("DB READ READ ERROR: ", )
  102.         log.Fatal(e)
  103.     }
  104.  
  105.     log.Println(" LENGHT START:::   ", len(numbersArray))
  106.  
  107.  
  108.     taskChan := agent.GetTaskChannel()
  109.     receiveChan := agent.GetReceiveChannel() //DEBUG OLEG
  110.  
  111.     wg:=&sync.WaitGroup{}
  112.     wg.Add(1)
  113.  
  114.     go func() {
  115.         defer  close2(taskChan)
  116.         defer wg.Done()
  117.  
  118.         for {
  119.             taskCount := <- receiveChan // OLEG
  120.             //taskCount := 30
  121.             log.Println("FROM RECEIVE_CHANEL_taskCount:  ", taskCount)
  122.             taskNumbers := make([]map[string]interface{}, 0, taskCount)
  123.  
  124.             key := 0
  125.  
  126.             if taskCount > 0 {
  127.                 for taskCount > 0 {
  128.                     task := map[string]interface{} {
  129.                         "customerId":   numbersArray[0].CustomerId,
  130.                         "phone_number": numbersArray[0].PhoneNumber,
  131.                         "endTime": numbersArray[0].EndTime,
  132.                         "startTime": numbersArray[0].StartTime,
  133.                         "sipCredentialsId" : numbersArray[0].sipId,
  134.                     }
  135.  
  136.                     log.Println(fmt.Sprintf( "%d TASK VALUE::  ", key), task)
  137.  
  138.                     if checkGeoTime(task) {
  139.                         taskChan <- task
  140.                         taskCount--
  141.                     }
  142.  
  143.                     if len(numbersArray) == 1 {
  144.                         numbersArray = append(numbersArray[:0])
  145.                     } else {
  146.                         numbersArray = append(numbersArray[:0], numbersArray[1:]...) // Удаление через один TODO поправить на что-то более адекватное
  147.                     }
  148.  
  149.                     key++
  150.  
  151.                     taskNumbers = append(taskNumbers, task)
  152.                     //log.Println("taskCount_AFTER_CHAN::  ", taskCount)
  153.                     if taskCount <= 0 || len(numbersArray) == 0 {
  154.                         break
  155.                     }
  156.  
  157.                 }
  158.  
  159.                 if len(numbersArray) <= 0 {
  160.                     numbersArray, e = readFromDbToStructArrWithSleep(db, projectId, selectCount)
  161.                     if e != nil {
  162.                         fmt.Println("BAD DB CONNECTION ERR: ")
  163.                     }
  164.  
  165.                     log.Println("GET FROM DB LENGHT::: ", len(numbersArray))
  166.                 }
  167.  
  168.             } // if tascCount > 0
  169.         }
  170.     }()
  171.  
  172.     wg.Wait()
  173.  
  174.     for repeat := 5; repeat > 0; repeat -- {
  175.         err = agent.Start()
  176.         if err != nil {
  177.             time.Sleep(2 * time.Second)
  178.             fmt.Println(err)
  179.         }
  180.     }
  181. }
  182.  
  183. func close2(taskChan chan<- map[string]interface{}) {
  184.     log.Println("CLOSE CHANNEL: \n")
  185.     close(taskChan)
  186. }
  187.  
  188. // @deprecated
  189. func readCsvToMap(path string) (map[int]map[string]interface{}, error) {
  190.  
  191.     file, err := os.Open(path)
  192.     if err != nil {
  193.         return nil, err
  194.     }
  195.     defer file.Close()
  196.  
  197.     numbersArray := map[int]map[string]interface{} {}
  198.     i := -1
  199.     scanner := bufio.NewScanner(file)
  200.     for scanner.Scan() {
  201.         if i == -1 { // пропустим первую строку с шапкой
  202.             i++
  203.             continue
  204.         }
  205.         strData := strings.Split(scanner.Text(), ";")
  206.         contact := map[string]interface{} {
  207.             "phone_number": strData[1],
  208.             "customerId":strData[0],
  209.         }
  210.         numbersArray[i] = contact
  211.         i++
  212.     }
  213.  
  214.     return numbersArray, scanner.Err()
  215. }
  216.  
  217. func checkGeoTime(numberArray map[string]interface{}) bool {
  218.  
  219.     nowUnix := time.Now().UTC().Unix()
  220.  
  221.     p("NOW_TIME: ", time.Now().UTC())
  222.     p("NOW_UNIX: ", nowUnix)
  223.  
  224.     timeStr := time.Now().UTC().Format("2006-01-02")
  225.     endTimeFromArr := strings.Split(fmt.Sprintf("%s", numberArray["endTime"]), "T")
  226.     endTimeStr := fmt.Sprintf("%sT%s", timeStr, endTimeFromArr[1])
  227.  
  228.     p("END TIME STR: ", endTimeStr)
  229.  
  230.     endTime, err := time.Parse(time.RFC3339, endTimeStr)
  231.  
  232.     p("END TIME: ", endTime)
  233.     p("END UNIX: ", endTime.Unix())
  234.  
  235.  
  236.     if err != nil {
  237.         log.Println("TIME CONVERT ERROR: ", err)
  238.     }
  239.  
  240.     if nowUnix > endTime.Unix() {
  241.         log.Println("BAD GEO customerId, phone_number, endTimeUTC : ", numberArray["customerId"], numberArray["phone_number"], endTime)
  242.         return false
  243.     }
  244.  
  245.     return true
  246. }
  247.  
  248. func readCsvToStructArr(path string) ([]PhoneNumber, error) {
  249.  
  250.     file, err := os.Open(path)
  251.     if err != nil {
  252.         return nil, err
  253.     }
  254.     defer file.Close()
  255.  
  256.     var numbersArray []PhoneNumber
  257.  
  258.     i := -1
  259.     scanner := bufio.NewScanner(file)
  260.     for scanner.Scan() {
  261.         if i == -1 { // пропустим первую строку с шапкой
  262.             i++
  263.             continue
  264.         }
  265.         strData := strings.Split(scanner.Text(), ";")
  266.         contact := PhoneNumber {
  267.             CustomerId:  strData[0],
  268.             PhoneNumber: strData[1],
  269.             StartTime: strings.ReplaceAll(strData[6], "\"", ""),
  270.             EndTime: strings.ReplaceAll(strData[7], "\"", ""),
  271.         }
  272.         numbersArray = append(numbersArray, contact)
  273.         i++
  274.     }
  275.  
  276.     return numbersArray, scanner.Err()
  277. }
  278.  
  279. func p(text string, thisVar interface{}) {
  280.     print := false
  281.  
  282.     if print {
  283.         fmt.Println(text, thisVar)
  284.     }
  285. }
  286.  
  287.  
  288. func readFromDbToStructArr(db *sql.DB, projectId int32, selectCount int32) ([]PhoneNumber, error) {
  289.  
  290.     var numbersArray []PhoneNumber
  291.  
  292.     sql := "UPDATE call_list SET is_serviced = true " +
  293.             "WHERE id IN " +
  294.             "(SELECT id FROM call_list WHERE project_id = $1 AND (is_serviced = false or is_serviced ISNULL)  ORDER BY id DESC LIMIT $2)" +
  295.             "RETURNING customer_id, phone, start_execution_time, end_execution_time, sip_credentials_id"
  296.  
  297.     rows, err := db.Query(sql, projectId, selectCount)
  298.     if err != nil {
  299.         fmt.Printf("%#v\n", err)
  300.         return numbersArray, err
  301.     }
  302.  
  303.     for rows.Next(){
  304.         var pn PhoneNumber
  305.  
  306.         err := rows.Scan(&pn.CustomerId, &pn.PhoneNumber, &pn.StartTime, &pn.EndTime, &pn.sipId)
  307.         if err != nil{
  308.             fmt.Println(err)
  309.             continue
  310.         }
  311.         numbersArray = append(numbersArray, pn)
  312.     }
  313.  
  314.  
  315.     return numbersArray, err
  316. }
  317.  
  318. func readFromDbToStructArrWithSleep(db *sql.DB, projectId int32, selectCount int32) ([]PhoneNumber, error) {
  319.  
  320.     numbersArray, e := readFromDbToStructArr(db, projectId, selectCount)
  321.  
  322.     for len(numbersArray) == 0 {
  323.         time.Sleep(sleepTime * time.Second)
  324.         fmt.Println("SLEEP " + fmt.Sprintf("%s", sleepTime))
  325.         numbersArray, e = readFromDbToStructArr(db, projectId, selectCount)
  326.     }
  327.  
  328.  
  329.     return numbersArray, e
  330. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement