ReactiveLiveData
An RxJava Extension for the LiveData observer introduced by Google. Provides the ability to perform single actions using RxJava and takes advantage of an automatic subscription of the Lifecycle owner. Mainly designed to used Room CRUD commands with RxJava.
Why this lib?
Getting Started
Setting up the dependency
implementation 'com.github.bobekos:reactivelivedata:x.x.x'
Usage
Just use one of the available reactiveSource classes:
CompletableReactiveSource
//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {
fun insert(id: Int, name: String): LiveData<Optional<Nothing>> {
return CompletableReactiveSource.fromAction {
dao.insert(UserEntity(id, name))
}
}
}
//Activity/Fragment/etc.
...
//short
viewModel.insert(1, "User").subscribeCompletable(this)
//or with callback
viewModel.insert(1, "Bobekos").subscribeCompletable(this,
//optional
onComplete = {
showToast("User inserted")
},
//optional
onError = {
showToast(it.message)
})
SingleReactiveSource
//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {
fun getFromSingle(id: Int): LiveData<Optional<UserEntity>> {
return SingleReactiveSource.from(dao.getByIdAsSingle(id))
}
}
//Activity/Fragment/etc.
...
viewModel.getFromSingle(1).subscribeSingle(this,
onSuccess = {
showToast("User ${it.username} loaded")
},
//optional
onError = {
showToast(it.message)
})
MaybeReactiveSource
//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {
fun getFromMaybe(id: Int): LiveData<Optional<UserEntity>> {
return MaybeReactiveSource.from(dao.getByIdAsMaybe(id))
}
}
//Activity/Fragment/etc.
...
viewModel.getFromMaybe(1).subscribeMaybe(this,
onSuccess = {
showToast("User ${it.username} loaded")
},
//optional
onError = {
showToast(it.message)
},
//optional
onComplete = {
showToast("No user found")
})
FlowableReactiveSource
//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {
fun getFromFlowable(): LiveData<Optional<UserEntity>> {
return FlowableReactiveSource.from(dao.getUsers())
}
}
//Activity/Fragment/etc.
...
viewModel.getFromFlowable().subscribeFlowable(this,
onNext = {
showToast("User ${it.name} loaded")
},
//optional
onError = {
showToast(it.message)
},
//optional
onComplete = {
showToast("No user found")
})
NullSafe extension for LiveDataReactiveStreams
But without exception support. Use FlowableReactiveStream instead.
//ViewModel
class UserViewModel(private val dao: UserDao) : ViewModel() {
fun loadUser(): LiveData<UserEntity> {
return LiveDataReactiveStreams.fromPublisher(dao.getUsers())
}
}
//Activity/Fragment/etc.
...
//short
viewModel.loadUser().nonNullObserver(this, observer = {
showToast("I'm not null ${it.username}")
})
//or with null callback
viewModel.loadUser().nonNullObserver(this,
observer = {
showToast("I'm observing ${it.username}")
},
//optional
nullObserver = {
showToast("Value is null")
})
Testing
For each reactive source there is a specific test method.
liveData.testCompletableSubscribe(...)
liveData.testMaybeSubscribe(...)
liveData.testSingleSubscribe(...)
liveData.testFlowableSubscribe(...)
Make sure to include the 'InstantTastExecutorRule' (core-testing) into your tests. Furthermore, the default IoSchedulerHandler (or the scheduler which you used) should be overwritten.
@RunWith(JUnit4::class)
class UserViewModelTest {
private inline fun <reified T> lambdaMock(): T = Mockito.mock(T::class.java)
@get:Rule
var rule: TestRule = InstantTaskExecutorRule()
private val userDao = mock(UserDao::class.java)
private val viewModel = UserViewModel(userDao)
@Before
fun setup() {
RxJavaPlugins.setIoSchedulerHandler {
Schedulers.trampoline()
}
}
@Test
fun testGetFromSingleSuccess() {
val testObject = UserEntity(1, "Bobekos")
`when`(userDao.getByIdAsSingle(1)).then { Single.just(testObject) }
val lifecycle = LifecycleRegistry(mock(LifecycleOwner::class.java))
lifecycle.handleLifecycleEvent(Lifecycle.Event.ON_RESUME)
val observer = lambdaMock<(t: UserEntity) -> Unit>()
viewModel.getFromSingle(1).testSingleSubscribe(lifecycle, onSuccess = observer)
verify(observer).invoke(testObject)
}
@Test
fun testGetFromSingleError() {
val testObject = SQLiteConstraintException()
`when`(userDao.getByIdAsSingle(1)).then { Single.error<UserEntity>(testObject) }
val lifecycle = LifecycleRegistry(mock(LifecycleOwner::class.java))
lifecycle.handleLifecycleEvent(Lifecycle.Event.ON_RESUME)
val observer = lambdaMock<(e: Throwable) -> Unit>()
viewModel.getFromSingle(1).testSingleSubscribe(lifecycle, onError = observer)
verify(observer).invoke(testObject)
}
For more tests look into the sample app
Resources and Credits
License
Copyright 2018 Bobek Bobekos
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.