Advertisement
Guest User

Untitled

a guest
Jun 23rd, 2017
51
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.41 KB | None | 0 0
  1. import akka.stream.scaladsl._
  2. import akka.actor.ActorSystem
  3. import akka.NotUsed
  4. import akka.stream._
  5. import akka.util.ByteString
  6. import java.nio.file.Paths
  7. import scala.concurrent.Future
  8.  
  /** Batch-averages the numeric columns of a CSV file using an Akka Streams pipeline.
    *
    * The file is read line by line, the first line is dropped (presumably a header row),
    * every remaining non-blank line is parsed into a [[DescriptiveStats.PebbleData]] row,
    * rows are grouped into batches of 1000, and each batch is reduced to a single row
    * holding the first `time` of the batch and the mean of the three other columns.
    * The collected result (or the failure) is printed and the actor system is terminated.
    *
    * The input path is taken from the first command-line argument when one is given;
    * otherwise the original hard-coded file name is used.
    */
  object DescriptiveStats extends App {

    /** One parsed CSV row: the first four comma-separated columns, in file order. */
    final case class PebbleData(
        time: Double,
        mainAvgCurrent: Double,
        mainAvgPower: Double,
        mainAvgVoltage: Double
    )

    /** Streams `file` as UTF-8 decoded lines split on `"\n"`.
      *
      * A line longer than 8192 bytes fails the stream; `allowTruncation = true` lets the
      * last line through even when the file does not end with a newline.
      *
      * @param file path of the file to read
      * @return a source of lines whose materialized value is the file IO result
      */
    def readFile(file: String): Source[String, Future[IOResult]] =
      FileIO
        .fromPath(Paths.get(file))
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
        .map(_.utf8String)

    /** Parses CSV lines into [[PebbleData]] rows.
      *
      * Blank lines are skipped instead of failing the whole stream. Columns beyond the
      * fourth are ignored. A line with fewer than four columns fails the stream with an
      * `IllegalArgumentException` that quotes the line; a non-numeric column fails it
      * with a `NumberFormatException`.
      */
    def marshall: Flow[String, PebbleData, NotUsed] =
      Flow[String]
        .filter(_.trim.nonEmpty)
        .map { line =>
          line.split(",") match {
            case Array(time, current, power, voltage, _*) =>
              PebbleData(time.toDouble, current.toDouble, power.toDouble, voltage.toDouble)
            case _ =>
              throw new IllegalArgumentException(
                s"Expected at least 4 comma-separated columns but got: '$line'"
              )
          }
        }

    /** Groups rows into batches of `batchSize` and maps each batch asynchronously.
      *
      * @param batchSize maximum number of rows per batch (the last batch may be smaller)
      * @param f         asynchronous reduction applied to each batch, with a parallelism of 4
      * @tparam T        result type produced for each batch
      */
    def mapBatch[T](batchSize: Int)(f: Seq[PebbleData] => Future[T]): Flow[PebbleData, T, NotUsed] =
      Flow[PebbleData]
        .grouped(batchSize)
        .mapAsync(4)(f)

    /** Arithmetic mean of `xs`; evaluates to `NaN` when `xs` is empty (0.0 / 0). */
    def mean(xs: Seq[Double]): Double = xs.sum / xs.size

    /** Collapses a batch into one row: the first row's `time` plus per-column means. */
    private def summarise(batch: Seq[PebbleData]): PebbleData =
      PebbleData(
        batch.head.time, // safe in the pipeline below: `grouped` never emits an empty batch
        mean(batch.map(_.mainAvgCurrent)),
        mean(batch.map(_.mainAvgPower)),
        mean(batch.map(_.mainAvgVoltage))
      )

    // Implicit definitions carry explicit types so implicit resolution does not depend on
    // inference (untyped implicits are fragile in Scala 2 and rejected by Scala 3).
    implicit val system: ActorSystem = ActorSystem("Descriptivestats")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executionContext: scala.concurrent.ExecutionContextExecutor = system.dispatcher

    // First command-line argument, falling back to the original hard-coded file name.
    private val inputFile: String =
      args.headOption.getOrElse("170622_PebbleSteel_firstTestRun.csv")

    readFile(inputFile)
      .drop(1)
      .via(marshall)
      .via(mapBatch(1000)((batch: Seq[PebbleData]) => Future.successful(summarise(batch))))
      .runWith(Sink.seq)
      .onComplete { result =>
        println(result)
        // Terminate on success and on failure alike so the JVM can exit.
        system.terminate()
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement