Advertisement
Guest User

Untitled

a guest
Jan 14th, 2019
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 16.93 KB | None | 0 0
  1. package com.grilleye.pro.data.repository
  2.  
  3. import android.content.Context
  4. import android.net.ConnectivityManager
  5. import android.util.Log
  6. import com.grilleye.pro.data.entities.SensorsResponseEntity
  7. import com.grilleye.pro.data.entities.WSState
  8. import com.grilleye.pro.data.mappers.*
  9. import com.grilleye.pro.data.models.AlarmNotificationModel
  10. import com.grilleye.pro.data.models.ProbeModel
  11. import com.grilleye.pro.extentions.*
  12. import com.neovisionaries.ws.client.*
  13. import rx.Completable
  14. import rx.Observable
  15. import rx.Single
  16. import rx.schedulers.Schedulers
  17. import rx.subjects.PublishSubject
  18. import rx.subscriptions.Subscriptions
  19. import java.lang.Math.abs
  20. import java.util.*
  21. import java.util.concurrent.TimeUnit
  22. import kotlin.collections.ArrayList
  23.  
  24. class WebSocketsRepositoryImpl(private val wsMessagesParser: WsMessagesParser,
  25.                                private val temperatureSensorsMapper: TemperatureSensorsMapper,
  26.                                private val context: Context,
  27.                                private val logMapper: LogMapper,
  28.                                private val alarmsMapper: AlarmsMapper
  29. ) : WebSocketsRepository {
  30.  
  31.     companion object {
  32.         val CONNECT_TIMEOUT = 10_000
  33.  
  34.         val HARDCODED_ADDR = "ws://10.1.43.158"
  35.     }                       //ws://10.1.43.177
  36.  
  37.     private var tempSubject: PublishSubject<SensorsResponseEntity> = PublishSubject.create()
  38.     private var completionSubject: PublishSubject<ByteArray> = PublishSubject.create()
  39.  
  40.     private var tempRequestsSubscription = Subscriptions.unsubscribed()
  41.  
  42.     private var webSocket: WebSocket? = null
  43.  
  44.     private var logLength = 0
  45.     private var logIndex = 0
  46.     private var logArray: ArrayList<Byte> = ArrayList()
  47.  
  48.     private var logSubject: PublishSubject<List<Int>> = PublishSubject.create()
  49.     private var signalQualitySubject: PublishSubject<Byte> = PublishSubject.create()
  50.     private var batteyLevelSubject: PublishSubject<Byte> = PublishSubject.create()
  51.     private var updateSubject: PublishSubject<Int> = PublishSubject.create()
  52.     private var wifiNameSubject: PublishSubject<String> = PublishSubject.create()
  53.     private var firmwareSubject: PublishSubject<String> = PublishSubject.create()
  54.     private var serialSubject: PublishSubject<String> = PublishSubject.create()
  55.     private var devicePresetsStateSubject: PublishSubject<Int> = PublishSubject.create()
  56.     private var alarmNotificationSubject: PublishSubject<List<AlarmNotificationModel>> = PublishSubject.create()
  57.  
  58.     private var blocksLogValusesSubject: PublishSubject<Int> = PublishSubject.create()
  59.  
  60.     private var probeId = -1
  61.  
  62.     private val webSocketListener = object : WebSocketAdapter() {
  63.         override fun onBinaryMessage(websocket: WebSocket?, binary: ByteArray) {
  64.  
  65.             val msgType = wsMessagesParser.getMessageTypeFromBytes(binary)
  66.             Log.d("UpdateTesting", "onBinaryMessage, msgType: $msgType, binary[5] = ${binary[5].toUINT8()}")
  67.             if (msgType == (UUIDS.ALARM)) {
  68.                 alarmNotificationSubject.onNext(alarmsMapper.map(Arrays.copyOfRange(binary, 4, binary.size)))
  69.             } else if (msgType == (UUIDS.UPDATE)) {
  70.                 if (binary[4] == 0x55.toByte()) {
  71.                     updateSubject.onNext(binary[5].toUINT8())
  72.                 }
  73.             } else if (msgType == (UUIDS.WIFI_NAME)) {
  74.                 wifiNameSubject.onNext(String(Arrays.copyOfRange(binary, 4, binary.size)))
  75.                 wifiNameSubject.onCompleted()
  76.             } else if (msgType == (UUIDS.SERIAL)) {
  77.                 serialSubject.onNext(getHexValue(Arrays.copyOfRange(binary, 4, binary.size)))
  78.                 serialSubject.onCompleted()
  79.             } else if (msgType == (UUIDS.FIRMWARE)) {
  80.                 firmwareSubject.onNext(String(Arrays.copyOfRange(binary, 4, binary.size)))
  81.                 firmwareSubject.onCompleted()
  82.             } else if (msgType == (UUIDS.TEMP)) {
  83.                 Log.d("onxTempMap", Arrays.toString(binary))
  84.                 val mapped = temperatureSensorsMapper.map(Arrays.copyOfRange(binary, 4, binary.size - 2))
  85.                 tempSubject.onNext(mapped)
  86.                 signalQualitySubject.onNext(binary[binary.size - 3])
  87.                 batteyLevelSubject.onNext(binary[binary.size - 2])
  88.                 devicePresetsStateSubject.onNext(binary[binary.size - 1].toInt())
  89.             } else if (msgType == (UUIDS.LOG_LENGTH)) {
  90.                 logLength = IntegerConversion.convertTwoBytesToInt2(binary[5], binary[6])
  91.                 blocksLogValusesSubject.onNext(logLength)
  92.                 Log.d("onxPoints1", "Len $logLength")
  93.                 trigerLogRequests(binary[4].toInt(), 0, 0)
  94.             } else if (msgType == (UUIDS.LOG)) {
  95.                 Log.d("onxPoints1", "triggered")
  96.                 if (binary[4].toInt() == probeId && probeId != -1) {
  97.                     Arrays.copyOfRange(binary, 7, binary.size).forEach {
  98.                         Log.d("onxPoint3", "$it")
  99.                         logArray.add(it)
  100.                     }
  101.                 }
  102.  
  103.                 blocksLogValusesSubject.onNext(logIndex)
  104.                 logIndex++
  105.                 if (logIndex <= logLength) {
  106.                     Log.d("onxPoints1", "Len $logLength      Index $logIndex")
  107.                     Log.d("onxPoints6", Arrays.toString(binary))
  108.                     trigerLogRequests(binary[4].toInt(), createBufferArrayForTemp(logIndex)[0].toInt(),
  109.                             createBufferArrayForTemp(logIndex)[1].toInt())
  110.                 } else if (logIndex == logLength + 1 && logLength != 0) {
  111.                     Log.d("onxPoints1", "else")
  112.                     Log.d("onxPoint3", logArray.toString())
  113.                     Log.d("adsjfks", "send12345" + "$logIndex, $logLength")
  114. //                    val mappedList = ArrayList<Int>()
  115. //                    logMapper.map(logArray.toByteArray()).forEach {
  116. //                        mappedList.add(tempUnitMapper.mapTempUnit(it))
  117. //                    }
  118.                     logSubject.onNext(logMapper.map(logArray.toByteArray()))
  119.                 }
  120.  
  121.             } else if (msgType == (UUIDS.TIME_STAMP)) {
  122.                 if (abs(IntegerConversion.convertFourBytesToInt2(binary[4], binary[5], binary[6], binary[7]) - constractTimeStampSeconds()) > 3)
  123.                     sendTimeStamp()
  124.             } else if (msgType == (UUIDS.SIGNAL)) {
  125.                 sendProxIllumination(binary[0].toUINT8())
  126.             } else if (msgType != null) {
  127.                 completionSubject.onNext(binary)
  128.             } else {
  129.                 Log.i("onxWsStrangeMsg", Arrays.toString(binary))
  130.             }
  131.         }
  132.  
  133.         override fun onDisconnected(websocket: WebSocket?, serverCloseFrame: WebSocketFrame?, clientCloseFrame: WebSocketFrame?, closedByServer: Boolean) {
  134.             super.onDisconnected(websocket, serverCloseFrame, clientCloseFrame, closedByServer)
  135.             Log.i("onxDisconnected", "here")
  136.             // reconnect goes here
  137.             // it's wise to delay attempts (at least 2-3 seconds, I suppose)
  138.         }
  139.  
  140.         //TODO: error handling to be implemented
  141.         override fun onError(websocket: WebSocket?, cause: WebSocketException?) {
  142.             super.onError(websocket, cause)
  143.             Log.i("onxWsError", cause.toString())
  144.         }
  145.     }
  146.  
  147.     override fun connect(): Completable {
  148.         Log.d("onxConnectionKeeperTest", "connect()")
  149.         return Completable.create {
  150.             try {
  151.                 if (context.getWifiIp() != "") {
  152.                     webSocket = connectWs()
  153.                     webSocket?.socket?.soTimeout = 5000
  154.                 }
  155.                 it.onCompleted()
  156.             } catch (e: OpeningHandshakeException) {
  157.                 it.onError(e)
  158.                 Log.i("onxHandshakeException", e.toString())
  159.             } catch (e: HostnameUnverifiedException) {
  160.                 it.onError(e)
  161.                 Log.i("onxHostnameException", e.toString())
  162.             } catch (e: WebSocketException) {
  163.                 it.onError(e)
  164.                 Log.i("onxWebSocketException", e.toString())
  165.             }
  166.         }
  167.     }
  168.  
  169.     private fun connectWs(): WebSocket {
  170.         Log.d("onxWifiIp", context.getWifiIp())
  171.         Log.d("onxConnectionKeeperTest", "IP: " + context.getWifiIp())
  172.         return WebSocketFactory()
  173.                 .createSocket(context.getWifiIp(), CONNECT_TIMEOUT)
  174.                 .addListener(webSocketListener)
  175.                 .addExtension(WebSocketExtension.PERMESSAGE_DEFLATE)
  176.                 .connect()
  177.     }
  178.  
  179.     override fun disconnect(): Completable {
  180.         return Completable.fromCallable { webSocket?.disconnect() }
  181.     }
  182.  
  183.     override fun getBlocksLogValues(): Observable<Int> {
  184.         return blocksLogValusesSubject
  185.     }
  186.  
  187.     override fun isDeviceConnectedFiltered() = Observable.interval(200, TimeUnit.MILLISECONDS).map { getWebSocketState() }
  188.  
  189.     private fun getWebSocketState(): Boolean {
  190.         if (webSocket == null) {
  191.             return false
  192.         }
  193.  
  194.         return webSocket?.isOpen ?: false
  195.     }
  196.  
  197.     override fun singleState(): Boolean {
  198.         return getWebSocketState()
  199.     }
  200.  
  201.     override fun getConnectionState(): Observable<WSState> = Observable.interval(1, TimeUnit.SECONDS).map { getFullWSState() }
  202.  
  203.     private fun getFullWSState(): WSState {
  204.         webSocket?.pingInterval
  205.         if (!isWifiOn()) {
  206.             return WSState.DISCONNECTED
  207.         }
  208.         if (isWifiOn() && webSocket?.isOpen != true) {
  209.             return WSState.RECONNECTING
  210.         }
  211.         if (isWifiOn() && webSocket?.isOpen == true) {
  212.             return WSState.CONNECTED
  213.         }
  214.         return WSState.RECONNECTING
  215.     }
  216.  
  217.     override fun getDevicePresetsState(): Single<Int> {
  218.         return devicePresetsStateSubject.take(1).toSingle()
  219.     }
  220.  
  221.     // TODO: check threading
  222.     override fun listenForTemperatureSensorsInfo(): Observable<SensorsResponseEntity> {
  223.         return tempSubject
  224.                 .doOnSubscribe {
  225.                     triggerTempRequests();
  226.                 }
  227.                 .doOnUnsubscribe { tempRequestsSubscription.unsubscribe(); }
  228.     }
  229.  
  230.     override fun listenForBatteryLevel(): Observable<Byte> {
  231.         return batteyLevelSubject
  232.     }
  233.  
  234.     // TODO: implement unsubscription mechanism
  235.     private fun triggerTempRequests() {
  236.         tempRequestsSubscription = Observable.interval(1, TimeUnit.SECONDS).subscribe {
  237.             webSocket?.sendBinary(UUIDS.TEMP.uuidArray)
  238.         }
  239.     }
  240.  
  241.     override fun listenForSignalQuality(): Observable<Byte> {
  242.         return signalQualitySubject
  243.     }
  244.  
  245.     override fun listenForLog(probeId: Int): Observable<List<Int>> {
  246.         logLength = 0
  247.         logIndex = 0
  248.         logArray = ArrayList()
  249.         return logSubject
  250.                 .doOnSubscribe {
  251.                     readLogLength(probeId);
  252.                     Log.d("adsjfks", "subscribed" + "$logIndex, $logLength")
  253.                 }
  254.                 .doOnNext {
  255.                     Log.d("adsjfks", "send")
  256.                     Log.d("adsjfks", "send" + "$logIndex, $logLength")
  257.                 }
  258.                 .doOnUnsubscribe { Log.d("adsjfks", "unsubscribed") }
  259.     }
  260.  
  261.     private fun readLogLength(probeId: Int) {
  262.         this.probeId = probeId
  263.         webSocket?.sendBinary(UUIDS.LOG_LENGTH.uuidArray + constractLogLengthByteArray(probeId))
  264.     }
  265.  
  266.     private fun trigerLogRequests(probeId: Int, numLow: Int, numHigh: Int) {
  267.         Log.d("onxPoints1", "triger" + numLow.toString() + " " + numHigh.toString())
  268.         webSocket?.sendBinary(UUIDS.LOG.uuidArray + constractLogByteArray(probeId, numLow, numHigh))
  269.     }
  270.  
  271.     override fun listenForTimeStamp(): Completable {
  272.         webSocket?.sendBinary(UUIDS.TIME_STAMP.uuidArray)
  273.         return Completable.complete()
  274.     }
  275.  
  276.     override fun readProxIllumination(): Completable {
  277.         webSocket?.sendBinary(UUIDS.SIGNAL.uuidArray)
  278.         return Completable.complete()
  279.     }
  280.  
  281.     private fun listenForUpdateRequest(): Completable {
  282.         webSocket?.sendBinary(UUIDS.UPDATE.uuidArray + 0x55.toByte())
  283.         return Completable.complete()
  284.     }
  285.  
  286.     override fun getWifiName(): Single<String> {
  287.         return wifiNameSubject.toSingle()
  288.                 .doOnSubscribe { trigerWifiName() }
  289.     }
  290.  
  291.     private fun trigerWifiName() {
  292.         webSocket?.sendBinary(UUIDS.WIFI_NAME.uuidArray)
  293.     }
  294.  
  295.     override fun getFirmwareVersion(): Single<String> {
  296.         return firmwareSubject.toSingle()
  297.                 .doOnSubscribe { trigerFirmwareVersion() }
  298.     }
  299.  
  300.     private fun trigerFirmwareVersion() {
  301.         webSocket?.sendBinary(UUIDS.FIRMWARE.uuidArray)
  302.     }
  303.  
  304.     override fun getSerialNumber(): Single<String> {
  305.         return serialSubject.toSingle()
  306.                 .doOnSubscribe { trigerSerialNumber() }
  307.     }
  308.  
  309.     private fun trigerSerialNumber() {
  310.         webSocket?.sendBinary(UUIDS.SERIAL.uuidArray)
  311.     }
  312.  
  313.     override fun listenForAlarms(): Observable<List<AlarmNotificationModel>> {
  314.         return alarmNotificationSubject
  315.     }
  316.  
  317.     override fun getUpdateRequestResult(): Single<Int> = updateSubject.take(1).toSingle().doOnSubscribe { listenForUpdateRequest()
  318.     Log.i("Update", "listenForUpdatesRequest - doOnSubscribe()")}
  319.  
  320.     override fun sendProxIllumination(quality: Int) = sendAndListen(UUIDS.SIGNAL, kotlin.ByteArray(1) {
  321.         when (it) {
  322.             0 -> quality.toByte()
  323.             else -> 0
  324.         }
  325.     })
  326.  
  327.     override fun sendMinTemp(list: List<ProbeModel>) = sendAndListen(UUIDS.MINTEMP,
  328.             constructMinTempByteArray(list, context))
  329.  
  330.     override fun sendMaxTemp(list: List<ProbeModel>) = sendAndListen(UUIDS.MAXTEMP,
  331.             constructMaxTempByteArray(list, context))
  332.  
  333.     override fun sendAlarmRequest() = sendAndListen(UUIDS.ALARM,
  334.             constructAlarmByteArray())
  335.  
  336.     override fun sendNamesRequest(probeModel: ProbeModel) = sendAndListen(UUIDS.NAMES,
  337.             constructSetNameByteArray(probeModel))
  338.  
  339.     override fun sendTimerRequest(probeId: Int, timerCmd: Int, timerVal: Long) = sendAndListen(UUIDS.TIMER,
  340.             constructSetTimerByteArray(probeId, timerCmd, timerVal))
  341.  
  342.     override fun sendCoolDownTime(coolDownTime: Int) = sendAndListen(UUIDS.COOL_DOWN,
  343.             constractCoolDownByteArray(coolDownTime))
  344.  
  345.     override fun sendSettings(tempUnit: Int, muteGrill: Int, disconnected: Int) = sendAndListen(UUIDS.SETTINGS,
  346.             constractSettingsByteArray(tempUnit, muteGrill, disconnected))
  347.  
  348.     override fun clearDevice(): Completable {
  349.  
  350.         //clear wifi
  351.         sendAndListen(UUIDS.UPDATE,
  352.                 constructClearWifiFROMWIFIByteArray()).subscribe(
  353.                 {
  354.                     Log.d("onxDeleteDevice", "WIFI, wifi was cleared")
  355.                     disconnect().subscribe(
  356.                             { Log.d("onxDeleteDevice", "WIFI, device was disconnected") },
  357.                             { Log.d("onxDeleteDeviceErr", "WIFI, device wasn't disconnect") }
  358.                     )
  359.                 },
  360.                 { Log.d("onxDeleteDeviceErr", "WIFI, wifi wasn't cleared") })
  361. //        //clear presets
  362. //        sendAndListen(UUIDS.UPDATE,
  363. //                constructClearPresetsByteArray()).subscribe(
  364. //                { Log.d("onxDeleteDevice", "WIFI, Presets were cleared") },
  365. //                { Log.d("onxDeleteDeviceErr", "WIFI, Presets were not cleared") })
  366.  
  367.  
  368.         return Completable.complete()
  369.     }
  370.  
  371.     private fun sendTimeStamp() = sendAndListen(UUIDS.TIME_STAMP, constructTimeStampByteArray())
  372.  
  373.     private fun sendAndListen(msgType: UUIDS, byteArray: ByteArray) =
  374.             Completable.create { completionObj ->
  375.                 completionSubject
  376.                         .subscribeOn(Schedulers.io())
  377.                         .takeFirst { wsMessagesParser.getMessageTypeFromBytes(it) == msgType }
  378.                         .subscribe(
  379.                                 {
  380.                                     if (msgType == UUIDS.TIMER) Log.d("onxTimerSet", "timer request sended, it= $it , it size: ${it.size}")
  381.                                     else if (msgType == UUIDS.UPDATE) Log.d("onxDeleteDeviceTest", "OnDeviceDeleteCompleted")
  382.                                     completionObj.onCompleted()
  383.                                 },
  384.                                 {
  385.                                     if (msgType == UUIDS.TIMER) Log.d("onxTimerSetErr", "timer send request error, it: $it")
  386.                                     else if (msgType == UUIDS.UPDATE) Log.d("onxDeleteDeviceTestErr", "device deleting request error, it: $it")
  387.                                     completionObj.onError(it)
  388.                                 }
  389.                         )
  390.  
  391.                 webSocket?.sendBinary(msgType.uuidArray + byteArray)
  392.                 Log.i("onxWebSocketSendListen", Arrays.toString(byteArray))
  393.             }
  394.  
  395.     private fun isWifiOn(): Boolean {
  396.         val connManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
  397.         return connManager.getNetworkInfo(ConnectivityManager.TYPE_WIFI).isConnected
  398.     }
  399. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement