Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- //Download and unpack data file, start db and run importer.
- //Play around with numbers to get max performance out of your system.
- //Code is using goroutines quite heavily to get min import time.
- //
- //```bash
- //wget https://datasets.imdbws.com/title.basics.tsv.gz
- //unarchive somewhere (to ./data/)
- //service mysql start
- //
//# run the binary directly
//bin/imdb-andrejs "root:test@tcp(127.0.0.1:3306)/imdb" ./data/data.tsv 3000 120
//# or run from source
//go run ./main.go "root:test@tcp(127.0.0.1:3306)/imdb" ./data/data.tsv 3000 120
- //
- //# where
- //# 3000 = batch size
- //# 120 = writer count (db connections)
- //```
- import (
- "bufio"
- "context"
- "database/sql"
- "fmt"
- _ "github.com/go-sql-driver/mysql"
- "log"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- )
// Job is one batch of parsed TSV rows handed from the reader (allocate)
// to a worker for insertion.
type Job struct {
	from  uint32     // line counter value at the start of this batch (0 for the first batch)
	till  uint32     // line counter value of the last row in this batch
	lines [][]string // one entry per row, already split on tabs; processLines reads indexes 0-8
}
// Result pairs a finished Job with the outcome of its batch INSERT.
type Result struct {
	job     Job
	success bool // true when processLines inserted the whole batch without error
}
// jobs carries row batches from the reader goroutine (allocate) to the workers.
var jobs chan Job

// results carries per-batch outcomes from the workers to the stats printer.
var results chan Result

// insertQuery is the multi-row INSERT prefix; queryVals is the 9-placeholder
// group appended (comma-joined) once per row in the batch.
const insertQuery = "INSERT INTO titles (`id`, `titleType`, `primaryTitle`, `originalTitle`, `isAdult`, `startYear`, `endYear`, `runtimeMinutes`, `genres`) VALUES "
const queryVals = "(?, ?, ?, ?, ?, ?, ?, ?, ?)"

// dbConnStr is the MySQL DSN; set once in main before any getDb call.
var dbConnStr string
- func getDb() *sql.DB {
- db, dbErr := sql.Open("mysql", dbConnStr)
- if dbErr != nil {
- log.Fatal(dbErr)
- return nil
- }
- return db
- }
- func processLines(lines [][]string, ctx context.Context, db *sql.DB) bool {
- var inserts []string
- var params []interface{}
- var startYear, endYear, runtimeMinutes int
- for _, columns := range lines {
- inserts = append(inserts, queryVals)
- startYear, _ = strconv.Atoi(columns[5])
- endYear, _ = strconv.Atoi(columns[6])
- runtimeMinutes, _ = strconv.Atoi(columns[7])
- params = append(params, columns[0], columns[1], columns[2], columns[3], columns[4], startYear, endYear, runtimeMinutes, columns[8])
- }
- var query = insertQuery + strings.Join(inserts, ",")
- stmt, err := db.PrepareContext(ctx, query)
- if err != nil {
- log.Printf("#1Error %s", err)
- log.Fatal(err)
- }
- res, err := stmt.ExecContext(ctx, params...)
- if err != nil {
- log.Printf("#2Error %s", err)
- return false
- }
- stmt.Close()
- _, errAffected := res.RowsAffected()
- if errAffected != nil {
- log.Printf("Error %s when finding rows affected", errAffected)
- log.Fatal(errAffected)
- return false
- }
- return true
- }
- func worker(wg *sync.WaitGroup, db *sql.DB) {
- ctx := context.Background()
- for job := range jobs {
- output := Result{job, processLines(job.lines, ctx, db)}
- results <- output
- }
- wg.Done()
- }
- func createWorkerPool(noOfWorkers uint32) {
- var wg sync.WaitGroup
- var i uint32
- db := getDb()
- defer db.Close()
- for i = 0; i < noOfWorkers; i++ {
- wg.Add(1)
- go worker(&wg, db)
- }
- wg.Wait()
- close(results)
- }
- func allocate(scanner *bufio.Scanner, batchSize uint32) {
- var from, k uint32
- var header = true
- var stopIndexSize, i uint32
- batchedLines := make([][]string, batchSize)
- stopIndexSize = batchSize - 1
- for scanner.Scan() {
- if header {
- header = false
- continue
- }
- k++
- columns := strings.Split(scanner.Text(), "\t")
- if columns == nil {
- continue
- }
- batchedLines[i] = columns
- if i >= stopIndexSize {
- copyBatchedLines := make([][]string, batchSize)
- copy(copyBatchedLines, batchedLines)
- jobs <- Job{from, k, copyBatchedLines}
- i = 0
- from = k
- continue
- }
- i++
- }
- z := k % batchSize
- copyBatchedLines := make([][]string, z)
- copy(copyBatchedLines, batchedLines)
- jobs <- Job{from, k, copyBatchedLines}
- close(jobs)
- }
- func result(done chan bool, startTime *time.Time) {
- for result := range results {
- go printStats(&result, startTime)
- }
- done <- true
- }
- func printStats(result *Result, startTime *time.Time) {
- var perSec int
- elapsed := time.Since(*startTime)
- d := int(elapsed) / 1000000000
- if d > 0 {
- perSec = int(result.job.till) / d
- }
- fmt.Printf("%d - %d = %v (%v /sec)\n", result.job.from, result.job.till, result.success, perSec)
- }
- func main() {
- var batchSize uint32 = 2000
- var noOfWriters uint32 = 120
- var file = "data/data.tsv"
- dbConnStr = "root:test@tcp(127.0.0.1:3306)/imdb"
- argsLen := len(os.Args)
- if argsLen >= 2 {
- dbConnStr = string(os.Args[1])
- }
- if argsLen >= 3 {
- file = string(os.Args[2])
- }
- if argsLen >= 4 {
- if val, err := strconv.ParseUint(os.Args[3], 0, 0); err == nil {
- batchSize = uint32(val)
- }
- }
- if argsLen >= 5 {
- if val, err := strconv.ParseUint(os.Args[4], 0, 0); err == nil {
- noOfWriters = uint32(val)
- }
- }
- var noOfWorkers = noOfWriters + uint32(noOfWriters/10)
- var sleep time.Duration = 3
- fmt.Printf("Processing with Batch size = %v Writers = %v Workers = %v in %d sec..\n", batchSize, noOfWriters, noOfWorkers, sleep)
- db := getDb()
- createDBTable(db)
- db.Close()
- time.Sleep(sleep * time.Second)
- // start calculation after table is created
- startTime := time.Now()
- jobs = make(chan Job, noOfWorkers)
- results = make(chan Result, noOfWriters)
- scanner := getScanner(file)
- go allocate(scanner, batchSize)
- done := make(chan bool)
- go result(done, &startTime)
- createWorkerPool(noOfWriters)
- <-done
- elapsed := time.Since(startTime)
- log.Printf("TIME %s", elapsed)
- }
// getScanner opens filename and returns a line scanner over it, with
// the scan buffer raised above bufio's 64 KiB default so oversized
// TSV rows do not abort the scan (Scanner silently stops on
// ErrTooLong otherwise). The file handle is deliberately left open
// for the life of the process, since the scanner reads from it until
// main exits. Exits via log.Fatal if the file cannot be opened.
func getScanner(filename string) *bufio.Scanner {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	scanner := bufio.NewScanner(file)
	// Allow lines up to 1 MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	return scanner
}
- func createDBTable(db *sql.DB) {
- tableCreate := "CREATE TABLE IF NOT EXISTS `imdb`.`titles` (" +
- " `id` VARCHAR(25) NOT NULL," +
- " `titleType` VARCHAR(25) NULL," +
- " `primaryTitle` VARCHAR(511) NULL," +
- " `originalTitle` VARCHAR(511) NULL," +
- " `isAdult` INT(1) NULL," +
- " `startYear` INT(4) NULL," +
- " `endYear` INT(4) NULL," +
- " `runtimeMinutes` INT(5) NULL," +
- " `genres` VARCHAR(255) NULL," +
- " PRIMARY KEY (`id`)) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;"
- var sqls = []string{
- "CREATE DATABASE IF NOT EXISTS `imdb`;",
- "DROP TABLE IF EXISTS `imdb`.`titles`;",
- tableCreate,
- }
- execQuery(db, sqls)
- }
// execQuery runs each statement in sqls sequentially, aborting the
// process on the first failure.
//
// These are DDL statements with no result set, so Exec is the correct
// call: the original used db.Query, whose returned *sql.Rows was never
// closed and therefore pinned one pooled connection per statement —
// the SetMaxOpenConns(len(sqls)) call it needed was a workaround for
// that leak and is no longer required.
func execQuery(db *sql.DB, sqls []string) {
	for _, stmt := range sqls { // renamed from `sql`, which shadowed the database/sql package
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}
Add Comment
Please, Sign In to add comment