Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package sb
- import java.nio.file.Paths
- import monix.execution.Ack
- import monix.execution.Scheduler.Implicits.global
- import monix.nio.text.UTF8Codec.utf8Decode
- import monix.reactive.observers.Subscriber
- import monix.reactive.{Observable, Observer, OverflowStrategy}
- import scala.concurrent.Await
- import scala.concurrent.duration.Duration
/** Re-chunks a stream of arbitrary text fragments into a stream of lines. */
object ParseLines {

  /**
   * Synchronous observer that splits the incoming text chunks on `'\n'` and
   * forwards every completed line to `sub`.
   *
   * Chunk boundaries carry no meaning: a line may span any number of chunks, so
   * the unterminated tail of each chunk is kept in `bldr` until its newline (or
   * the end of the stream) arrives.
   *
   * Line semantics:
   *  - the terminating `'\n'` is never part of an emitted line;
   *  - a `'\r'` directly before that `'\n'` is dropped as well, so CRLF input
   *    yields the same lines as LF input;
   *  - on completion, a non-empty unterminated tail is emitted as the last line;
   *    an empty tail (input ending in a newline, or empty input) emits nothing.
   *
   * Instances hold mutable state and must not be shared between subscriptions.
   *
   * @param sub downstream subscriber that receives the lines
   */
  final case class LinesObserver(sub: Subscriber.Sync[String]) extends Observer.Sync[String] {

    /** Text received since the last emitted line (the current, still open line). */
    val bldr: StringBuilder = new StringBuilder

    /** Emits the buffered line (minus one trailing `'\r'`), resets the buffer and returns the downstream ack. */
    private def emitLine(): Ack = {
      val last = bldr.length - 1
      // The '\r' of a CRLF pair may have arrived in an earlier chunk than the
      // '\n', so it is stripped from the buffer rather than from the chunk.
      if (last >= 0 && bldr.charAt(last) == '\r') bldr.setLength(last)
      val ack = sub.onNext(bldr.result())
      bldr.clear()
      ack
    }

    /**
     * Consumes one chunk, emitting every line that it completes.
     *
     * @param elem the next text fragment; may contain zero or more newlines
     * @return `Ack.Stop` as soon as the downstream asks to stop, `Ack.Continue` otherwise
     */
    override def onNext(elem: String): Ack = {
      @scala.annotation.tailrec
      def go(from: Int): Ack = {
        val nl = elem.indexOf('\n', from)
        if (nl < 0) {
          // No further newline in this chunk: keep the tail for the next one.
          bldr.append(elem.substring(from))
          Ack.Continue
        } else {
          bldr.append(elem.substring(from, nl))
          emitLine() match {
            case Ack.Stop     => Ack.Stop
            case Ack.Continue => go(nl + 1)
          }
        }
      }
      go(0)
    }

    /** Forwards the upstream error; any partially buffered line is discarded. */
    override def onError(ex: Throwable): Unit = sub.onError(ex)

    /**
     * Flushes a non-empty unterminated last line and then completes the downstream.
     * Nothing further is signalled if the downstream answers the flush with `Ack.Stop`.
     */
    override def onComplete(): Unit = {
      val ack: Ack = if (bldr.length > 0) sub.onNext(bldr.result()) else Ack.Continue
      ack match {
        case Ack.Stop     => ()
        case Ack.Continue => sub.onComplete()
      }
    }
  }

  /**
   * Turns an observable of text chunks into an observable of lines.
   *
   * The returned observable is built with `OverflowStrategy.Fail(bufferSize)`.
   * NOTE(review): per the Monix documentation for that strategy, the stream is
   * expected to fail when more than `bufferSize` lines are waiting for a slow
   * consumer; confirm that this is acceptable for large inputs.
   *
   * @param bufferSize capacity handed to `OverflowStrategy.Fail`
   * @param ls         upstream text chunks, split at arbitrary positions
   * @return the lines of the concatenated upstream text (see [[LinesObserver]])
   */
  def lines(bufferSize: Int = 100)(ls: Observable[String]): Observable[String] =
    Observable.create(OverflowStrategy.Fail(bufferSize)) { sub =>
      // Run the upstream on the scheduler of whoever subscribes, instead of
      // tying this combinator to the globally imported scheduler.
      ls.subscribe(LinesObserver(sub))(sub.scheduler)
    }

  /**
   * Prints every line of a UTF-8 text file to standard output.
   *
   * @param args optional first argument: path of the file to read (defaults to `lol.csv`)
   */
  def main(args: Array[String]): Unit = {
    val path = Paths.get(args.headOption.getOrElse("lol.csv"))
    // The file is read in 5-byte chunks, so lines routinely span several chunks.
    val decoded = monix.nio.file.readAsync(path, 5).pipeThrough(utf8Decode)
    // Blocking here is fine: this is the program's entry point.
    Await.result(lines(1000)(decoded).foreach(println), Duration.Inf)
  }
}
Add Comment
Please, Sign In to add comment