Advertisement
Guest User

Untitled

a guest
Dec 12th, 2016
109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.71 KB | None | 0 0
  1. package by.iba
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. import org.apache.spark.SparkConf
  5. import java.sql.DriverManager
  6. import java.sql.Connection
  7. import java.sql.SQLException;
  8.  
  /** Spark job that counts how often each of a given set of names occurs in a
    * text file, saves the counts as a text file and inserts them into MySQL.
    *
    * Expected arguments:
    *   - `args(0)`: path of the text file to scan (tokens are separated by a single space)
    *   - `args(1)`: output path passed to `saveAsTextFile`
    *   - `args(2)`: path of a file holding the comma-separated names to count
    */
  object Main {

    /** Entry point: validates the arguments, runs the count and stores the result.
      *
      * @param args input path, output path and names-file path, in that order
      */
    def main(args: Array[String]): Unit = {
      if (args.length < 3) {
        println("Not enough args")
        System.exit(1)
      }

      // NOTE(review): a master set on SparkConf takes precedence over the
      // `--master` flag of spark-submit; confirm hard-coding it here is intended.
      val conf = new SparkConf().setAppName("WordCounter").setMaster("yarn-client")
      val sc = new SparkContext(conf)
      try {
        // The names are collected to the driver; a Set gives constant-time
        // membership checks inside the filter instead of a linear Array scan.
        val names = sc.textFile(args(2)).flatMap(line => line.split(",")).collect().toSet

        val counts = sc.textFile(args(0))
          .flatMap(line => line.split(" "))
          .filter(word => names.contains(word))
          .map(word => (word, 1))
          .reduceByKey(_ + _)

        val summary = counts.collect()
        counts.saveAsTextFile(args(1))
        storeCounts(summary)
      } finally {
        // Always release the cluster resources, even when the job fails.
        sc.stop()
      }
    }

    /** Inserts every `(name, count)` pair into the `TEST` table.
      *
      * Database failures are reported on stdout and do not abort the program
      * (best-effort, as before). The statement and the connection are closed
      * in all cases, and only if they were actually opened.
      *
      * @param summary the counted names, already collected on the driver
      */
    private def storeCounts(summary: Array[(String, Int)]): Unit = {
      import scala.util.control.NonFatal

      // NOTE(review): connection settings and credentials are hard-coded in
      // source; they should come from configuration or the environment.
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://172.20.140.245/db_stat"
      val username = "db_stat"
      val password = "pass4db!"

      try {
        // Explicitly load the JDBC driver class before asking for a connection.
        Class.forName(driver)
        val connection = DriverManager.getConnection(url, username, password)
        try {
          val statement =
            connection.prepareStatement("INSERT INTO TEST(test_name,count)VALUES(?,?);")
          try {
            summary.foreach { case (name, count) =>
              statement.setString(1, name)
              statement.setInt(2, count)
              statement.executeUpdate()
            }
          } finally {
            statement.close()
          }
        } finally {
          connection.close()
        }
      } catch {
        case ex: SQLException => println("SQLException: " + ex.getMessage)
        // Anything else non-fatal (e.g. the driver class is missing) is also
        // reported, but fatal JVM errors are allowed to propagate.
        case NonFatal(ex) => println("Failed to store counts: " + ex.getMessage)
      }
    }

  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement