Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Method generator parses sample data from a json stream.
 *
 * The blob is downloaded through `blobStorage.downloadBlob`, a Jackson parser is opened
 * over exactly the readable bytes of the downloaded buffer, and samples are then pulled
 * from it on demand through `pullOrComplete`.
 *
 * @param sampleJsonUrl String is the URL to the Azure Blob Storage sample data.
 * @return Flowable<Sample> is a cold, synchronous, stateful and backpressure-aware
 * generator of samples.
 */
fun generator(sampleJsonUrl: String): Flowable<Sample> =
    blobStorage.downloadBlob(sampleJsonUrl)
        .flatMapPublisher { bbuf: ByteBuffer ->
            // Parse only the readable window [position, limit) of the buffer.
            // ByteBuffer.array() alone would expose the whole backing array (ignoring
            // position/limit/arrayOffset) and throws for direct or read-only buffers.
            val jParser: JsonParser =
                if (bbuf.hasArray()) {
                    JsonFactory().createParser(
                        bbuf.array(),
                        bbuf.arrayOffset() + bbuf.position(),
                        bbuf.remaining()
                    )
                } else {
                    // No accessible backing array: copy the readable bytes out.
                    // duplicate() keeps the caller-visible position of bbuf untouched.
                    val bytes = ByteArray(bbuf.remaining())
                    bbuf.duplicate().get(bytes)
                    JsonFactory().createParser(bytes)
                }

            Flowable.generate<Sample, JsonParser>(
                // Initial state: whatever gobbleJsonToSamples() returns for this parser.
                java.util.concurrent.Callable { jParser.gobbleJsonToSamples() },
                // Invoked once per downstream request; delegates to pullOrComplete.
                io.reactivex.functions.BiConsumer<JsonParser, Emitter<Sample>> {
                        parser: JsonParser, emitter: Emitter<Sample> ->
                    pullOrComplete(parser, emitter)
                },
                // State disposer: closes the parser created above (same object the
                // original code closed, regardless of the generator state instance).
                Consumer<JsonParser> { jParser.close() }
            )
        }
/**
 * Method downloadBlob retrieves a blob from storage and collects the bytes of its
 * response body into a single buffer.
 *
 * The download is issued with no optional arguments (`null, null, false, null`) and the
 * body is read with `ReliableDownloadOptions` configured for a maximum of 3 retry requests.
 *
 * @param url String is the url to the blob.
 * @return Single<ByteBuffer> is the blob's content.
 */
fun downloadBlob(url: String): Single<ByteBuffer> {
    val blockBlob = BlockBlobURL(URL(url), pipeline)
    return blockBlob
        .download(null, null, false, null)
        .flatMap { response ->
            // Options are built per response, exactly where the body is requested.
            val retryOptions = ReliableDownloadOptions().withMaxRetryRequests(3)
            FlowableUtil.collectBytesInBuffer(response.body(retryOptions))
        }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement