RxWebSocket

General

Category
Free
Tag
Networking
License
MIT License
Registered
Jun 26, 2020
Favorites
1
Link
https://github.com/FutureMind/rxwebsocket
See also
DMNetworking
cling
Android Networking
Trustkit-Android
Velocity

Additional

Language
Kotlin
Version
2.0 (Jul 11, 2020)
Created
May 25, 2020
Updated
Jul 11, 2020
Owner
Future Mind (FutureMind)
Contributors
Michał Klimczak (micHar)
Vladislav P (vladp)
2
Activity
Badge
Generate
Download
Source code
APK file

Announcement

RxWebSocket

RxWebSocket is a simple, lightweight, reactive wrapper around OkHttp WebSocket, inspired by RxAndroidBle.

Instead of implementing a WebSocketListener like you would normally do with WebSocket, you can subscribe to it and when it is connected, subscribe to its messages. When you're done with the connection, you can simply unsubscribe and it takes care of closing the connection for you.

Usage

TL;DR, take me to a real life example.

Connecting

Simply prepare you OkhttpClient and a regular okhttp3.Request.

val okHttpClient = OkHttpClient.Builder().build()
val request = Request.Builder().url(...).build()

RxWebSocket(okHttpClient, request)
    .connect()
    .subscribe()

Socket states

The RxWebSocket.connect() returns a Flowable with different socket states to subscribe to.

Rx event State Description
onNext Connecting Returned immediately after subscribing. You can start sending at this stage and the messages will be queued for when the socket connects. Corresponds to the state before onOpen in WebSocketListener.


onNext Connected Returned when the socket has successfully opened connection. You can send messages while in this state. You can subscribe to Corresponds to onOpen in WebSocketListener.messageFlowable or byteMessageFlowable in this state.



onNext Disconnecting Corresponds to onClosing in WebSocketListener.
onNext Disconnected Corresponds to onClosed in WebSocketListener. Right after this event, onCompleted is published.
onCompleted - The Flowable is completed right after Disconnected (onClosed).
onError - The Flowable signals SocketConnectionException which contains the original exception that caused the issue and appropriate okhttp3.Response.Corresponds to onFailure in WebSocketListener.

So the whole flow can look something like this.

RxWebSocket(okHttpClient, request)
    .connect()
    .switchMap { state ->when(state) {
            isSocketState.Connecting->TODO()
            isSocketState.Connected->TODO()
            SocketState.Disconnecting->TODO()
            SocketState.Disconnected->TODO()
        }
    }
    .doOnError { TODO("Handle socket connection failed") }
    .doOnComplete { TODO("Handle socket connection closed gracefully") }
    .subscribe()


It's good to use switchMap here to make sure that when the state changes, you unsubscribe from e.g. Connected.messageFlowable.

Sending messages

Connecting and Connected implement SendCapable. In both these states you can send messages, although in Connecting the messages will be queued and sent when it's possible (okhttp exposes such mechanism).

RxWebSocket(okHttpClient, request)
    .connect()
    .ofType(SocketState.SendCapable::class.java)
    .flatMapCompletable { state ->
        Completable.fromCallable { state.send("Hello world") }
    }
    .subscribe()

Receiving messages

RxWebSocket(okHttpClient, request)
    .connect()
    .switchMap { state ->
        when (state) {
            is SocketState.Connected -> state.messageFlowable()
            else -> Flowable.never()
        }
    }
    .subscribe { msg -> handleMessage(msg) }

Dealing with disconnection

Because socket failures cause flowable error and graceful disconnections cause it to complete, you can leverage the power of RxJava's retry and repeat functions to implement your reconnection logic in a very elegant way.

RxWebSocket(okHttpClient, request)
    .connect()
    .retryWhen { it.delay(3, TimeUnit.SECONDS) }
    .subscribe()

Real life example

In real life, you will probably have some ViewModel which observes incoming messages to pass them to UI and also accepts new messages e.g. incoming from some input field. Here's a sample implementation of such ViewModel (you can aso find it in sample directory).

class MainViewModel : ViewModel() {

    private val okHttpClient = OkHttpClient.Builder().build()
    private val request = Request.Builder().url("wss://echo.websocket.org").build()

    private val rxSocket = RxWebSocket(okHttpClient, request)

    private val socketConnection: Flowable<SocketState> = rxSocket
        .connect()
        .retryWhen { it.delay(3, TimeUnit.SECONDS) }
        .replay(1)
        .autoConnect()

    private val outgoingMessagesProcessor = PublishProcessor.create<String>()

    private val outgoingMessagesDisposable = socketConnection
        .ofType(SocketState.SendCapable::class.java)
        .switchMap { state ->
            outgoingMessagesProcessor.doOnNext { state.send(it) }
        }
        .subscribe()

    fun observeSocketState(): Flowable<SocketState> = socketConnection

    fun observeMessages(): Flowable<String> = socketConnection
        .ofType(SocketState.Connected::class.java)
        .switchMap { it.messageFlowable() }

    fun sendMessage(message: String) = outgoingMessagesProcessor.onNext(message)

    override fun onCleared() {
        rxSocket.disconnect(1000, "")
        outgoingMessagesDisposable.dispose()
    }

}

Let's break it down piece by piece.

First, we prepare our RxWebSocket for connection.

private val okHttpClient = OkHttpClient.Builder().build()
private val request = Request.Builder().url("wss://echo.websocket.org").build()

private val rxSocket = RxWebSocket(okHttpClient, request)

Then we introduce our connection flowable.

privateval socketConnection:Flowable<SocketState>= rxSocket .connect() .retryWhen { it.delay(3, TimeUnit.SECONDS) } .replay(1) .autoConnect()

.replay(1).autoConnect() is used to multicast our flowable so that our socket connetion can be easily used by different subscribers. replay(1) makes sure that whenever new subscriber arrives, he immediately receives the current state the socket is in.

Notice the


retryWhen - it is used to make sure, that whenever our connection breaks it is reconnected. Of course this is a very simple example, you can use a much more sophisticated reconnection logic in your retry operator.privateval outgoingMessagesProcessor =PublishProcessor.create<String>() privateval outgoingMessagesDisposable = socketConnection .ofType(SocketState.SendCapable::class.java) .switchMap { state -> outgoingMessagesProcessor.doOnNext { state.send(it) } } .subscribe()

This code is responsible for processing incoming messages and sending them through the websocket.

This is the interface of our

ViewModel which should be pretty self-explanatory.funobserveSocketState():Flowable<SocketState>= socketConnection funobserveMessages():Flowable<String>= socketConnection .ofType(SocketState.Connected::class.java) .switchMap { it.messageFlowable() } funsendMessage(message:String) = outgoingMessagesProcessor.onNext(message)

Last but not least, remeber to disconnect from websocket when you're done with it. Calling

disconnect gracefully closes the socket and completes our socketConnection.overridefunonCleared() { rxSocket.disconnect(1000, "") outgoingMessagesDisposable.dispose() }