Goodbye Rx, Hello Flow

Most developers use RxJava (or RxKotlin) on Android for two main reasons:

  • switching between ui and background threads
  • observing a continuous stream of data

But there’s a lot of complexity in using Rx and a lot of boilerplate code that you have to keep repeating.

We’ve been able to use Kotlin coroutines for a while now to write asynchronous, non-blocking code, so no need for Rx for this anymore.

And for observing streams of data there is now Kotlin Flow.

Kotlin Flow is an addition to coroutines that adds streams to the native Kotlin language. Similar to Rx, we have to subscribe to a Flow to start receiving data from it (i.e. Kotlin Flows are cold), but because they use coroutines they are lightweight (no threads needed), and the code is sequential so nice and easy to read.


Where would you use it?

Even though my full-time day job is writing apps for other companies, one of my hobbies is writing apps for myself. A personal app I have been working on recently is a multi-player hangman word game, the main reason for writing the app is to perfect a multi-module clean architecture, which I’ve written about in this post.

One of the screens in the app shows the scores of each player in a game, and I want that screen to update in real-time as the players’ scores are updated.


Architecture

When a player finishes a round in a game a Firebase data push message of the player’s score is sent to all the other players in the game. This push message is received in the FirebaseMessagingService service which inserts the new score into the Room database.

All that needs to be done to see this change in the UI is to observe the changes to the database – and this is where Kotlin Flow comes in.

I use an MVVM architecture with a repository pattern. Simplified it looks like this:

Using Flow with Room

To use Flow with the Room database just make the return value of the function in the DAO to be of type Flow. In this case the getGameScores function is returning a Flow of a List of PlayerScore entity records.

@Dao
interface GameDao {

    @Query("SELECT * FROM scores WHERE gameId = :gameId ORDER BY round, score DESC")
    fun getGameScores(gameId: Int): Flow<List<PlayerScore>>
}

Note that the function is not a suspend function. If you use suspend with Flow you’ll get an annotation build error like this

A failure occurred while executing org.jetbrains.kotlin.gradle.internal.KaptExecution

with no clue as to what the actual error is.

I use a data source class in the data layer so that the repository doesn’t talk directly with the DAO, and the data source class implements an interface. This way it separates the repository and data layers cleanly, and makes it easy to unit test the repository. The data source class contains this

@ExperimentalCoroutinesApi
override suspend fun getGameScores(gameId: Int) = database.gameDao().getGameScores(gameId).distinctUntilChanged()

There are three things worthy of note about this one small function:

  • the function is a suspend function
    even though the DAO function didn’t need the suspend keyword (I suspect this is due to the annotation processor automatically adding it for Flow DAO functions)
  • it has a distinctUntilChanged function added to the end of the DAO call
    this prevents a Flow being emitted when changes are made to the table that don’t affect this query
  • it is annotated with @ExperimentalCoroutinesApi
    at the time of writing (July 2020) the distinctUntilChanged behaviour is not final and could change

Repository

The repository will pass the Flow of scores from the data source on to the view model. It also converts the raw database entity objects into a cleaner POJO (or Kotlin data class in this case). The reason it does this conversion is that the database entity classes have annotations related to the Room database and are structured in a way that suits the physical database. The view model only needs to know about business objects.

@ExperimentalCoroutinesApi
override suspend fun getGameScores(gameId: Int): Flow<List<Score>> {
    return localDataStore.getGameScores(gameId).transform { entities ->
        emit(entities.map { it.toScore() })
    }
}

To perform the data mapping we use the transform function. Flow has a whole load of functions like this that are worth checking out. In the transform function we need to emit the transformed value, in this case by using map to convert to another class type. The emit function continues the flow of data.

ViewModel

In the ViewModel we need to get the Flow from the repository and convert it to LiveData so the UI will be updated. To get all the values of a Flow as they are emitted we use the collect function. collect is a suspend function so we need to call it from another suspend function or from a coroutine scope, the easiest way to do that in a view model is by using viewModelScope.

val scores = MutableLiveData<List<Score>>()

viewModelScope.launch {
    repositoryGame.getGameScores(gameId).collect {
        scores.value = it
    }
}

Where the scores LiveData is being observed in the view (in real code you shouldn’t expose a mutable LiveData to the view like this).

That’s the basics of using Kotlin Flow to emit changes to the Room database, collect the data in the ViewModel and publish it to the UI. To improve the UX in a real app, you can catch errors on the flow, and emit a value when we start collecting:

    repositoryGame.getGameScores(gameId)
            .onStart { /* set on loading LiveData */ }
            .catch { /* set error LiveData */ }
            .collect { }

I’ve used this in my latest personal app, I found it works very well, was painless to implement, and would definitely recommend it.


Leave a Reply

Your email address will not be published. Required fields are marked *


The reCAPTCHA verification period has expired. Please reload the page.