Created
May 30, 2020 09:44
-
-
Save vamjakuldip/eb0d0cb4ecdb82954706fe93e9ec3377 to your computer and use it in GitHub Desktop.
RxWebSocket is used to connect to a WebSocket; it uses the Retrofit library to connect to the WebSocket and handles retrying the connection.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.websocket | |
import okhttp3.OkHttpClient | |
/**
 * Holds the settings handed to the WebSocket helper; currently only the
 * [OkHttpClient] to use. Instances are obtained through [Builder].
 */
class Config private constructor() {
    /** HTTP client to open WebSockets with; a default client unless replaced via the builder. */
    var client: OkHttpClient = OkHttpClient()

    /** Fluent builder that fills in a single [Config] instance and returns it from [build]. */
    class Builder {
        private val config: Config = Config()

        /** Replaces the client stored in the configuration being built and returns this builder. */
        fun setClient(client: OkHttpClient): Builder = apply { config.client = client }

        /** Returns the configuration assembled so far. */
        fun build(): Config = config
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.websocket | |
import io.reactivex.Observable | |
import okio.ByteString | |
import java.util.concurrent.TimeUnit | |
/**
 * Convenience facade that forwards every call to the shared [RxWebSocketUtil] instance.
 */
object RxWebSocket {
    /** Installs the client carried by [config] on the shared [RxWebSocketUtil]. */
    fun setConfig(config: Config) {
        RxWebSocketUtil.getInstance().setClient(config.client)
    }

    /** Returns the event stream for [url], using the default timeout chosen by [RxWebSocketUtil]. */
    operator fun get(url: String): Observable<WebSocketInfo> =
        RxWebSocketUtil.getInstance().getWebSocketInfo(url)

    /** Returns the event stream for [url] with an explicit [timeout] expressed in [timeUnit]. */
    operator fun get(url: String, timeout: Long, timeUnit: TimeUnit?): Observable<WebSocketInfo> =
        RxWebSocketUtil.getInstance().getWebSocketInfo(url, timeout, timeUnit)

    /** Sends the text [msg] through the socket registered for [url], if there is one. */
    fun send(url: String, msg: String) {
        RxWebSocketUtil.getInstance().send(url, msg)
    }

    /** Sends the binary [byteString] through the socket registered for [url], if there is one. */
    fun send(url: String, byteString: ByteString) {
        RxWebSocketUtil.getInstance().send(url, byteString)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.websocket | |
import io.reactivex.Observable | |
import io.reactivex.ObservableEmitter | |
import io.reactivex.ObservableOnSubscribe | |
import io.reactivex.android.schedulers.AndroidSchedulers | |
import io.reactivex.schedulers.Schedulers | |
import okhttp3.* | |
import okio.ByteString | |
import retrofit2.HttpException | |
import java.io.IOException | |
import java.io.InterruptedIOException | |
import java.net.SocketTimeoutException | |
import java.util.concurrent.ConcurrentHashMap | |
import java.util.concurrent.TimeUnit | |
import java.util.concurrent.TimeoutException | |
/**
 * Singleton that opens WebSockets through OkHttp and exposes each one as a shared
 * [Observable] of [WebSocketInfo] events, keyed by URL.
 *
 * One observable and (while connected) one [WebSocket] are cached per URL. Both cache
 * entries are dropped when the shared observable is disposed.
 */
class RxWebSocketUtil private constructor() {
    private var client: OkHttpClient = OkHttpClient()

    // Concrete ConcurrentHashMap types so that the atomic remove(key, value) is available.
    private val observableMap = ConcurrentHashMap<String, Observable<WebSocketInfo>>()
    private val webSocketMap = ConcurrentHashMap<String, WebSocket>()
    private val TAG: String = javaClass.simpleName

    /**
     * Replaces the client used for sockets opened from now on.
     *
     * @param client the client to use; must not be null
     * @throws NullPointerException if [client] is null
     */
    fun setClient(client: OkHttpClient?) {
        this.client = client ?: throw NullPointerException(" Are you kidding me ? client == null")
    }

    /**
     * Returns the event stream for [url], creating and caching it on first use.
     *
     * A newly created stream is wrapped in a [timeout] window, re-subscribes according to
     * [retryHandler], and additionally retries on [IOException] and [TimeoutException].
     * When a stream already exists and a socket is currently registered for [url], the
     * returned observable first replays an "open" event carrying that socket.
     *
     * @param url      WebSocket endpoint, also used as the cache key
     * @param timeout  length of the timeout window
     * @param timeUnit unit of [timeout]; only consulted when a new stream is created
     * @return the stream, observed on the Android main thread
     * @throws NullPointerException if a new stream must be created and [timeUnit] is null
     */
    fun getWebSocketInfo(url: String, timeout: Long, timeUnit: TimeUnit?): Observable<WebSocketInfo> {
        val cached = observableMap[url]
        val stream: Observable<WebSocketInfo> = if (cached == null) {
            val unit = timeUnit ?: throw NullPointerException("timeUnit == null")
            val created = Observable.create(WebSocketOnSubscribe(url))
                .timeout(timeout, unit)
                .retryWhen { retryHandler(it) }
                // retryHandler forwards errors it does not handle (e.g. TimeoutException);
                // this second stage retries those immediately.
                .retry { throwable: Throwable -> throwable is IOException || throwable is TimeoutException }
                .doOnDispose {
                    observableMap.remove(url)
                    webSocketMap.remove(url)
                }
                .doOnNext { webSocketInfo: WebSocketInfo ->
                    if (webSocketInfo.isOnOpen) {
                        webSocketInfo.webSocket?.let { webSocketMap[url] = it }
                    }
                }
                .share()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
            observableMap[url] = created
            created
        } else {
            // Late subscribers missed the original open event; replay it if a socket is live.
            val openSocket = webSocketMap[url]
            if (openSocket != null) cached.startWith(WebSocketInfo(openSocket, true)) else cached
        }
        return stream.observeOn(AndroidSchedulers.mainThread())
    }

    /** Same as the three-argument overload with a timeout of 30 minutes. */
    fun getWebSocketInfo(url: String): Observable<WebSocketInfo> {
        return getWebSocketInfo(url, 30, TimeUnit.MINUTES)
    }

    /**
     * Returns the stream for [url] reduced to the [WebSocket] carried by each event;
     * events without a socket are skipped.
     */
    fun getWebSocket(url: String): Observable<WebSocket> {
        return getWebSocketInfo(url)
            .filter { info: WebSocketInfo -> info.webSocket != null }
            // Non-null is guaranteed by the filter directly above.
            .map { info: WebSocketInfo -> info.webSocket!! }
    }

    /** Sends [msg] on the socket registered for [url]; does nothing when none is registered. */
    fun send(url: String, msg: String) {
        webSocketMap[url]?.send(msg)
    }

    /** Sends [byteString] on the socket registered for [url]; does nothing when none is registered. */
    fun send(url: String, byteString: ByteString) {
        webSocketMap[url]?.send(byteString)
    }

    /**
     * Decides, for each upstream error, whether the stream should be re-subscribed.
     *
     * Any [IOException] (which includes socket timeouts and interrupted I/O) and an
     * [HttpException] with status code 502 trigger a retry after 5 seconds; every other
     * error is passed on unchanged.
     *
     * @param throwable stream of errors raised by the source
     * @return a stream that emits to request a retry, or fails to give up
     */
    fun retryHandler(throwable: Observable<Throwable>): Observable<Any> {
        return throwable.flatMap { error ->
            when {
                error is IOException -> Observable.timer(5, TimeUnit.SECONDS)
                error is HttpException && error.code() == 502 -> Observable.timer(5, TimeUnit.SECONDS)
                else -> Observable.error<Any>(error)
            }
        }
    }

    private fun getRequest(url: String): Request = Request.Builder().get().url(url).build()

    /**
     * Subscription source that opens a WebSocket for [url] and forwards listener
     * callbacks to the emitter. The same instance is re-subscribed on every retry.
     */
    private inner class WebSocketOnSubscribe(private val url: String) : ObservableOnSubscribe<WebSocketInfo> {
        // Most recently opened socket; non-null means this subscription is a reconnect.
        private var webSocket: WebSocket? = null

        @Throws(Exception::class)
        override fun subscribe(emitter: ObservableEmitter<WebSocketInfo?>) {
            // Announce a reconnect only for re-subscriptions that are not on a thread named "main".
            if (webSocket != null && "main" != Thread.currentThread().name) {
                emitter.onNext(WebSocketInfo.createReconnect())
            }
            initWebSocket(emitter)
        }

        private fun initWebSocket(emitter: ObservableEmitter<WebSocketInfo?>) {
            val socket = client.newWebSocket(getRequest(url), object : WebSocketListener() {
                override fun onOpen(webSocket: WebSocket, response: Response) {
                    // Register only while someone is listening, so a socket that opens
                    // after disposal is not left behind in the cache.
                    if (!emitter.isDisposed) {
                        webSocketMap[url] = webSocket
                        emitter.onNext(WebSocketInfo(webSocket, true))
                    }
                }

                override fun onMessage(webSocket: WebSocket, text: String) {
                    if (!emitter.isDisposed) {
                        emitter.onNext(WebSocketInfo(webSocket, text))
                    }
                }

                override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                    if (!emitter.isDisposed) {
                        emitter.onNext(WebSocketInfo(webSocket, bytes))
                    }
                }

                override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                    // Drop the dead socket, but only if it is still the registered one.
                    webSocketMap.remove(url, webSocket)
                    if (!emitter.isDisposed) {
                        emitter.onError(t)
                    }
                }

                override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                    webSocket.close(1000, null)
                }

                override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                    // Drop the closed socket, but only if it is still the registered one.
                    webSocketMap.remove(url, webSocket)
                }
            })
            webSocket = socket
            // Close the socket opened by THIS subscription; the field may already point at
            // a newer socket by the time the cancellable runs.
            emitter.setCancellable {
                socket.close(3000, "close WebSocket")
            }
        }
    }

    companion object {
        // Volatile so the unsynchronized first check in getInstance() sees a fully published instance.
        @Volatile
        var rxWebSocketUtil: RxWebSocketUtil? = null

        /** Returns the shared instance, creating it on first call. */
        fun getInstance(): RxWebSocketUtil {
            return rxWebSocketUtil ?: synchronized(RxWebSocketUtil::class.java) {
                rxWebSocketUtil ?: RxWebSocketUtil().also { rxWebSocketUtil = it }
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.websocket | |
import okhttp3.WebSocket | |
import okio.ByteString | |
/**
 * One event on a WebSocket stream. Depending on how it was created it carries the
 * socket together with an "open" flag, a text message or a binary message, or it is
 * a bare reconnect marker.
 */
class WebSocketInfo {
    /** Socket the event relates to; null for reconnect markers. */
    var webSocket: WebSocket? = null

    /** Text payload, when the event is a text message. */
    var string: String? = null

    /** Binary payload, when the event is a binary message. */
    var byteString: ByteString? = null

    /** True when the event reports that the socket has been opened. */
    var isOnOpen = false
        private set

    /** True when the event was produced by [createReconnect]. */
    var isOnReconnect = false
        private set

    private constructor()

    /** Creates an event for [webSocket] whose open flag is [onOpen]. */
    internal constructor(webSocket: WebSocket?, onOpen: Boolean) : this() {
        this.webSocket = webSocket
        this.isOnOpen = onOpen
    }

    /** Creates a text-message event. */
    internal constructor(webSocket: WebSocket?, mString: String?) : this() {
        this.webSocket = webSocket
        this.string = mString
    }

    /** Creates a binary-message event. */
    internal constructor(webSocket: WebSocket?, byteString: ByteString?) : this() {
        this.webSocket = webSocket
        this.byteString = byteString
    }

    companion object {
        /** Builds an event with no socket or payload and [isOnReconnect] set. */
        fun createReconnect(): WebSocketInfo = WebSocketInfo().apply { isOnReconnect = true }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.websocket | |
import io.reactivex.Observer | |
import io.reactivex.disposables.Disposable | |
import okhttp3.WebSocket | |
import okio.ByteString | |
/**
 * Base [Observer] for [WebSocketInfo] streams that turns each event into a dedicated
 * callback. Subclasses override only the callbacks they care about; all of them are
 * no-ops by default.
 */
abstract class WebSocketSubscriber : Observer<WebSocketInfo> {
    // Set once an open event has been seen; gates the onClose() callback.
    private var hasOpened = false

    /** Subscription handle received in [onSubscribe]; null until then. */
    protected var disposable: Disposable? = null

    /**
     * Dispatches [webSocketInfo] to exactly one callback, checked in this order:
     * open, text message, binary message, reconnect. Events matching none are ignored.
     */
    override fun onNext(webSocketInfo: WebSocketInfo) {
        val text = webSocketInfo.string
        val bytes = webSocketInfo.byteString
        when {
            webSocketInfo.isOnOpen -> {
                hasOpened = true
                // Open events are expected to carry their socket; fail loudly if one does not.
                onOpen(webSocketInfo.webSocket!!)
            }
            text != null -> onMessage(text)
            bytes != null -> onMessage(bytes)
            webSocketInfo.isOnReconnect -> onReconnect()
        }
    }

    /**
     * Callback when the WebSocket is opened
     *
     * @param webSocket the socket that was opened
     */
    protected open fun onOpen(webSocket: WebSocket) {}

    /** Callback for a text message. */
    protected open fun onMessage(text: String) {}

    /** Callback for a binary message. */
    protected open fun onMessage(byteString: ByteString) {}

    /**
     * Callback when the WebSocket is reconnecting
     */
    protected open fun onReconnect() {}

    /** Callback when the stream completes after the socket had been opened at least once. */
    protected open fun onClose() {}

    override fun onSubscribe(disposable: Disposable) {
        this.disposable = disposable
    }

    /** Disposes the subscription, if one has been received. */
    fun dispose() {
        disposable?.dispose()
    }

    override fun onComplete() {
        if (hasOpened) {
            onClose()
        }
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment