Advertisement
Guest User

Untitled

a guest
Mar 20th, 2019
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.29 KB | None | 0 0
  1. import scalaz.zio.console._
  2. import scalaz.zio.stream._
  3. import scalaz.zio.{Chunk, ZIO}
  4.  
  5.  
  6. object Framing {
  7.  
  8. def lfDelimiter = Chunk('\n')
  9.  
  10. def crLfDelimiter = Chunk('\r', '\n')
  11.  
  12. def delimited[@specialized A](maxSize: Int, delimiter: Chunk[A]): Sink[Any, Nothing, Chunk[A], Chunk[A], Chunk[A]] = {
  13. Sink.fold(Chunk.empty: Chunk[A]) { (buffer, in) =>
  14. val searchBuffer = buffer ++ in
  15. findDelimiter(delimiter)(searchBuffer)
  16. .map {
  17. case (found, remaining) =>
  18. Sink.Step.done(found, Chunk.succeed(remaining))
  19. }
  20. .getOrElse {
  21. if (buffer.length + in.length > maxSize) {
  22. Sink.Step.done(searchBuffer, Chunk.succeed(Chunk.empty))
  23. } else {
  24. Sink.Step.more(searchBuffer)
  25. }
  26. }
  27. }
  28. }
  29.  
  30. private def findDelimiter[@specialized A](delim: Chunk[A])(chunk: Chunk[A]): Option[(Chunk[A], Chunk[A])] = {
  31. val length = delim.length
  32.  
  33. @scala.annotation.tailrec
  34. def help(pos: Int): Option[(Chunk[A], Chunk[A])] = {
  35. val compare = chunk.drop(pos).take(length)
  36. if (compare.length < length) {
  37. None
  38. } else if (compare == delim) {
  39. val (matched, remaining) = chunk.splitAt(pos)
  40. Some((matched, remaining.drop(delim.length)))
  41. } else {
  42. help(pos + 1)
  43. }
  44. }
  45.  
  46. help(0)
  47. }
  48.  
  49. def groupSink[A](groupSize: Int): Sink[Any, Nothing, A, A, Chunk[A]] = {
  50. Sink.fold(Chunk.empty: Chunk[A]) { (group, a) =>
  51. val next = group ++ Chunk.succeed(a)
  52. if (next.length == groupSize) {
  53. Sink.Step.done(next, Chunk.empty)
  54. } else {
  55. Sink.Step.more(next)
  56. }
  57. }
  58. }
  59.  
  60. }
  61.  
  62. object Test extends scalaz.zio.App {
  63.  
  64. def printChunk(c: Chunk[_]): String = c.mkString("[", ",", "]") + s" (${c.length})"
  65.  
  66. override def run(args: List[String]): ZIO[Test.Environment, Nothing, Int] = {
  67. val data =
  68. """line 1
  69. |line 2 ABCDEFGHIJKLMNO
  70. |line 3 ABCDEF
  71. |line 4 AB
  72. """.stripMargin
  73. val grouped = Stream.fromIterable(data).transduce(Framing.groupSink[Char](5))
  74. for {
  75. _ <- grouped.foreach(chunk => putStrLn(printChunk(chunk)))
  76. _ <- putStrLn("-----")
  77. _ <- grouped
  78. .transduce(Framing.delimited(150, Framing.lfDelimiter))
  79. .foreach(chunk => putStrLn(printChunk(chunk)))
  80. } yield {
  81. 0
  82. }
  83. }
  84.  
  85. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement