Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package test
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import java.sql.DriverManager
- import java.sql.Connection
- import java.sql.SQLException;
/**
 * Entry point of a (partially disabled) brand-mention counting job.
 *
 * The Spark word-count stage is currently commented out; what remains active is
 * the JDBC part: it loads the enabled key brands from `DIM_key_brand` into a
 * name -> id map and prepares the insert statement for `FACT_brand_mentioned`.
 */
object Main {
  import scala.util.control.NonFatal

  /**
   * Validates the argument count, connects to MySQL, reads the active key
   * brands and prepares the insert used to persist per-brand counts.
   *
   * @param args at least three arguments are required. In the disabled Spark
   *             code `args(0)` is the input text file, `args(1)` the output
   *             path and `args(2)` the names file; the active code only checks
   *             the count.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("Not enough args")
      System.exit(1)
    }

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://172.20.140.245/db_stat"
    val username = "db_stat"
    // NOTE(review): credentials are hard-coded in source; consider moving them
    // to configuration or environment variables.
    val password = "pass4db!"

    // Kept as a nullable var so the `finally` below can tell whether the
    // connection was ever opened before trying to close it.
    var connection: Connection = null

    //val conf = new SparkConf().setAppName("WordCounter").setMaster("yarn-client")
    //val sc = new SparkContext(conf)
    //val text_file = sc.textFile(args(0))
    //val text_names = sc.textFile(args(2))
    //val names = text_names.flatMap(line => line.split(",") ).collect()
    //val lines = text_file.flatMap( line => line.split(" ") )
    //val counts = lines.filter(line => names.contains(line)).map(word => (word, 1)).reduceByKey(_ + _)
    //val summary = counts.collect()
    //counts.saveAsTextFile(args(1))

    try {
      // make the connection
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // create the statement, and run the select query
      val mapWithBrands = scala.collection.mutable.Map[String, Int]()
      val statement = connection.createStatement()
      try {
        val sql = "SELECT * FROM DIM_key_brand where keybrand_name_status = 'Y';"
        val resultSet = statement.executeQuery(sql)
        while (resultSet.next()) {
          val keybrand_name = resultSet.getString("keybrand_name")
          val keybrand_id = resultSet.getInt("keybrand_id")
          mapWithBrands(keybrand_name) = keybrand_id
          println("brand = " + keybrand_name + " " + keybrand_id)
        }
      } finally {
        // Closing the statement also releases its ResultSet.
        statement.close()
      }

      // Not consumed yet; retained for the pending TODO below.
      val names = mapWithBrands.keys
      //TODO: insert code

      // prepare the insert statement for the per-brand counts
      val sqlInsert = "INSERT INTO FACT_brand_mentioned VALUES(?,?)"
      val statementInsert = connection.prepareStatement(sqlInsert)
      try {
        // Writes one (brand name, count) pair: the count goes to the first
        // column, the brand id looked up from mapWithBrands to the second.
        // Throws NoSuchElementException if the brand name is not in the map.
        def ins(a: (String, Int)): Unit = {
          statementInsert.setInt(1, a._2)
          statementInsert.setInt(2, mapWithBrands(a._1))
          statementInsert.executeUpdate()
        }
        //summary.foreach { x=>ins(x) }
      } finally {
        statementInsert.close()
      }
    } catch {
      case ex: SQLException => println("SQLException: " + ex.getMessage())
      // Non-SQL failures (e.g. driver class not found) are reported under
      // their real type; fatal JVM errors are deliberately not caught.
      case NonFatal(ex) => println(ex.getClass.getName + ": " + ex.getMessage())
    } finally {
      // Close only if the connection was actually opened; previously a failed
      // connect was followed by a NullPointerException here.
      if (connection != null) connection.close()
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement