Advertisement
MARSHAL327

BDOT LR5

Dec 2nd, 2023
1,008
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 6.43 KB | None | 0 0
  1. package sevsu.spark
  2.  
  3. import java.nio.file.Paths
  4. import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
  5. import org.apache.spark.rdd.RDD
  6. import sevsu.spark.Application.time
  7.  
  8. object Application {
  9.     private val conf: SparkConf = new SparkConf()
  10.         .setMaster("local[*]")
  11.         .setAppName("spark_example")
  12.         .set("spark.ui.showConsoleProgress", "false")
  13.  
  14.     private val sc: SparkContext = getSparkContext(conf)
  15.  
  16.     private val resourcesRoot: String = this.getClass.getResource("/").toString
  17.     private val personPath: String = resourcesRoot + "person.csv"
  18.     private val apartmentPath: String = resourcesRoot + "apartment.csv"
  19.  
  20.     case class Person(id: Int, name: String)
  21.  
  22.     case class Apartment(id_apartment: Int, id_human: Int, num_rooms: Int, address: String)
  23.  
  24.     def time[R](block: => R): R = {
  25.         val t0 = System.nanoTime()
  26.         val result = block // call-by-name
  27.         val t1 = System.nanoTime()
  28.         val resultTime = BigDecimal((t1 - t0) / Math.pow(10, 9))
  29.             .setScale(4, BigDecimal.RoundingMode.HALF_UP)
  30.         println("Время: " + resultTime + " сек.")
  31.         result
  32.     }
  33.  
  34.     def lab4(apartments: RDD[Apartment], persons: RDD[Person], printResults: Boolean = true): Unit = {
  35.         println("========== lab4 ==========")
  36.         // ====== Task 1 ======
  37.         val personCounter: RDD[(Int, Int)] = apartments.map(item => (item.id_human, 1))
  38.         val numPersonApartments =
  39.             personCounter
  40.                 .reduceByKey((a, b) => a + b)
  41.                 .persist()
  42.         val numApartmentsPerson =
  43.             numPersonApartments.map(_.swap)
  44.                 .groupByKey()
  45.                 .sortByKey()
  46.  
  47.         if (printResults) {
  48.             println(numApartmentsPerson.collect().mkString("\n"))
  49.         }
  50.  
  51.         print("1 задание - ")
  52.         time(numApartmentsPerson.collect())
  53.  
  54.  
  55.         // ====== Task 2 ======
  56.         val personPairRDD = persons.map(item => (item.id, item.name))
  57.         val joined = personPairRDD join numPersonApartments
  58.         val numPersonNameApartments = joined.map(item => item._2)
  59.  
  60.         if (printResults) {
  61.             println(numPersonNameApartments.collect().mkString("\n"))
  62.         }
  63.  
  64.         print("2 задание - ")
  65.         time(numPersonNameApartments.collect())
  66.  
  67.  
  68.         // ====== Task 3 ======
  69.         val personApartmentsAddress = apartments
  70.             .filter(_.num_rooms > 2)
  71.             .map(item =>
  72.                 (item.id_human, item.address)
  73.             )
  74.         val personNameAddressWithId = personApartmentsAddress join personPairRDD
  75.         val personNameAddress = personNameAddressWithId.map(item => item._2)
  76.  
  77.         if (printResults) {
  78.             println(personNameAddress.collect().mkString("\n"))
  79.         }
  80.  
  81.         print("3 задание - ")
  82.         time(personNameAddress.collect())
  83.     }
  84.  
  85.     def lab5(apartments: RDD[Apartment], persons: RDD[Person], printResults: Boolean = true): Unit = {
  86.         println("\n========== lab5 ==========")
  87.  
  88.         // ====== Task 1 ======
  89.         val pairApartments = apartments.map(apartment => (apartment.id_human, 1))
  90.  
  91.         val partitioner = new RangePartitioner(4, pairApartments)
  92.         val partitionedPairApartments = pairApartments.partitionBy(partitioner).persist()
  93.  
  94.         val numPersonApartments =
  95.             partitionedPairApartments
  96.                 .reduceByKey((a, b) => a + b)
  97.                 .persist()
  98.         val numApartmentsPerson =
  99.             numPersonApartments.map(_.swap)
  100.                 .groupByKey()
  101.                 .sortByKey()
  102.  
  103.         if (printResults) {
  104.             println(numApartmentsPerson.collect().mkString("\n"))
  105.         }
  106.  
  107.         print("1 задание - ")
  108.         time(numApartmentsPerson.collect())
  109.  
  110.  
  111.         // ====== Task 2 ======
  112.         val personPairRDD = persons.map(item => (item.id, item.name))
  113.  
  114.         val partitionerPersonPairRDD = new RangePartitioner(4, personPairRDD)
  115.         val partitionedPersonPairRDD = personPairRDD.partitionBy(partitionerPersonPairRDD).persist()
  116.  
  117.         val joined: RDD[(Int, (String, Int))] = partitionedPersonPairRDD join numPersonApartments
  118.         val numPersonNameApartments = joined.mapValues(item => item._1)
  119.  
  120.         if (printResults) {
  121.             println(numPersonNameApartments.collect().mkString("\n"))
  122.         }
  123.  
  124.         print("2 задание - ")
  125.         time(numPersonNameApartments.collect())
  126.  
  127.  
  128.         // ====== Task 3 ======
  129.         val personApartmentsAddress = apartments
  130.             .filter(_.num_rooms > 2)
  131.             .map(item =>
  132.                 (item.id_human, item.address)
  133.             )
  134.  
  135.         val partitionerPersonApartmentsAddress = new RangePartitioner(4, personApartmentsAddress)
  136.         val partitionedPersonApartmentsAddress = personApartmentsAddress.partitionBy(partitionerPersonApartmentsAddress).persist()
  137.  
  138.         val personNameAddressWithId = partitionedPersonApartmentsAddress join personPairRDD
  139.         val personNameAddress = personNameAddressWithId.mapValues(item => item._1)
  140.  
  141.         if (printResults) {
  142.             println(personNameAddress.collect().mkString("\n"))
  143.         }
  144.  
  145.         print("3 задание - ")
  146.         time(personNameAddress.collect())
  147.     }
  148.  
  149.     def main(args: Array[String]): Unit = {
  150.         val rawPersonRdd: RDD[String] = sc.textFile(personPath)
  151.         val rawApartmentRdd: RDD[String] = sc.textFile(apartmentPath)
  152.         val persons: RDD[Person] = rawPersonRdd.map(strPerson => {
  153.             strPerson.split(",").map(_.trim) match {
  154.                 case Array(id, name) => Person(id.toInt, name)
  155.             }
  156.         })
  157.         val apartments: RDD[Apartment] = rawApartmentRdd.map(strPerson => {
  158.             strPerson.split(",").map(_.trim) match {
  159.                 case Array(id_apartment, id_human, num_rooms, address) => Apartment(id_apartment.toInt, id_human.toInt, num_rooms.toInt, address)
  160.             }
  161.         })
  162.        
  163.         time(lab4(apartments, persons, printResults = false))
  164.         time(lab5(apartments, persons, printResults = false))
  165.  
  166.         sc.stop()
  167.     }
  168.  
  169.     private def getSparkContext(conf: SparkConf): SparkContext = {
  170.         if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  171.             System.setProperty(
  172.                 "hadoop.home.dir",
  173.                 Paths.get(this.getClass.getResource("/winutils/hadoop-2.7.1/").toURI).toString
  174.             )
  175.         }
  176.  
  177.         new SparkContext(conf)
  178.     }
  179. }
  180.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement