Advertisement
Guest User

Untitled

a guest
Jul 12th, 2019
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.65 KB | None | 0 0
  1. /**
  2. * Method generator parses sample data from a json stream.
  3. * @param sampleJsonUrl String is the URL to the Azure Blob Storage sample data.
  4. * @return Flowable<Sample> is a cold, synchronous, stateful and backpressure-aware
  5. * generator of features.
  6. */
  7. fun generator(sampleJsonUrl: String) =
  8. blobStorage.downloadBlob(sampleJsonUrl)
  9. .map { bbuf: ByteBuffer -> JsonFactory().createParser(bbuf.array()) }
  10. .map { jParser ->
  11. Flowable.generate<Sample, JsonParser>(
  12. java.util.concurrent.Callable { jParser.gobbleJsonToSamples() },
  13.  
  14. io.reactivex.functions.BiConsumer<JsonParser, Emitter<Sample>> {
  15. parser: JsonParser, emitter: Emitter<Sample> ->
  16. pullOrComplete(parser, emitter)
  17. },
  18.  
  19. Consumer<JsonParser> { jParser.close() }
  20. )
  21. }.flatMapPublisher { it }
  22.  
  23.  
  24. /**
  25. * Method downloadBlob will retrieve a blob from storage and return a
  26. * reactive stream of bytes.
  27. * @param url String is the url to the blob.
  28. * @return Single<ByteBuffer> is the blob's content.
  29. */
  30. fun downloadBlob(url: String): Single<ByteBuffer> =
  31.  
  32. BlockBlobURL(URL(url), pipeline)
  33. .download(null, null, false, null)
  34. .flatMap {
  35. FlowableUtil
  36. .collectBytesInBuffer(
  37. it.body(ReliableDownloadOptions().withMaxRetryRequests(3))
  38. )
  39. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement