Guest User

Untitled

a guest
Apr 4th, 2020
256
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 4.65 KB | None | 0 0
  1. override fun oneOnOneChatMessagesFromConversationDescending(
  2.         oneOnOneChatConversationId: String
  3.     ): Flow<State<List<OneOnOneChatMessage<ChatMessageContent>>>> =
  4.         channelFlow<State<List<OneOnOneChatMessage<ChatMessageContent>>>> {
  5.             println("PENTAGON: channelFlow start")
  6.             send(State.Loading)
  7.  
  8.             val chatMessageHistoryDataState: State.Data<List<OneOnOneChatMessage<ChatMessageContent>>> = run {
  9.                 val chatMessageHistory: List<OneOnOneChatMessage<ChatMessageContent>> = run {
  10.                     when (val getChatMessageHistoryResult =
  11.                         getChatMessagesFromOneOnOneChatConversation(oneOnOneChatConversationId)) {
  12.  
  13.                         is Result.Failure -> {
  14.                             offer(State.Error(getChatMessageHistoryResult.exception))
  15.                             close()
  16.                             return@channelFlow
  17.                         }
  18.                         is Result.Success -> getChatMessageHistoryResult.data
  19.                     }
  20.                 }
  21.                 State.Data(chatMessageHistory)
  22.             }
  23.             send(chatMessageHistoryDataState)
  24.  
  25.             val chatMessageStatusUpdates: Flow<OneOnOneChatMessageStatusUpdate> =
  26.                 chatMessageStatusUpdatesFromConversation(oneOnOneChatConversationId)
  27.  
  28.             val incomingChatMessageResults: Flow<Result<OneOnOneChatMessage<ChatMessageContent>>> =
  29.                 incomingChatMessagesFromOneOnOneConversation(
  30.                     quickbloxChatService = QBChatService.getInstance(),
  31.                     oneOnOneChatConversationId = oneOnOneChatConversationId
  32.                 )
  33.  
  34.             val incomingFailedChatMessageResults = incomingChatMessageResults.filterIsInstance<Result.Failure>()
  35.  
  36.             val incomingSuccessfulChatMessageResults = incomingChatMessageResults
  37.                 .filterIsInstance<Result.Success<OneOnOneChatMessage<ChatMessageContent>>>()
  38.  
  39.             val currentChatMessages = chatMessageHistoryDataState.data.toMutableList()
  40.  
  41.             // Update state to error on incoming failed chat message
  42.             incomingFailedChatMessageResults.onEach {
  43.                 if (isClosedForSend) return@onEach
  44.                 send(State.Error(it.exception))
  45.                 close()
  46.                 return@onEach
  47.             }.launchIn(this)
  48.  
  49.             // Update state to data with new incoming message
  50.             incomingSuccessfulChatMessageResults.onEach { incomingSuccessfulChatMessageResult ->
  51.                 if (isClosedForSend) return@onEach
  52.                 currentChatMessages.add(0, incomingSuccessfulChatMessageResult.data)
  53.                 send(State.Data(currentChatMessages))
  54.             }.launchIn(this)
  55.  
  56.             // Update state to data with new sent message
  57.             sentChatMessages.onEach { sentChatMessage ->
  58.                 if (isClosedForSend) return@onEach
  59.                 println("PENTAGON: sentChatMessages.onEach start")
  60.  
  61.                 println("PENTAGON: sentChatMessages.onEach currentChatMessages.size=${currentChatMessages.size}")
  62.                 println("PENTAGON: adding sentChatMessage, content=${sentChatMessage.content}")
  63.                 currentChatMessages.add(0, sentChatMessage)
  64.                 println("PENTAGON: sending: currentChatMessages.size=${currentChatMessages.size}")
  65.                 send(State.Data(currentChatMessages))
  66.                 println("PENTAGON: sentChatMessages.onEach end")
  67.             }.launchIn(this)
  68.  
  69.             // Update state on chat message status update
  70.             chatMessageStatusUpdates.onEach { chatMessageStatusUpdate ->
  71.                 if (isClosedForSend) return@onEach
  72.                 val toBeUpdatedMessageIndex = currentChatMessages.indexOfFirst {
  73.                     it.id == chatMessageStatusUpdate.oneOnOneChatMessageId
  74.                 }
  75.                 if (toBeUpdatedMessageIndex == -1) {
  76.                     val exception = IllegalStateException(
  77.                         "$chatMessageStatusUpdate is not applicable to $currentChatMessages. ID not found"
  78.                     )
  79.                     send(State.Error(exception))
  80.                     close()
  81.                     return@onEach
  82.                 }
  83.  
  84.                 val updatedMessage = run {
  85.                     val toBeUpdatedMessage = currentChatMessages[toBeUpdatedMessageIndex]
  86.                     toBeUpdatedMessage.copy(status = chatMessageStatusUpdate.newStatus)
  87.                 }
  88.                 currentChatMessages[toBeUpdatedMessageIndex] = updatedMessage
  89.                 send(State.Data(currentChatMessages))
  90.             }.launchIn(this)
  91.             println("PENTAGON: channelFlow end")
  92.         }.conflate()
Advertisement
Add Comment
Please, Sign In to add comment