ReactiveLiveData

Additional

Language
Kotlin
Version
v1.0.3 (Sep 4, 2018)
Created
Jul 13, 2018
Updated
Sep 4, 2018 (Retired)
Owner
Bobek Bobekos (bobekos)
Contributor
Bobek Bobekos (bobekos)
1
Activity
Badge
Generate
Download
Source code

ReactiveLiveData

An RxJava extension for the LiveData observer introduced by Google. It provides the ability to perform single actions using RxJava and takes advantage of the automatic subscription handling of the Lifecycle owner. Mainly designed to use Room CRUD commands with RxJava.

Why this lib?

Medium article

Getting Started

Setting up the dependency

implementation 'com.github.bobekos:reactivelivedata:x.x.x'

Usage

Just use one of the available reactiveSource classes:

CompletableReactiveSource

//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {

    /**
     * Inserts a [UserEntity] built from [id] and [name] through the DAO.
     *
     * The DAO call is handed to [CompletableReactiveSource.fromAction] as an action,
     * and the LiveData produced by that factory is returned to the caller.
     */
    fun insert(id: Int, name: String): LiveData<Optional<Nothing>> =
        CompletableReactiveSource.fromAction { dao.insert(UserEntity(id, name)) }
}

//Activity/Fragment/etc.
...
// Short form: subscribe without any callbacks.
viewModel.insert(1, "User").subscribeCompletable(this)

// Long form: both callbacks are optional.
viewModel.insert(1, "Bobekos").subscribeCompletable(
    this,
    onComplete = { showToast("User inserted") },
    onError = { error -> showToast(error.message) }
)

SingleReactiveSource

//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {

    /**
     * Looks up the user with the given [id] via `dao.getByIdAsSingle` and exposes
     * the result as LiveData by passing it to [SingleReactiveSource.from].
     */
    fun getFromSingle(id: Int): LiveData<Optional<UserEntity>> =
        SingleReactiveSource.from(dao.getByIdAsSingle(id))
}

//Activity/Fragment/etc.
...
// onError is optional.
viewModel.getFromSingle(1).subscribeSingle(
    this,
    onSuccess = { user -> showToast("User ${user.username} loaded") },
    onError = { error -> showToast(error.message) }
)

MaybeReactiveSource

//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {

    /**
     * Looks up the user with the given [id] via `dao.getByIdAsMaybe` and exposes
     * the result as LiveData by passing it to [MaybeReactiveSource.from].
     */
    fun getFromMaybe(id: Int): LiveData<Optional<UserEntity>> =
        MaybeReactiveSource.from(dao.getByIdAsMaybe(id))
}

//Activity/Fragment/etc.
...
// onError and onComplete are optional.
viewModel.getFromMaybe(1).subscribeMaybe(
    this,
    onSuccess = { user -> showToast("User ${user.username} loaded") },
    onError = { error -> showToast(error.message) },
    onComplete = { showToast("No user found") }
)

FlowableReactiveSource

//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {

    /**
     * Exposes the stream returned by `dao.getUsers()` as LiveData by passing it
     * to [FlowableReactiveSource.from].
     */
    fun getFromFlowable(): LiveData<Optional<UserEntity>> =
        FlowableReactiveSource.from(dao.getUsers())
}

//Activity/Fragment/etc.
...
// onError and onComplete are optional.
viewModel.getFromFlowable().subscribeFlowable(
    this,
    // UserEntity exposes its name as `username`, as in the other examples.
    onNext = { user -> showToast("User ${user.username} loaded") },
    onError = { error -> showToast(error.message) },
    onComplete = { showToast("No user found") }
)

NullSafe extension for LiveDataReactiveStreams

Note that this extension has no exception support. If you need error handling, use FlowableReactiveSource instead.

//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {

    /**
     * Converts the publisher returned by `dao.getUsers()` into LiveData using
     * [LiveDataReactiveStreams.fromPublisher].
     */
    fun loadUser(): LiveData<UserEntity> =
        LiveDataReactiveStreams.fromPublisher(dao.getUsers())
}

//Activity/Fragment/etc.
...
// Short form: only observe non-null values.
viewModel.loadUser().nonNullObserver(
    this,
    observer = { user -> showToast("I'm not null ${user.username}") }
)

// Long form: additionally pass the optional null callback.
viewModel.loadUser().nonNullObserver(
    this,
    observer = { user -> showToast("I'm observing ${user.username}") },
    nullObserver = { showToast("Value is null") }
)

Testing

For each reactive source there is a specific test method.

liveData.testCompletableSubscribe(...)
liveData.testMaybeSubscribe(...)
liveData.testSingleSubscribe(...)
liveData.testFlowableSubscribe(...)

Make sure to include the 'InstantTaskExecutorRule' (core-testing) in your tests. Furthermore, the default IoSchedulerHandler (or the handler of whichever scheduler you use) should be overridden.

/**
 * Example unit tests for `UserViewModel.getFromSingle`, using a mocked [UserDao].
 */
@RunWith(JUnit4::class)
class UserViewModelTest {

    /** Creates a Mockito mock for the reified type [T]; used here to mock callback lambdas. */
    private inline fun <reified T> lambdaMock(): T = Mockito.mock(T::class.java)

    // Never reassigned, so a read-only property is sufficient; @get:Rule puts the
    // annotation on the generated getter.
    @get:Rule
    val rule: TestRule = InstantTaskExecutorRule()

    private val userDao = mock(UserDao::class.java)
    private val viewModel = UserViewModel(userDao)

    @Before
    fun setup() {
        // Replace the IO scheduler with the trampoline scheduler for the tests.
        RxJavaPlugins.setIoSchedulerHandler {
            Schedulers.trampoline()
        }
    }

    /** The success callback must be invoked with the entity emitted by the DAO's Single. */
    @Test
    fun testGetFromSingleSuccess() {
        val testObject = UserEntity(1, "Bobekos")

        `when`(userDao.getByIdAsSingle(1)).then { Single.just(testObject) }

        val lifecycle = LifecycleRegistry(mock(LifecycleOwner::class.java))
        lifecycle.handleLifecycleEvent(Lifecycle.Event.ON_RESUME)

        val observer = lambdaMock<(t: UserEntity) -> Unit>()

        viewModel.getFromSingle(1).testSingleSubscribe(lifecycle, onSuccess = observer)

        verify(observer).invoke(testObject)
    }

    /** The error callback must be invoked with the exception emitted by the DAO's Single. */
    @Test
    fun testGetFromSingleError() {
        val testObject = SQLiteConstraintException()

        `when`(userDao.getByIdAsSingle(1)).then { Single.error<UserEntity>(testObject) }

        val lifecycle = LifecycleRegistry(mock(LifecycleOwner::class.java))
        lifecycle.handleLifecycleEvent(Lifecycle.Event.ON_RESUME)

        val observer = lambdaMock<(e: Throwable) -> Unit>()

        viewModel.getFromSingle(1).testSingleSubscribe(lifecycle, onError = observer)

        verify(observer).invoke(testObject)
    }
}

For more tests look into the sample app

Resources and Credits

License

Copyright 2018 Bobek Bobekos

Licensed under the Apache License, Version 2.0 (the "License"); 
you may not use this file except in compliance with the License. 
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and 
limitations under the License.