andrejsstepanovs

fast data (imdb) importer challenge

Feb 5th, 2022 (edited)
package main

// Download and unpack the data file, start the database and run the importer.
// Play around with the numbers to get the maximum performance out of your system.
// The code uses goroutines quite heavily to keep the import time to a minimum.
//
// ```bash
// wget https://datasets.imdbws.com/title.basics.tsv.gz
// # unpack it somewhere (the examples below assume ./data/data.tsv)
// service mysql start
//
// # run the prebuilt binary directly
// bin/imdb-andrejs "root:test@tcp(127.0.0.1:3306)/imdb" ./data/data.tsv 3000 120
// # or run from source
// go run ./main.go "root:test@tcp(127.0.0.1:3306)/imdb" ./data/data.tsv 3000 120
//
// # where
// # 3000 = batch size (rows per INSERT)
// # 120  = writer count (db connections / worker goroutines)
// ```

import (
    "bufio"
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

type Job struct {
    from  uint32
    till  uint32
    lines [][]string
}

type Result struct {
    job     Job
    success bool
}

var jobs chan Job
var results chan Result

const insertQuery = "INSERT INTO titles (`id`, `titleType`, `primaryTitle`, `originalTitle`, `isAdult`, `startYear`, `endYear`, `runtimeMinutes`, `genres`) VALUES "
const queryVals = "(?, ?, ?, ?, ?, ?, ?, ?, ?)"

var dbConnStr string

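// getDb returns a database handle for the configured DSN. Note that sql.Open
// only validates the DSN; actual connections are established lazily on first use.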
func getDb() *sql.DB {
    db, dbErr := sql.Open("mysql", dbConnStr)
    if dbErr != nil {
        log.Fatal(dbErr)
    }

    return db
}

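// processLines turns a batch of parsed rows into a single multi-row statement,
// e.g.
//
//	INSERT INTO titles (...) VALUES (?, ..., ?), (?, ..., ?), ...
//
// and executes it with the flattened parameter list. Numeric columns that fail
// to parse (IMDb uses \N for missing values) end up as 0. It returns false if
// the insert fails.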
func processLines(lines [][]string, ctx context.Context, db *sql.DB) bool {
    var inserts []string
    var params []interface{}

    var startYear, endYear, runtimeMinutes int
    for _, columns := range lines {
        inserts = append(inserts, queryVals)
        startYear, _ = strconv.Atoi(columns[5])
        endYear, _ = strconv.Atoi(columns[6])
        runtimeMinutes, _ = strconv.Atoi(columns[7])

        params = append(params, columns[0], columns[1], columns[2], columns[3], columns[4], startYear, endYear, runtimeMinutes, columns[8])
    }
    query := insertQuery + strings.Join(inserts, ",")

    stmt, err := db.PrepareContext(ctx, query)
    if err != nil {
        log.Fatalf("prepare error: %s", err)
    }
    // close the statement even when the insert below fails
    defer stmt.Close()

    res, err := stmt.ExecContext(ctx, params...)
    if err != nil {
        log.Printf("exec error: %s", err)
        return false
    }

    _, errAffected := res.RowsAffected()
    if errAffected != nil {
        log.Fatalf("error %s when reading rows affected", errAffected)
    }

    return true
}

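// worker pulls batches off the jobs channel, writes each one to the database
// and publishes the outcome on the results channel. It marks the WaitGroup as
// done once the jobs channel is closed.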
func worker(wg *sync.WaitGroup, db *sql.DB) {
    defer wg.Done()
    ctx := context.Background()

    for job := range jobs {
        output := Result{job, processLines(job.lines, ctx, db)}
        results <- output
    }
}

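// createWorkerPool starts noOfWorkers writer goroutines that share a single
// *sql.DB handle (the driver's connection pool gives each concurrent insert
// its own connection), waits for them to finish and then closes results.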
func createWorkerPool(noOfWorkers uint32) {
    var wg sync.WaitGroup
    var i uint32

    db := getDb()
    defer db.Close()

    for i = 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg, db)
    }
    wg.Wait()
    close(results)
}

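// allocate streams the TSV file, skips the header row, splits every line into
// its tab-separated columns and groups the rows into batches of batchSize.
// Each full batch is copied and pushed onto the jobs channel; a final partial
// batch is sent once the whole file has been read, and the channel is closed.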
func allocate(scanner *bufio.Scanner, batchSize uint32) {
    var from, k uint32
    var header = true
    var stopIndexSize, i uint32

    batchedLines := make([][]string, batchSize)
    stopIndexSize = batchSize - 1

    for scanner.Scan() {
        if header {
            header = false
            continue
        }

        columns := strings.Split(scanner.Text(), "\t")
        if len(columns) < 9 {
            // skip malformed lines instead of panicking later on a short row
            continue
        }
        k++

        batchedLines[i] = columns

        if i >= stopIndexSize {
            copyBatchedLines := make([][]string, batchSize)
            copy(copyBatchedLines, batchedLines)

            jobs <- Job{from, k, copyBatchedLines}
            i = 0
            from = k
            continue
        }

        i++
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }

    // send the last, possibly partial, batch; skip it when the row count is an
    // exact multiple of batchSize, otherwise an empty INSERT would be sent
    z := k % batchSize
    if z > 0 {
        copyBatchedLines := make([][]string, z)
        copy(copyBatchedLines, batchedLines)
        jobs <- Job{from, k, copyBatchedLines}
    }

    close(jobs)
}

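// result consumes the results channel and prints progress for every finished
// batch; it signals on done once createWorkerPool has closed the channel.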
func result(done chan bool, startTime *time.Time) {
    for result := range results {
        // pass the Result by value so the goroutine does not share the loop variable
        go printStats(result, startTime)
    }
    done <- true
}

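// printStats reports the row range of the batch, whether it was written
// successfully and an approximate overall rows-per-second rate.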
func printStats(result Result, startTime *time.Time) {
    var perSec int
    elapsed := time.Since(*startTime)
    d := int(elapsed.Seconds())
    if d > 0 {
        perSec = int(result.job.till) / d
    }
    fmt.Printf("%d - %d = %v (%v /sec)\n", result.job.from, result.job.till, result.success, perSec)
}

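// main reads the optional CLI arguments (DSN, TSV path, batch size, writer
// count), recreates the target table and runs the import pipeline.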
func main() {
    var batchSize uint32 = 2000
    var noOfWriters uint32 = 120
    var file = "data/data.tsv"
    dbConnStr = "root:test@tcp(127.0.0.1:3306)/imdb"

    argsLen := len(os.Args)
    if argsLen >= 2 {
        dbConnStr = os.Args[1]
    }
    if argsLen >= 3 {
        file = os.Args[2]
    }
    if argsLen >= 4 {
        if val, err := strconv.ParseUint(os.Args[3], 10, 32); err == nil {
            batchSize = uint32(val)
        }
    }
    if argsLen >= 5 {
        if val, err := strconv.ParseUint(os.Args[4], 10, 32); err == nil {
            noOfWriters = uint32(val)
        }
    }

    // jobs channel capacity: 10% headroom over the writer count so the
    // allocator can stay slightly ahead of the writers
    var noOfWorkers = noOfWriters + noOfWriters/10

    const sleep = 3 * time.Second
    fmt.Printf("Processing with Batch size = %v Writers = %v Workers = %v in %v..\n", batchSize, noOfWriters, noOfWorkers, sleep)

    db := getDb()
    createDBTable(db)
    db.Close()

    time.Sleep(sleep)

    // start timing only after the table has been created
    startTime := time.Now()

    jobs = make(chan Job, noOfWorkers)
    results = make(chan Result, noOfWriters)

    scanner := getScanner(file)
    go allocate(scanner, batchSize)

    done := make(chan bool)
    go result(done, &startTime)

    createWorkerPool(noOfWriters)
    <-done

    elapsed := time.Since(startTime)
    log.Printf("TIME %s", elapsed)
}

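// getScanner opens the data file and returns a line-by-line scanner over it;
// the program exits if the file cannot be opened.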
func getScanner(filename string) *bufio.Scanner {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }

    return bufio.NewScanner(file)
}

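// createDBTable creates the imdb database if it does not exist and recreates
// the titles table from scratch (any existing titles table is dropped).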
func createDBTable(db *sql.DB) {
    tableCreate := "CREATE TABLE IF NOT EXISTS `imdb`.`titles` (" +
        " `id` VARCHAR(25) NOT NULL," +
        " `titleType` VARCHAR(25) NULL," +
        " `primaryTitle` VARCHAR(511) NULL," +
        " `originalTitle` VARCHAR(511) NULL," +
        " `isAdult` INT(1) NULL," +
        " `startYear` INT(4) NULL," +
        " `endYear` INT(4) NULL," +
        " `runtimeMinutes` INT(5) NULL," +
        " `genres` VARCHAR(255) NULL," +
        " PRIMARY KEY (`id`)) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;"

    var sqls = []string{
        "CREATE DATABASE IF NOT EXISTS `imdb`;",
        "DROP TABLE IF EXISTS `imdb`.`titles`;",
        tableCreate,
    }

    execQuery(db, sqls)
}

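// execQuery runs the given SQL statements one after another and aborts the
// program on the first error.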
func execQuery(db *sql.DB, sqls []string) {
    for _, query := range sqls {
        // Exec (not Query) is used because these are DDL statements that
        // return no rows.
        if _, err := db.Exec(query); err != nil {
            log.Fatal(err)
        }
    }
}