Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.grilleye.pro.data.repository
- import android.content.Context
- import android.net.ConnectivityManager
- import android.util.Log
- import com.grilleye.pro.data.entities.SensorsResponseEntity
- import com.grilleye.pro.data.entities.WSState
- import com.grilleye.pro.data.mappers.*
- import com.grilleye.pro.data.models.AlarmNotificationModel
- import com.grilleye.pro.data.models.ProbeModel
- import com.grilleye.pro.extentions.*
- import com.neovisionaries.ws.client.*
- import rx.Completable
- import rx.Observable
- import rx.Single
- import rx.schedulers.Schedulers
- import rx.subjects.PublishSubject
- import rx.subscriptions.Subscriptions
- import java.lang.Math.abs
- import java.util.*
- import java.util.concurrent.TimeUnit
- import kotlin.collections.ArrayList
- class WebSocketsRepositoryImpl(private val wsMessagesParser: WsMessagesParser,
- private val temperatureSensorsMapper: TemperatureSensorsMapper,
- private val context: Context,
- private val logMapper: LogMapper,
- private val alarmsMapper: AlarmsMapper
- ) : WebSocketsRepository {
- companion object {
- val CONNECT_TIMEOUT = 10_000
- val HARDCODED_ADDR = "ws://10.1.43.158"
- } //ws://10.1.43.177
- private var tempSubject: PublishSubject<SensorsResponseEntity> = PublishSubject.create()
- private var completionSubject: PublishSubject<ByteArray> = PublishSubject.create()
- private var tempRequestsSubscription = Subscriptions.unsubscribed()
- private var webSocket: WebSocket? = null
- private var logLength = 0
- private var logIndex = 0
- private var logArray: ArrayList<Byte> = ArrayList()
- private var logSubject: PublishSubject<List<Int>> = PublishSubject.create()
- private var signalQualitySubject: PublishSubject<Byte> = PublishSubject.create()
- private var batteyLevelSubject: PublishSubject<Byte> = PublishSubject.create()
- private var updateSubject: PublishSubject<Int> = PublishSubject.create()
- private var wifiNameSubject: PublishSubject<String> = PublishSubject.create()
- private var firmwareSubject: PublishSubject<String> = PublishSubject.create()
- private var serialSubject: PublishSubject<String> = PublishSubject.create()
- private var devicePresetsStateSubject: PublishSubject<Int> = PublishSubject.create()
- private var alarmNotificationSubject: PublishSubject<List<AlarmNotificationModel>> = PublishSubject.create()
- private var blocksLogValusesSubject: PublishSubject<Int> = PublishSubject.create()
- private var probeId = -1
- private val webSocketListener = object : WebSocketAdapter() {
- override fun onBinaryMessage(websocket: WebSocket?, binary: ByteArray) {
- val msgType = wsMessagesParser.getMessageTypeFromBytes(binary)
- Log.d("UpdateTesting", "onBinaryMessage, msgType: $msgType, binary[5] = ${binary[5].toUINT8()}")
- if (msgType == (UUIDS.ALARM)) {
- alarmNotificationSubject.onNext(alarmsMapper.map(Arrays.copyOfRange(binary, 4, binary.size)))
- } else if (msgType == (UUIDS.UPDATE)) {
- if (binary[4] == 0x55.toByte()) {
- updateSubject.onNext(binary[5].toUINT8())
- }
- } else if (msgType == (UUIDS.WIFI_NAME)) {
- wifiNameSubject.onNext(String(Arrays.copyOfRange(binary, 4, binary.size)))
- wifiNameSubject.onCompleted()
- } else if (msgType == (UUIDS.SERIAL)) {
- serialSubject.onNext(getHexValue(Arrays.copyOfRange(binary, 4, binary.size)))
- serialSubject.onCompleted()
- } else if (msgType == (UUIDS.FIRMWARE)) {
- firmwareSubject.onNext(String(Arrays.copyOfRange(binary, 4, binary.size)))
- firmwareSubject.onCompleted()
- } else if (msgType == (UUIDS.TEMP)) {
- Log.d("onxTempMap", Arrays.toString(binary))
- val mapped = temperatureSensorsMapper.map(Arrays.copyOfRange(binary, 4, binary.size - 2))
- tempSubject.onNext(mapped)
- signalQualitySubject.onNext(binary[binary.size - 3])
- batteyLevelSubject.onNext(binary[binary.size - 2])
- devicePresetsStateSubject.onNext(binary[binary.size - 1].toInt())
- } else if (msgType == (UUIDS.LOG_LENGTH)) {
- logLength = IntegerConversion.convertTwoBytesToInt2(binary[5], binary[6])
- blocksLogValusesSubject.onNext(logLength)
- Log.d("onxPoints1", "Len $logLength")
- trigerLogRequests(binary[4].toInt(), 0, 0)
- } else if (msgType == (UUIDS.LOG)) {
- Log.d("onxPoints1", "triggered")
- if (binary[4].toInt() == probeId && probeId != -1) {
- Arrays.copyOfRange(binary, 7, binary.size).forEach {
- Log.d("onxPoint3", "$it")
- logArray.add(it)
- }
- }
- blocksLogValusesSubject.onNext(logIndex)
- logIndex++
- if (logIndex <= logLength) {
- Log.d("onxPoints1", "Len $logLength Index $logIndex")
- Log.d("onxPoints6", Arrays.toString(binary))
- trigerLogRequests(binary[4].toInt(), createBufferArrayForTemp(logIndex)[0].toInt(),
- createBufferArrayForTemp(logIndex)[1].toInt())
- } else if (logIndex == logLength + 1 && logLength != 0) {
- Log.d("onxPoints1", "else")
- Log.d("onxPoint3", logArray.toString())
- Log.d("adsjfks", "send12345" + "$logIndex, $logLength")
- // val mappedList = ArrayList<Int>()
- // logMapper.map(logArray.toByteArray()).forEach {
- // mappedList.add(tempUnitMapper.mapTempUnit(it))
- // }
- logSubject.onNext(logMapper.map(logArray.toByteArray()))
- }
- } else if (msgType == (UUIDS.TIME_STAMP)) {
- if (abs(IntegerConversion.convertFourBytesToInt2(binary[4], binary[5], binary[6], binary[7]) - constractTimeStampSeconds()) > 3)
- sendTimeStamp()
- } else if (msgType == (UUIDS.SIGNAL)) {
- sendProxIllumination(binary[0].toUINT8())
- } else if (msgType != null) {
- completionSubject.onNext(binary)
- } else {
- Log.i("onxWsStrangeMsg", Arrays.toString(binary))
- }
- }
- override fun onDisconnected(websocket: WebSocket?, serverCloseFrame: WebSocketFrame?, clientCloseFrame: WebSocketFrame?, closedByServer: Boolean) {
- super.onDisconnected(websocket, serverCloseFrame, clientCloseFrame, closedByServer)
- Log.i("onxDisconnected", "here")
- // reconnect goes here
- // it's wise to delay attempts (at least 2-3 seconds, I suppose)
- }
- //TODO: error handling to be implemented
- override fun onError(websocket: WebSocket?, cause: WebSocketException?) {
- super.onError(websocket, cause)
- Log.i("onxWsError", cause.toString())
- }
- }
- override fun connect(): Completable {
- Log.d("onxConnectionKeeperTest", "connect()")
- return Completable.create {
- try {
- if (context.getWifiIp() != "") {
- webSocket = connectWs()
- webSocket?.socket?.soTimeout = 5000
- }
- it.onCompleted()
- } catch (e: OpeningHandshakeException) {
- it.onError(e)
- Log.i("onxHandshakeException", e.toString())
- } catch (e: HostnameUnverifiedException) {
- it.onError(e)
- Log.i("onxHostnameException", e.toString())
- } catch (e: WebSocketException) {
- it.onError(e)
- Log.i("onxWebSocketException", e.toString())
- }
- }
- }
- private fun connectWs(): WebSocket {
- Log.d("onxWifiIp", context.getWifiIp())
- Log.d("onxConnectionKeeperTest", "IP: " + context.getWifiIp())
- return WebSocketFactory()
- .createSocket(context.getWifiIp(), CONNECT_TIMEOUT)
- .addListener(webSocketListener)
- .addExtension(WebSocketExtension.PERMESSAGE_DEFLATE)
- .connect()
- }
- override fun disconnect(): Completable {
- return Completable.fromCallable { webSocket?.disconnect() }
- }
- override fun getBlocksLogValues(): Observable<Int> {
- return blocksLogValusesSubject
- }
- override fun isDeviceConnectedFiltered() = Observable.interval(200, TimeUnit.MILLISECONDS).map { getWebSocketState() }
- private fun getWebSocketState(): Boolean {
- if (webSocket == null) {
- return false
- }
- return webSocket?.isOpen ?: false
- }
- override fun singleState(): Boolean {
- return getWebSocketState()
- }
- override fun getConnectionState(): Observable<WSState> = Observable.interval(1, TimeUnit.SECONDS).map { getFullWSState() }
- private fun getFullWSState(): WSState {
- webSocket?.pingInterval
- if (!isWifiOn()) {
- return WSState.DISCONNECTED
- }
- if (isWifiOn() && webSocket?.isOpen != true) {
- return WSState.RECONNECTING
- }
- if (isWifiOn() && webSocket?.isOpen == true) {
- return WSState.CONNECTED
- }
- return WSState.RECONNECTING
- }
- override fun getDevicePresetsState(): Single<Int> {
- return devicePresetsStateSubject.take(1).toSingle()
- }
- // TODO: check threading
- override fun listenForTemperatureSensorsInfo(): Observable<SensorsResponseEntity> {
- return tempSubject
- .doOnSubscribe {
- triggerTempRequests();
- }
- .doOnUnsubscribe { tempRequestsSubscription.unsubscribe(); }
- }
- override fun listenForBatteryLevel(): Observable<Byte> {
- return batteyLevelSubject
- }
- // TODO: implement unsubscription mechanism
- private fun triggerTempRequests() {
- tempRequestsSubscription = Observable.interval(1, TimeUnit.SECONDS).subscribe {
- webSocket?.sendBinary(UUIDS.TEMP.uuidArray)
- }
- }
- override fun listenForSignalQuality(): Observable<Byte> {
- return signalQualitySubject
- }
- override fun listenForLog(probeId: Int): Observable<List<Int>> {
- logLength = 0
- logIndex = 0
- logArray = ArrayList()
- return logSubject
- .doOnSubscribe {
- readLogLength(probeId);
- Log.d("adsjfks", "subscribed" + "$logIndex, $logLength")
- }
- .doOnNext {
- Log.d("adsjfks", "send")
- Log.d("adsjfks", "send" + "$logIndex, $logLength")
- }
- .doOnUnsubscribe { Log.d("adsjfks", "unsubscribed") }
- }
- private fun readLogLength(probeId: Int) {
- this.probeId = probeId
- webSocket?.sendBinary(UUIDS.LOG_LENGTH.uuidArray + constractLogLengthByteArray(probeId))
- }
- private fun trigerLogRequests(probeId: Int, numLow: Int, numHigh: Int) {
- Log.d("onxPoints1", "triger" + numLow.toString() + " " + numHigh.toString())
- webSocket?.sendBinary(UUIDS.LOG.uuidArray + constractLogByteArray(probeId, numLow, numHigh))
- }
- override fun listenForTimeStamp(): Completable {
- webSocket?.sendBinary(UUIDS.TIME_STAMP.uuidArray)
- return Completable.complete()
- }
- override fun readProxIllumination(): Completable {
- webSocket?.sendBinary(UUIDS.SIGNAL.uuidArray)
- return Completable.complete()
- }
- private fun listenForUpdateRequest(): Completable {
- webSocket?.sendBinary(UUIDS.UPDATE.uuidArray + 0x55.toByte())
- return Completable.complete()
- }
- override fun getWifiName(): Single<String> {
- return wifiNameSubject.toSingle()
- .doOnSubscribe { trigerWifiName() }
- }
- private fun trigerWifiName() {
- webSocket?.sendBinary(UUIDS.WIFI_NAME.uuidArray)
- }
- override fun getFirmwareVersion(): Single<String> {
- return firmwareSubject.toSingle()
- .doOnSubscribe { trigerFirmwareVersion() }
- }
- private fun trigerFirmwareVersion() {
- webSocket?.sendBinary(UUIDS.FIRMWARE.uuidArray)
- }
- override fun getSerialNumber(): Single<String> {
- return serialSubject.toSingle()
- .doOnSubscribe { trigerSerialNumber() }
- }
- private fun trigerSerialNumber() {
- webSocket?.sendBinary(UUIDS.SERIAL.uuidArray)
- }
- override fun listenForAlarms(): Observable<List<AlarmNotificationModel>> {
- return alarmNotificationSubject
- }
- override fun getUpdateRequestResult(): Single<Int> = updateSubject.take(1).toSingle().doOnSubscribe { listenForUpdateRequest()
- Log.i("Update", "listenForUpdatesRequest - doOnSubscribe()")}
- override fun sendProxIllumination(quality: Int) = sendAndListen(UUIDS.SIGNAL, kotlin.ByteArray(1) {
- when (it) {
- 0 -> quality.toByte()
- else -> 0
- }
- })
- override fun sendMinTemp(list: List<ProbeModel>) = sendAndListen(UUIDS.MINTEMP,
- constructMinTempByteArray(list, context))
- override fun sendMaxTemp(list: List<ProbeModel>) = sendAndListen(UUIDS.MAXTEMP,
- constructMaxTempByteArray(list, context))
- override fun sendAlarmRequest() = sendAndListen(UUIDS.ALARM,
- constructAlarmByteArray())
- override fun sendNamesRequest(probeModel: ProbeModel) = sendAndListen(UUIDS.NAMES,
- constructSetNameByteArray(probeModel))
- override fun sendTimerRequest(probeId: Int, timerCmd: Int, timerVal: Long) = sendAndListen(UUIDS.TIMER,
- constructSetTimerByteArray(probeId, timerCmd, timerVal))
- override fun sendCoolDownTime(coolDownTime: Int) = sendAndListen(UUIDS.COOL_DOWN,
- constractCoolDownByteArray(coolDownTime))
- override fun sendSettings(tempUnit: Int, muteGrill: Int, disconnected: Int) = sendAndListen(UUIDS.SETTINGS,
- constractSettingsByteArray(tempUnit, muteGrill, disconnected))
- override fun clearDevice(): Completable {
- //clear wifi
- sendAndListen(UUIDS.UPDATE,
- constructClearWifiFROMWIFIByteArray()).subscribe(
- {
- Log.d("onxDeleteDevice", "WIFI, wifi was cleared")
- disconnect().subscribe(
- { Log.d("onxDeleteDevice", "WIFI, device was disconnected") },
- { Log.d("onxDeleteDeviceErr", "WIFI, device wasn't disconnect") }
- )
- },
- { Log.d("onxDeleteDeviceErr", "WIFI, wifi wasn't cleared") })
- // //clear presets
- // sendAndListen(UUIDS.UPDATE,
- // constructClearPresetsByteArray()).subscribe(
- // { Log.d("onxDeleteDevice", "WIFI, Presets were cleared") },
- // { Log.d("onxDeleteDeviceErr", "WIFI, Presets were not cleared") })
- return Completable.complete()
- }
- private fun sendTimeStamp() = sendAndListen(UUIDS.TIME_STAMP, constructTimeStampByteArray())
- private fun sendAndListen(msgType: UUIDS, byteArray: ByteArray) =
- Completable.create { completionObj ->
- completionSubject
- .subscribeOn(Schedulers.io())
- .takeFirst { wsMessagesParser.getMessageTypeFromBytes(it) == msgType }
- .subscribe(
- {
- if (msgType == UUIDS.TIMER) Log.d("onxTimerSet", "timer request sended, it= $it , it size: ${it.size}")
- else if (msgType == UUIDS.UPDATE) Log.d("onxDeleteDeviceTest", "OnDeviceDeleteCompleted")
- completionObj.onCompleted()
- },
- {
- if (msgType == UUIDS.TIMER) Log.d("onxTimerSetErr", "timer send request error, it: $it")
- else if (msgType == UUIDS.UPDATE) Log.d("onxDeleteDeviceTestErr", "device deleting request error, it: $it")
- completionObj.onError(it)
- }
- )
- webSocket?.sendBinary(msgType.uuidArray + byteArray)
- Log.i("onxWebSocketSendListen", Arrays.toString(byteArray))
- }
- private fun isWifiOn(): Boolean {
- val connManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
- return connManager.getNetworkInfo(ConnectivityManager.TYPE_WIFI).isConnected
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement