Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scalaz.zio.console._
- import scalaz.zio.stream._
- import scalaz.zio.{Chunk, ZIO}
- object Framing {
- def lfDelimiter = Chunk('\n')
- def crLfDelimiter = Chunk('\r', '\n')
- def delimited[@specialized A](maxSize: Int, delimiter: Chunk[A]): Sink[Any, Nothing, Chunk[A], Chunk[A], Chunk[A]] = {
- Sink.fold(Chunk.empty: Chunk[A]) { (buffer, in) =>
- val searchBuffer = buffer ++ in
- findDelimiter(delimiter)(searchBuffer)
- .map {
- case (found, remaining) =>
- Sink.Step.done(found, Chunk.succeed(remaining))
- }
- .getOrElse {
- if (buffer.length + in.length > maxSize) {
- Sink.Step.done(searchBuffer, Chunk.succeed(Chunk.empty))
- } else {
- Sink.Step.more(searchBuffer)
- }
- }
- }
- }
- private def findDelimiter[@specialized A](delim: Chunk[A])(chunk: Chunk[A]): Option[(Chunk[A], Chunk[A])] = {
- val length = delim.length
- @scala.annotation.tailrec
- def help(pos: Int): Option[(Chunk[A], Chunk[A])] = {
- val compare = chunk.drop(pos).take(length)
- if (compare.length < length) {
- None
- } else if (compare == delim) {
- val (matched, remaining) = chunk.splitAt(pos)
- Some((matched, remaining.drop(delim.length)))
- } else {
- help(pos + 1)
- }
- }
- help(0)
- }
- def groupSink[A](groupSize: Int): Sink[Any, Nothing, A, A, Chunk[A]] = {
- Sink.fold(Chunk.empty: Chunk[A]) { (group, a) =>
- val next = group ++ Chunk.succeed(a)
- if (next.length == groupSize) {
- Sink.Step.done(next, Chunk.empty)
- } else {
- Sink.Step.more(next)
- }
- }
- }
- }
- object Test extends scalaz.zio.App {
- def printChunk(c: Chunk[_]): String = c.mkString("[", ",", "]") + s" (${c.length})"
- override def run(args: List[String]): ZIO[Test.Environment, Nothing, Int] = {
- val data =
- """line 1
- |line 2 ABCDEFGHIJKLMNO
- |line 3 ABCDEF
- |line 4 AB
- """.stripMargin
- val grouped = Stream.fromIterable(data).transduce(Framing.groupSink[Char](5))
- for {
- _ <- grouped.foreach(chunk => putStrLn(printChunk(chunk)))
- _ <- putStrLn("-----")
- _ <- grouped
- .transduce(Framing.delimited(150, Framing.lfDelimiter))
- .foreach(chunk => putStrLn(printChunk(chunk)))
- } yield {
- 0
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement