Guest User

Untitled

a guest
Apr 23rd, 2018
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.48 KB | None | 0 0
  1. package sb
  2. import java.nio.file.Paths
  3.  
  4. import monix.execution.Ack
  5. import monix.execution.Scheduler.Implicits.global
  6. import monix.nio.text.UTF8Codec.utf8Decode
  7. import monix.reactive.observers.Subscriber
  8. import monix.reactive.{Observable, Observer, OverflowStrategy}
  9.  
  10. import scala.concurrent.Await
  11. import scala.concurrent.duration.Duration
  12. object ParseLines {
  13. case class LinesObserver(sub: Subscriber.Sync[String]) extends Observer.Sync[String] {
  14. val bldr: StringBuilder = StringBuilder.newBuilder
  15. override def onNext(elem: String): Ack = {
  16. def go(i: Int): Ack = elem.indexOf("\n", i) match {
  17. case -1 =>
  18. bldr.append(elem.substring(i))
  19. Ack.Continue
  20. case j =>
  21. bldr.append(elem.substring(i, j))
  22. sub.onNext(bldr.result()) match {
  23. case Ack.Stop => Ack.Stop
  24. case Ack.Continue =>
  25. bldr.clear()
  26. go(j + 1)
  27. }
  28. }
  29. go(0)
  30. }
  31. override def onError(ex: Throwable): Unit = sub.onError(ex)
  32. override def onComplete(): Unit = {
  33. sub.onNext(bldr.result())
  34. sub.onComplete()
  35. }
  36. }
  37.  
  38. def lines(bufferSize: Int = 100)(ls: Observable[String]): Observable[String] =
  39. Observable.create(OverflowStrategy.Fail(bufferSize)) { sub => ls.subscribe(LinesObserver(sub)) }
  40.  
  41. def main(args: Array[String]): Unit = {
  42. Await.result(
  43. lines(1000)(monix.nio.file.readAsync(Paths.get("lol.csv"), 5)
  44. .pipeThrough(utf8Decode)
  45. ).foreach(println), Duration.Inf)
  46. }
  47. }
Add Comment
Please, Sign In to add comment