Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "bufio"
- "database/sql"
- "flag"
- "fmt"
- "github.com/satori/go.uuid"
- "sync"
- //"go/scanner"
- "log"
- "os"
- "pds-sample/client"
- "strings"
- "time"
- _ "github.com/lib/pq"
- )
// PhoneNumber is one call_list entry waiting to be dialed.
//
// Every field is kept as the raw string read from the CSV file or the database
// row; nothing is parsed or validated at this level.
type PhoneNumber struct {
	// CustomerId and PhoneNumber come from the customer_id and phone columns.
	CustomerId, PhoneNumber string
	// StartTime and EndTime come from start_execution_time and
	// end_execution_time; checkGeoTime expects a "T" separator before the
	// time-of-day part of EndTime.
	StartTime, EndTime string
	// sipId comes from sip_credentials_id and is only filled by the database
	// reader, not by the CSV reader.
	sipId string
}
// sleepTime is the pause between polls of an empty call_list, expressed as a
// bare count of seconds: callers multiply it by time.Second before sleeping.
const sleepTime = time.Duration(10)
- func main() {
- //Settings vars
- //var projectId int32 = 27
- //var selectCount int32 = 500
- //var queueId int32 = 26673
- projectIdFlag := flag.Int("project", 0, "int projectId")
- queueIdFlag := flag.Int("queue", 0, "int queueId")
- selectCountFlag := flag.Int("select-count", 100, "int Buffercount")
- flag.Parse()
- projectId := int32(*projectIdFlag)
- queueId := int32(*queueIdFlag)
- selectCount := int32(*selectCountFlag)
- fmt.Println("projectId", projectId)
- fmt.Println("queueId", queueId)
- fmt.Println("selectCount", selectCount)
- if projectId == 0 {
- fmt.Println("project-id cannot be " + " set project-id")
- panic("wrong params")
- }
- if queueId == 0 {
- fmt.Println("queue-id cannot be " + " set queue-id")
- panic("wrong params")
- }
- prop, err := client.NewAgentProperties(client.NewAuth(141354, "e91caeab-b563-4265-be0b-bf636579a65a"))
- if err != nil {
- panic(err)
- }
- conn, err := client.NewConn(prop.Host)
- if err != nil {
- panic(err)
- }
- defer conn.Close()
- agent, err := client.NewAgent(conn, prop.Auth, &client.PDSConf {
- RuleID: 1456819, //1869031,
- QueueID: queueId, //26205,26205
- ReferenceIP: "127.0.0.1",
- AvgTimeTalkSec: 80.0,
- PercentSuccessful: 0.4,
- MaximumErrorRate: 0.15,
- SessionID: uuid.NewV4().String(),
- })
- if err != nil {
- panic(err)
- }
- log.Println(" PROJECT ID: ", projectId)
- log.Println(" PROJECT START queueId: ", queueId)
- // START SQL
- connStr := "postgres://user:host@127.0.0.1/"
- db, err := sql.Open("postgres", connStr)
- if err != nil {
- log.Fatal(err)
- }
- numbersArray, e := readFromDbToStructArrWithSleep(db, projectId, selectCount)
- // end SQL
- if e != nil {
- log.Println("DB READ READ ERROR: ", )
- log.Fatal(e)
- }
- log.Println(" LENGHT START::: ", len(numbersArray))
- taskChan := agent.GetTaskChannel()
- receiveChan := agent.GetReceiveChannel() //DEBUG OLEG
- wg:=&sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer close2(taskChan)
- defer wg.Done()
- for {
- taskCount := <- receiveChan // OLEG
- //taskCount := 30
- log.Println("FROM RECEIVE_CHANEL_taskCount: ", taskCount)
- taskNumbers := make([]map[string]interface{}, 0, taskCount)
- key := 0
- if taskCount > 0 {
- for taskCount > 0 {
- task := map[string]interface{} {
- "customerId": numbersArray[0].CustomerId,
- "phone_number": numbersArray[0].PhoneNumber,
- "endTime": numbersArray[0].EndTime,
- "startTime": numbersArray[0].StartTime,
- "sipCredentialsId" : numbersArray[0].sipId,
- }
- log.Println(fmt.Sprintf( "%d TASK VALUE:: ", key), task)
- if checkGeoTime(task) {
- taskChan <- task
- taskCount--
- }
- if len(numbersArray) == 1 {
- numbersArray = append(numbersArray[:0])
- } else {
- numbersArray = append(numbersArray[:0], numbersArray[1:]...) // Удаление через один TODO поправить на что-то более адекватное
- }
- key++
- taskNumbers = append(taskNumbers, task)
- //log.Println("taskCount_AFTER_CHAN:: ", taskCount)
- if taskCount <= 0 || len(numbersArray) == 0 {
- break
- }
- }
- if len(numbersArray) <= 0 {
- numbersArray, e = readFromDbToStructArrWithSleep(db, projectId, selectCount)
- if e != nil {
- fmt.Println("BAD DB CONNECTION ERR: ")
- }
- log.Println("GET FROM DB LENGHT::: ", len(numbersArray))
- }
- } // if tascCount > 0
- }
- }()
- wg.Wait()
- for repeat := 5; repeat > 0; repeat -- {
- err = agent.Start()
- if err != nil {
- time.Sleep(2 * time.Second)
- fmt.Println(err)
- }
- }
- }
// close2 logs that the task channel is being shut down and then closes it.
//
// Values already buffered in the channel remain readable by the receiver.
// Like the built-in close, it panics if the channel is nil or already closed.
func close2(taskChan chan<- map[string]interface{}) {
	// log.Println appends the newline itself; a trailing "\n" in the message
	// is rejected by the vet printf check and prints a blank line.
	log.Println("CLOSE CHANNEL: ")
	close(taskChan)
}
// readCsvToMap loads a semicolon-separated contact file into a map keyed by
// the zero-based index of each data row.
//
// The first line is treated as a header and skipped, and blank lines are
// ignored. In every other line the first field becomes "customerId" and the
// second "phone_number". A line with fewer than two fields yields an error
// that names the file and line number instead of panicking.
//
// On any error the returned map is nil.
//
// Deprecated: the original author marked this reader as deprecated.
func readCsvToMap(path string) (map[int]map[string]interface{}, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	contacts := map[int]map[string]interface{}{}
	scanner := bufio.NewScanner(file)
	lineNo := 0
	for scanner.Scan() {
		lineNo++
		if lineNo == 1 {
			// Skip the header row.
			continue
		}
		line := scanner.Text()
		if strings.TrimSpace(line) == "" {
			continue
		}
		fields := strings.Split(line, ";")
		if len(fields) < 2 {
			return nil, fmt.Errorf("%s:%d: expected at least 2 fields, got %d", path, lineNo, len(fields))
		}
		// Keys stay dense (0, 1, 2, ...) because one entry is added per row.
		contacts[len(contacts)] = map[string]interface{}{
			"phone_number": fields[1],
			"customerId":   fields[0],
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return contacts, nil
}
- func checkGeoTime(numberArray map[string]interface{}) bool {
- nowUnix := time.Now().UTC().Unix()
- p("NOW_TIME: ", time.Now().UTC())
- p("NOW_UNIX: ", nowUnix)
- timeStr := time.Now().UTC().Format("2006-01-02")
- endTimeFromArr := strings.Split(fmt.Sprintf("%s", numberArray["endTime"]), "T")
- endTimeStr := fmt.Sprintf("%sT%s", timeStr, endTimeFromArr[1])
- p("END TIME STR: ", endTimeStr)
- endTime, err := time.Parse(time.RFC3339, endTimeStr)
- p("END TIME: ", endTime)
- p("END UNIX: ", endTime.Unix())
- if err != nil {
- log.Println("TIME CONVERT ERROR: ", err)
- }
- if nowUnix > endTime.Unix() {
- log.Println("BAD GEO customerId, phone_number, endTimeUTC : ", numberArray["customerId"], numberArray["phone_number"], endTime)
- return false
- }
- return true
- }
- func readCsvToStructArr(path string) ([]PhoneNumber, error) {
- file, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- defer file.Close()
- var numbersArray []PhoneNumber
- i := -1
- scanner := bufio.NewScanner(file)
- for scanner.Scan() {
- if i == -1 { // пропустим первую строку с шапкой
- i++
- continue
- }
- strData := strings.Split(scanner.Text(), ";")
- contact := PhoneNumber {
- CustomerId: strData[0],
- PhoneNumber: strData[1],
- StartTime: strings.ReplaceAll(strData[6], "\"", ""),
- EndTime: strings.ReplaceAll(strData[7], "\"", ""),
- }
- numbersArray = append(numbersArray, contact)
- i++
- }
- return numbersArray, scanner.Err()
- }
// p is a debug print helper: it writes text and thisVar to standard output
// only when the enabled switch inside it is turned on. The switch is off, so
// calls are currently no-ops.
func p(text string, thisVar interface{}) {
	const enabled = false
	if !enabled {
		return
	}
	fmt.Println(text, thisVar)
}
- func readFromDbToStructArr(db *sql.DB, projectId int32, selectCount int32) ([]PhoneNumber, error) {
- var numbersArray []PhoneNumber
- sql := "UPDATE call_list SET is_serviced = true " +
- "WHERE id IN " +
- "(SELECT id FROM call_list WHERE project_id = $1 AND (is_serviced = false or is_serviced ISNULL) ORDER BY id DESC LIMIT $2)" +
- "RETURNING customer_id, phone, start_execution_time, end_execution_time, sip_credentials_id"
- rows, err := db.Query(sql, projectId, selectCount)
- if err != nil {
- fmt.Printf("%#v\n", err)
- return numbersArray, err
- }
- for rows.Next(){
- var pn PhoneNumber
- err := rows.Scan(&pn.CustomerId, &pn.PhoneNumber, &pn.StartTime, &pn.EndTime, &pn.sipId)
- if err != nil{
- fmt.Println(err)
- continue
- }
- numbersArray = append(numbersArray, pn)
- }
- return numbersArray, err
- }
- func readFromDbToStructArrWithSleep(db *sql.DB, projectId int32, selectCount int32) ([]PhoneNumber, error) {
- numbersArray, e := readFromDbToStructArr(db, projectId, selectCount)
- for len(numbersArray) == 0 {
- time.Sleep(sleepTime * time.Second)
- fmt.Println("SLEEP " + fmt.Sprintf("%s", sleepTime))
- numbersArray, e = readFromDbToStructArr(db, projectId, selectCount)
- }
- return numbersArray, e
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement