Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- fun <T> ApolloSubscriptionCall<T>.toFlowCacheSafely(): Flow<Response<T>> = callbackFlow {
- val clone = clone()
- var pendingException: ApolloException? = null
- var hasResponse: Boolean = false
- clone.execute(
- object : ApolloSubscriptionCall.Callback<T> {
- override fun onConnected() {
- }
- override fun onResponse(response: Response<T>) {
- runCatching {
- channel.offer(response)
- hasResponse = true
- if (pendingException != null) {
- channel.close(pendingException)
- }
- }
- }
- override fun onFailure(e: ApolloException) {
- pendingException = e
- if (hasResponse) {
- channel.close(e)
- }
- }
- override fun onCompleted() {
- channel.close()
- }
- override fun onTerminated() {
- channel.close()
- }
- }
- )
- awaitClose { clone.cancel() }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement