Advertisement
MARSHAL327

BDOT 4/5

Dec 9th, 2023
1,494
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 6.46 KB | None | 0 0
  1. package sevsu.spark
  2.  
  3. import java.nio.file.Paths
  4. import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
  5. import org.apache.spark.rdd.RDD
  6. import sevsu.spark.Application.time
  7.  
  /**
   * Driver for the Spark RDD labs 4 and 5.
   *
   * Reads `person.csv` and `apartment.csv` from the classpath root, runs the same
   * three queries twice (lab4: plain pair RDDs, lab5: range-partitioned pair RDDs)
   * and prints the wall-clock time of every query and of each lab as a whole.
   */
  object Application {
      /** Number of partitions requested from every `RangePartitioner` built in lab5. */
      private val NumPartitions: Int = 4

      private val conf: SparkConf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("spark_example")
          .set("spark.ui.showConsoleProgress", "false")

      // Resolved before the SparkContext is created, so a missing classpath root
      // fails fast with a readable message instead of a NullPointerException.
      private val resourcesRoot: String =
          Option(this.getClass.getResource("/"))
              .map(_.toString)
              .getOrElse(throw new IllegalStateException(
                  "Classpath root is not available as a resource; cannot locate person.csv / apartment.csv"
              ))
      private val personPath: String = resourcesRoot + "person.csv"
      private val apartmentPath: String = resourcesRoot + "apartment.csv"

      private val sc: SparkContext = getSparkContext(conf)

      /** One row of `person.csv`. */
      case class Person(id: Int, name: String)

      /** One row of `apartment.csv`; `id_human` refers to [[Person.id]]. */
      case class Apartment(id_apartment: Int, id_human: Int, num_rooms: Int, address: String)

      /**
       * Evaluates `block` once, prints the elapsed wall-clock time in seconds
       * (rounded half-up to 4 decimal places) and returns the block's result.
       *
       * Nothing is printed if `block` throws; the exception propagates to the caller.
       *
       * @param block the computation to measure (by-name, evaluated exactly once)
       * @tparam R result type of the computation
       * @return whatever `block` evaluates to
       */
      def time[R](block: => R): R = {
          val startedAt = System.nanoTime()
          val result = block // the by-name argument is forced here
          val elapsedSeconds = BigDecimal((System.nanoTime() - startedAt) / 1e9)
              .setScale(4, BigDecimal.RoundingMode.HALF_UP)
          println(s"Время: $elapsedSeconds сек.")
          result
      }

      /**
       * Optionally prints the contents of `rdd`, then prints `label` followed by
       * the time a `collect()` of `rdd` takes.
       *
       * @param label        text printed in front of the timing line
       * @param rdd          the query result to materialise
       * @param printResults when true, the collected rows are printed one per line first
       */
      private def report[T](label: String, rdd: RDD[T], printResults: Boolean): Unit = {
          if (printResults) {
              println(rdd.collect().mkString("\n"))
          }
          print(label)
          time(rdd.collect())
      }

      /**
       * Lab 4: runs the three queries on ordinary (not explicitly partitioned) pair RDDs.
       *
       *  1. apartment count -> ids of the persons owning that many apartments, sorted by count
       *  2. (person name, apartment count)
       *  3. (address, person name) for apartments with more than two rooms
       *
       * @param apartments   parsed apartment records
       * @param persons      parsed person records
       * @param printResults when true, every query result is printed before it is timed
       */
      def lab4(apartments: RDD[Apartment], persons: RDD[Person], printResults: Boolean = true): Unit = {
          println("========== lab4 ==========")

          // ====== Task 1 ======
          // (person id, number of apartments); persisted because Task 2 reuses it.
          val numPersonApartments: RDD[(Int, Int)] =
              apartments
                  .map(apartment => (apartment.id_human, 1))
                  .reduceByKey(_ + _)
                  .persist()
          val numApartmentsPerson =
              numPersonApartments
                  .map(_.swap)
                  .groupByKey()
                  .sortByKey()
          report("1 задание - ", numApartmentsPerson, printResults)

          // ====== Task 2 ======
          val personPairRDD = persons.map(person => (person.id, person.name))
          val numPersonNameApartments = (personPairRDD join numPersonApartments).values
          report("2 задание - ", numPersonNameApartments, printResults)

          // ====== Task 3 ======
          val personApartmentsAddress =
              apartments
                  .filter(_.num_rooms > 2)
                  .map(apartment => (apartment.id_human, apartment.address))
          val personNameAddress = (personApartmentsAddress join personPairRDD).values
          report("3 задание - ", personNameAddress, printResults)

          // Release the cached blocks so they do not influence whatever runs next.
          numPersonApartments.unpersist()
      }

      /**
       * Lab 5: the same three queries as [[lab4]], but every pair RDD is first
       * range-partitioned into `NumPartitions` partitions and persisted.
       *
       * The query results have the same shape as in [[lab4]]: Task 2 yields
       * (person name, apartment count) and Task 3 yields (address, person name).
       *
       * @param apartments   parsed apartment records
       * @param persons      parsed person records
       * @param printResults when true, every query result is printed before it is timed
       */
      def lab5(apartments: RDD[Apartment], persons: RDD[Person], printResults: Boolean = true): Unit = {
          println("\n========== lab5 ==========")

          // ====== Task 1 ======
          val pairApartments = apartments.map(apartment => (apartment.id_human, 1))
          val partitionedPairApartments =
              pairApartments
                  .partitionBy(new RangePartitioner(NumPartitions, pairApartments))
                  .persist()

          val numPersonApartments: RDD[(Int, Int)] =
              partitionedPairApartments
                  .reduceByKey(_ + _)
                  .persist()
          val numApartmentsPerson =
              numPersonApartments
                  .map(_.swap)
                  .groupByKey()
                  .sortByKey()
          report("1 задание - ", numApartmentsPerson, printResults)

          // ====== Task 2 ======
          val personPairRDD = persons.map(person => (person.id, person.name))
          val partitionedPersonPairRDD =
              personPairRDD
                  .partitionBy(new RangePartitioner(NumPartitions, personPairRDD))
                  .persist()

          val joined: RDD[(Int, (String, Int))] = partitionedPersonPairRDD join numPersonApartments
          // Keep the whole (name, count) pair; projecting only `_1` would lose the count.
          val numPersonNameApartments = joined.values
          report("2 задание - ", numPersonNameApartments, printResults)

          // ====== Task 3 ======
          val personApartmentsAddress =
              apartments
                  .filter(_.num_rooms > 2)
                  .map(apartment => (apartment.id_human, apartment.address))
          val partitionedPersonApartmentsAddress =
              personApartmentsAddress
                  .partitionBy(new RangePartitioner(NumPartitions, personApartmentsAddress))
                  .persist()

          // Join against the partitioned, cached persons rather than the raw pair RDD,
          // and keep the whole (address, name) pair.
          val personNameAddress =
              (partitionedPersonApartmentsAddress join partitionedPersonPairRDD).values
          report("3 задание - ", personNameAddress, printResults)

          // Release everything this lab cached.
          partitionedPersonApartmentsAddress.unpersist()
          partitionedPersonPairRDD.unpersist()
          numPersonApartments.unpersist()
          partitionedPairApartments.unpersist()
      }

      /**
       * Parses one `id,name` line of `person.csv`.
       *
       * @throws IllegalArgumentException if the line does not have exactly two fields
       * @throws NumberFormatException    if the id field is not an integer
       */
      private def parsePerson(line: String): Person =
          line.split(",").map(_.trim) match {
              case Array(id, name) => Person(id.toInt, name)
              case _ =>
                  throw new IllegalArgumentException(
                      s"Malformed person record (expected 'id,name'): '$line'"
                  )
          }

      /**
       * Parses one `id_apartment,id_human,num_rooms,address` line of `apartment.csv`.
       *
       * @throws IllegalArgumentException if the line does not have exactly four fields
       * @throws NumberFormatException    if a numeric field is not an integer
       */
      private def parseApartment(line: String): Apartment =
          line.split(",").map(_.trim) match {
              case Array(idApartment, idHuman, numRooms, address) =>
                  Apartment(idApartment.toInt, idHuman.toInt, numRooms.toInt, address)
              case _ =>
                  throw new IllegalArgumentException(
                      s"Malformed apartment record (expected 'id_apartment,id_human,num_rooms,address'): '$line'"
                  )
          }

      /**
       * Entry point: loads both CSV files, runs lab4 and lab5 without printing the
       * query results, prints the total time of each lab and stops the Spark context.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
          try {
              // Blank lines carry no record, so they are skipped rather than reported as malformed.
              val persons: RDD[Person] =
                  sc.textFile(personPath)
                      .filter(_.trim.nonEmpty)
                      .map(parsePerson)
              val apartments: RDD[Apartment] =
                  sc.textFile(apartmentPath)
                      .filter(_.trim.nonEmpty)
                      .map(parseApartment)

              time(lab4(apartments, persons, printResults = false))
              time(lab5(apartments, persons, printResults = false))
          } finally {
              // Always shut the context down, even when a lab fails.
              sc.stop()
          }
      }

      /**
       * Creates the Spark context. On Windows it first points `hadoop.home.dir`
       * at the winutils directory bundled on the classpath, when that directory exists.
       *
       * @param conf Spark configuration to start the context with
       * @return a new SparkContext
       */
      private def getSparkContext(conf: SparkConf): SparkContext = {
          val onWindows = System.getProperty("os.name").toLowerCase.contains("windows")
          if (onWindows) {
              Option(this.getClass.getResource("/winutils/hadoop-2.7.1/")) match {
                  case Some(winutils) =>
                      System.setProperty("hadoop.home.dir", Paths.get(winutils.toURI).toString)
                  case None =>
                      // Leave any existing hadoop.home.dir / HADOOP_HOME setting in charge.
                      System.err.println(
                          "winutils/hadoop-2.7.1 not found on the classpath; hadoop.home.dir left unchanged"
                      )
              }
          }

          new SparkContext(conf)
      }
  }
  182.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement