Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.stream.scaladsl._
- import akka.actor.ActorSystem
- import akka.NotUsed
- import akka.stream._
- import akka.util.ByteString
- import java.nio.file.Paths
- import scala.concurrent.Future
- object DescriptiveStats extends App {
- case class PebbleData(
- time: Double,
- mainAvgCurrent: Double,
- mainAvgPower: Double,
- mainAvgVoltage: Double)
- def readFile(file: String) = FileIO.fromPath(Paths.get(file)).
- via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true)).
- map(_.utf8String)
- def marshall = Flow[String].
- map(_.split(",")).
- map(s => PebbleData(s.head.toDouble, s(1).toDouble, s(2).toDouble, s(3).toDouble))
- def mapBatch[T](batchSize: Int)(f: Seq[PebbleData] => Future[T]) =
- Flow[PebbleData].
- grouped(batchSize).
- mapAsync(4)(f)
- def mean(xs: Seq[Double]) = xs.sum / xs.size
- implicit val system = ActorSystem("Descriptivestats")
- implicit val materializer = ActorMaterializer()
- implicit val executionContext = system.dispatcher
- readFile("170622_PebbleSteel_firstTestRun.csv").
- drop(1).
- via(marshall).
- via(mapBatch(1000)((p: Seq[PebbleData]) =>
- Future.successful(PebbleData(
- p.head.time,
- mean(p.map(_.mainAvgCurrent)),
- mean(p.map(_.mainAvgPower)),
- mean(p.map(_.mainAvgVoltage))
- ))
- )).
- runWith(Sink.seq).
- onComplete { s =>
- println(s)
- system.terminate()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement