Android

[Kotlin] livedata를 떠나보내고 코틀린 stateflow 나 sharedflow를 써야 할까요?

StanleyKou 2021. 1. 23. 23:17

이 글은 아래 글은 번역한 것입니다.

proandroiddev.com/should-we-choose-kotlins-stateflow-or-sharedflow-to-substitute-for-android-s-livedata-2d69f2bd6fa5

 

Substituting Android's LiveData: StateFlow or SharedFlow?

Kotlin Coroutines recently introduced two Flow types, SharedFlow and StateFlow, and Android's community started wondering about…

proandroiddev.com

원 글 저자:

Patrick Steiger

Professional Android Developer. Working @ CESAR / Motorola. Indie developer on free time. Passionate about programming. https://github.com/psteiger

 

 

 

Kotlin Coroutines recently introduced two Flow types, SharedFlow and StateFlow, and Android’s community started wondering about the possibilities and implications of substituting LiveData with one of those new types, or both. The two main reasons for that are:

 

1.LiveData is closely bound to UI (no natural way to offload work to worker threads), and

2. LiveData is closely bound to the Android platform.

 

 

코틀린 코루틴은 최근 두 가지 Flow 타입을 소개했습니다. SharedFlow 와 StateFlow가 그것입니다. 안드로이드 커뮤니티에서는 이것들을 이용해서 LiveData를 대체할 수 있을지에 대한 기대가 생기기 시작했습니다. 두 가지 중요한 이유가 있는데요, 그것은

 

1. LiveData 는 UI에 밀접하게 붙어있고 (UI가 없는 워크 쓰레드에서 라이브데이터를 쓸 수 있는 자연스러운 방식이 없고),

2. LiveData 는 안드로이드 플랫폼에 밀접하게 붙어있기 때문입니다.

 


We can conclude from those two facts that, in Clean Architecture terms, while LiveData works fine for the Presentation Layer, it does not fit well in the Domain Layer, which should ideally be platform-independent (meaning a pure Kotlin/Java module); and it does not fit very well in the Data Layer either (Repositories implementations and Data Sources), as we usually should offload data access work to worker threads.

 

 

우리는 이 두 가지 사실에 기반하여, (클린 아키텍쳐 용어인) 프레젠테이션 레이어에서는 LiveData가 잘 동작할 것이라는 결론을 내릴  수 있지만, (역시 클린 아키텍쳐 용어인) 도메인 레이어에서 쓰기에는 적절치 않다는 결론을 내릴 수 있습니다. 도메인 레이어는 플랫폼에서 떨어져 있는 (=플랫폼과 무관한, 순수한 코틀린/자바 모듈) 것이 이상적이기 때문입니다. 또한 (마찬가지로 클린 아키텍쳐 용어인) 데이터 레이어에서 쓰기에도 적합하지 않습니다. (레포지토리와 데이터 소스를 말합니다) 일반적으로는 워커 쓰레드에서 (UI 없이) 동작하기 때문입니다.

 

 


We could not just substitute LiveData with pure Flow, though. The main issues with using pure Flow as a LiveData substitute on all app layers are that:

 

1. Flow is stateless (no .value access).

 

2. Flow is declarative (cold): a flow builder merely describes what the flow is, and it is only materialized when collected. However, a new Flow is effectively run (materialized) for each collector, meaning upstream (expensive) database access is redundantly and repeatedly run for each collector.

 

3.Flow, by itself, does not know anything about Android lifecycles, and does not provide automatic pausing and resuming of collectors upon Android lifecycle state changes.

 

 

단순히 LiveData를 Flow로 대체할 수는 없습니다. 순수한 Flow로 LiveData를 대체할 때의 각 레이어에서 발생하는 주요 문제는 아래와 같습니다. 

 

1. Flow 는 상태가 없음 (.value 와 같은 형태로 값을 얻을 수 없음).

 

2. Flow 는 선언적임 (콜드 - 연속해서 계속 들어오는 데이터를 처리할 수 없음): flow builder 는 Flow 가 무엇인지 설명하고 있는데, collect 되었을 때만 "생성되고 값을 반환"(materialized)합니다. 하지만, 새로운 Flow가 성공적으로 실행되면 (materialized) 값을 받는 쪽 (collector) 한 개 마다 하나씩 데이터를 호출합니다. 그 말은 업스트림 (비싼 네트워크 호출 등) 데이터베이스 접근 등의 요청이 collector 마다 한번 씩, 결국 모두 합치면 여러 번 호출된다는 뜻입니다.

 

3. Flow 스스로는 안드로이드 라이브사이클에 대해 알지 못함. 안드로이드 라이프사이클에 따라 중지되거나 재개되지 못합니다.


Those are not to be viewed as pure Flow intrinsic defects: those are just characteristics that makes it not fit well as a LiveData substitute, but can be powerful in other contexts.

For (3), we could already use LifecycleCoroutineScope extensions such as launchWhenStarted for launching coroutines to collect our flows — those collectors will automatically be paused and resumed in sync with the component's Lifecycle.

 

Note: in this text, we use collecting and observing as synonymous concepts. Collecting is the preferred term for Kotlin Flows (we collect a Flow), observing is the preferred term for Android's LiveData (we observe a LiveData).

But what about (1) — acessing current state, and (2) — materializing just once for N >= 1 collectors, and dematerializing for 0 collector?

 

Now, SharedFlow and StateFlow provide a solution for both of those issues.

 

 

 

이것들을 Flow의 근본적인 오류로 보는 것은 적절하지  않습니다. 단지 LiveData를 대체하기에는 적합하지 않다는 뜻이고, 다른 곳에 쓰기에는 여전히 강력합니다.

(3) 의 이유 때문에, 우리는 이미 LifecycleCoroutineScope 확장을 쓰고 있는데, launchWhenStarted같은 메소드를 flow결과 값 collect에 사용하고 있습니다. 이 Collector는 라이프사이클에 따라 자동으로 일시중지되거나 재시작 됩니다.

 

일러두기: 이 글에서, 우리는 collecting 과 observing 을 같은 용어로 쓸 것입니다. Collection은 코틀린 Flow의 용어이고, observing 은 안드로이드 LiveData의 용어입니다.

 

하지만 (1) - 현재 상태에 접근하기 (2) - 여러 개의 (N >=1) collector가 있더라도 리소스 요청은 한 번만 하기, 그리고 collector가 없다면 리소스를 요청하지 않기를 어떻게 할 수 있을까요?

 

 이제 SharedFlow 와 StateFlow가 이 두 가지 이슈에 대한 해답을 줄 것입니다.

 


A practical example

Let's exemplify with a practical use-case. Our use-case is fetching nearby locations. We'll assume a Firebase Realtime Database is used alongside the GeoFire library, which allows for querying nearby locations.

 

실전 예제

 

실제로 사용되는 경우를 한 번 살펴보겠습니다. 여기서 다룰 문제는, 근접했을 때의 위치 값 얻기 입니다. 우리는 Firebase 실시간 데이터베이스를 GeoFire 라이브러리와 함께 사용할 것이고, 이를 이용해서 주변에 있는 장소의 위치를 얻을 것입니다.

 

 

Using LiveData end-to-end

 

 

Let us begin by showcasing the use of LiveData from the data source all the way to our view. The Data Source is responsible for connecting to the Firebase Realtime Database through a GeoQuery. When we receive a onGeoQueryReady() or onGeoQueryError(), we update the LiveData value with the aggregate of the locations entered, exited or moved since the last onGeoQueryReady() .

 

라이브데이터를 처음부터 끝까지 사용하기

 

데이터를 받아온 다음 뷰에 그릴 때 까지, 모든 부분에서 LiveData를 사용하는 예시를 한 번 보겠습니다. 여기에서 Data source 는 GeoQuery 라이브러리를 이용해 Firebase에서 데이터를 받아오는 부분입니다. 우리가 onGeoQueryReady() 또는 onGeoQueryError() 콜백에서 결과 값을 받으면, 우리는 그 결과 값을 모두 LiveData에 업데이트 할 것입니다. onGeoQueryReady 이후 "해당위치에 진입", "해당위치에서 나감", "이동 중" 같은 연속적인 값이 계속 LiveData에 업데이트 될 것입니다.

 

@Singleton
class NearbyUsersDataSource @Inject constructor() {
    // Ideally, those should be constructor-injected.
    val geoFire = GeoFire(FirebaseDatabase.getInstance().getReference("geofire"))
    val geoLocation = GeoLocation(0.0, 0.0)
    val radius = 100.0
    
    val geoQuery = geoFire.queryAtLocation(geoLocation, radius)
    
    // Listener for receiving GeoLocations
    val listener: GeoQueryEventListener = object : GeoQueryEventListener {
        val map = mutableMapOf<Key, GeoLocation>()
        override fun onKeyEntered(key: String, location: GeoLocation) {
            map[key] = location
        }
        override fun onKeyExited(key: String) {
            map.remove(key)
        }
        override fun onKeyMoved(key: String, location: GeoLocation) {
            map[key] = location
        }
        override fun onGeoQueryReady() {
            _locations.value = State.Ready(map.toMap())
        }
        override fun onGeoQueryError(e: DatabaseError) {
            _locations.value = State.Error(map.toMap(), e.toException())
        }
    }

    // Listen for changes only while observed
    private val _locations = object : MutableLiveData<State>() {
        override fun onActive() {
            geoQuery.addGeoQueryEventListener(listener)
        }

        override fun onInactive() {
            geoQuery.removeGeoQueryEventListener(listener)
        }
    }

    // Expose read-only LiveData
    val locations: LiveData<State> by this::_locations
    
    sealed class State(open val value: Map<Key, GeoLocation>) {
        data class Ready(
            override val value: Map<Key, GeoLocation>
        ) : State(value)
        
        data class Error(
            override val value: Map<Key, GeoLocation>,
            val exception: Exception
        ) : State(value)
    }
}

 

Our Repository, ViewModel and Activity should then be as simple as:

 

Repository, ViewModel, Activity는 아래와 같이 최대한 간단하게 구현됩니다:

 

@Singleton
class NearbyUsersRepository @Inject constructor(
    nearbyUsersDataSource: NearbyUsersDataSource
) {
    val locations get() = nearbyUsersDataSource.locations
}
class NearbyUsersViewModel @ViewModelInject constructor(
    nearbyUsersRepository: NearbyUsersRepository
) : ViewModel() {

    val locations get() = nearbyUsersRepository.locations
}
@AndroidEntryPoint
class NearbyUsersActivity : AppCompatActivity() {
    
    private val viewModel: NearbyUsersViewModel by viewModels()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        viewModel.locations.observe(this) { state: State ->
            // Update views with the data.   
        }
    }
}

This approach may work fine, until you decide to make the Domain Layer, which contain the Repository interfaces, platform independent (as it should be). Also, once you need to offload work to worker threads on Data Sources, you will see there is no easy, idiomatic way with LiveData.

 

 

이런 접근 방식을 쓰더라도 동작은 잘 될 것입니다. 도메인 레이어를 만들기로 결정하기 전까지는 말이죠. 도메인 레이어는 Repository 인터페이스를 가지고 있고, 플랫폼과 무관합니다. (그래야만 합니다) 또한, 데이터 소스에서 작업 쓰레드를 이용하여 별도 작업을 해야할 때는 기존에 LiveData를 쓰는 관례적인 방법대로는 그리 쉽지 않은 것을 발견하게 될겁니다.

 

Using flows on Data Source and Repository

 

 

Let us convert our Data Source to use Flow . We have a flow buildercallbackFlow {}, that converts a callback to a cold Flow. When this Flow is collected, it runs the code block passed to the flow builder, adds the GeoQuery listener and reaches awaitClose {}, where it suspends until the Flow is closed (that is, until no one is collecting, or until it is cancelled for whatever uncaught exception). When closed, it removes the listener, and the flow is dematerialized.

 

 

Flow를 데이터소스와 Repository에 사용하기

Data Source가 Flow를 이용하도록 바꿔 보겠습니다. 우리는  flow buildercallbackFlow{} 를 쓸 수 있는데, 이것은 콜백 결과를  콜드 Flow 하나로 변환할 수 있습니다. Flow가 collect 되면, flow builder로 전달되었던 코드 블록을 실행하며, GeoQuery리스너를 추가하고 awaitClose {} 를 실행하게 됩니다. 이 코드는 Flow가 닫히기 전까지는 계속 멈춰있게 (suspend) 됩니다. (멈춰있다는 것은, 아무도 collect를 하지 않았거나, 아무런 에러도 없어서 캔슬되지도 않았기 때문이라는 뜻입니다). Flow가 닫히게 되면, 등록되었던 리스너가 제거되고 flow는 사라집니다.

 

@Singleton
class NearbyUsersDataSource @Inject constructor() {
    // Ideally, those should be constructor-injected.
    val geoFire = GeoFire(FirebaseDatabase.getInstance().getReference("geofire"))
    val geoLocation = GeoLocation(0.0, 0.0)
    val radius = 100.0
    
    val geoQuery = geoFire.queryAtLocation(geoLocation, radius)
    
    private fun GeoQuery.asFlow() = callbackFlow {
        val listener: GeoQueryEventListener = object : GeoQueryEventListener {
            val map = mutableMapOf<Key, GeoLocation>()
            override fun onKeyEntered(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onKeyExited(key: String) {
                map.remove(key)
            }
            override fun onKeyMoved(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onGeoQueryReady() {
                emit(State.Ready(map.toMap()))
            }
            override fun onGeoQueryError(e: DatabaseError) {
                emit(State.Error(map.toMap(), e.toException()))
            }
        }
        
        addGeoQueryEventListener(listener)
        
        awaitClose { removeGeoQueryEventListener(listener) }
    }

    val locations: Flow<State> = geoQuery.asFlow()
    
    sealed class State(open val value: Map<Key, GeoLocation>) {
        data class Ready(
            override val value: Map<Key, GeoLocation>
        ) : State(value)
        
        data class Error(
            override val value: Map<Key, GeoLocation>,
            val exception: Exception
        ) : State(value)
    }
}

 

Our Repository and ViewModel warrants no changes, but our Activity now receives a Flow and not a LiveData , so it needs to adapt: instead of observing the LiveData , we will collect the Flow.

 

Repository와 ViewModel는 전혀 변경되는 부분이 없지만, Activity는 이제 LiveData가 아닌 Flow를 받게되기 때문에 약간의 수정이 필요합니다. LiveData를 observe하는 대신, Flow를 collect할 것입니다.

 

@AndroidEntryPoint
class NearbyUsersActivity : AppCompatActivity() {
    
    private val viewModel: NearbyUsersViewModel by viewModels()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        lifecycleScope.launchWhenStarted {
            viewModel.locations.collect {
                // Update views with the data.   
            }
        }
    }
}

 

We use launchWhenStarted {} to collect the Flow so the coroutine will be automatically started only when the Activity reaches the onStart() lifecycle state, and will be automatically paused when it reaches the onStop() lifecycle state. This is akin to the automatic handling of Lifecycle that LiveData gives us.

Note: You might choose to keep using LiveData in your Presentation Layer (Activity). In that case, you can easily convert from Flow to LiveData in the ViewModel by using Flow<T>.asLiveData() extension function. This decision has consequences that we'll talk about in the next session, and we'll show that using SharedFlow and StateFlow end-to-end is more versatile and might fit better in your architecture.

우리가 Flow를 collect하기 위해 launchWhenStarted{} 를 사용했기 때문에, 코루틴은 ActivityonStart() 라이프사이클 상태에 도달했을 때 자동으로 실행될 것입니다. 그리고 다시 라이프사이클 상태가 onStop()에 도달하면 멈출 것입니다. 이것은 LiveDataLifeCycle을 자동으로 처리해주는 것과 똑같은 동작입니다.

 

What are the issues with using Flow in the View Layer?

 

The first problem with this approach is the handling of the Lifecycle, which LiveData does automatically for us. We achieved a similar behavior through the use of launchWhenStarted {} in the example above.

But there’s another problem: because the Flow is declarative and is only run (materialized) upon collection, if we have multiple collectors, a new flow will be run for each collector, completely independent from each other. Depending on the operations done, such as database or network operations, this can be very ineffective. It can even result in erroneous states, if we expect the operations to be done only once for correctness. In our practical example, we would have one new GeoQuery listener added for each collector — possibly not a critical issue, but certainly a waste of memory and CPU cycles.

Note: If you convert your Repository Flow to LiveData by using Flow<T>.asLiveData() in the ViewModel, the LiveData becomes the sole collector for the Flow , and no matter how many observers in the Presentation Layer, only one Flow will be collected. However, for that architecture to work well, you’ll need to guarantee every other component of yours access your LiveData from the ViewModel, and never the Flow directly from the Repository. This can prove itself a challenge depending on how decoupled your app is: all components that need the Repository, such as Interactors (use-cases) implementations, would now depend on the Activity instance to get the ViewModel instance, and the scope of those components would need to be limited accordingly.

We only want one GeoQuery listener, no matter how many collectors in the View Layer we have. We can achieve this by sharing the flow between all collectors.

 

 

View Layer에서 Flow를 사용했을 때 생기는 문제점은 무엇인가요?

 

이 방식의  첫 번째 문제점은, 라이프사이클을 처리할 때 발생합니다. (LiveData는 그걸 자동으로 처리해 주었지만...) 우리는 LiveData와 유사한 동작을 launchWhenStarted{} 를 이용하여 위에서 구현했었습니다.

하지만 다른 문제가 있습니다. Flow는 선언적이고, Collect 될 때만 실행됩니다 (Materialized). 만약 우리가 여러 개의 colletor를 가지고 있다면, 새로운 Flow 하나가 실행될 때 마다 다른 모든 Collector가 동작할 것입니다. 그것도 완벽히 독립적으로, 각각 실행될 것입니다.

어떤 동작이 완료되는 것을 기다리는 작업이라면, 그리고 그 동작이 데이터베이스 결과 값이거나 네트워크에서 얻어오는 결과 값이라면, 이것은 굉장히 비효율적인 작업입니다. 게다가 이 결과는, 결과값에 대해 한 번만 에러체크를 한다면 (역주: 한 번의 요청에 동일한 결과 값이 여러 번 오게 되니) 굉장히 이상한 상태가 될 수 있습니다. 우리의 연습예제에서는 이와 같이 각 Collector마다 하나의 listener를 추가해야만 했습니다. 아마 큰 이슈가 아닐 수도 있겠지만, 아마 메모리와 CPU를 낭비하게 될 것 입니다.

 

일러두기: 만약 Repository Flow를 Flow<T>.asLiveData() 를 통해 라이브데이터로 변결한다면, 라이브데이터는 이 Flow의 유일한 Collector가 될 것입니다. 얼마나 많은 Observer가 Presentation Layer에 있든, 오직 하나의 Flow만이 Collect 될 것입니다. 하지만, 이 구조가 잘 동작하기 위해서는 이 결과값을 원하는 다른 컴포넌트에서는 결과값을 "ViewModel에 있는 LiveData" 에서 가져가야 합니다. 절대 Repository 에 있는 Flow에서 가져가면 안됩니다. 당신의 앱이 잘 디커플링 되어있는지를 여기서 확인할 수 있을겁니다. 그 Repository에 의존하고 있는 다른 Interactor들 (use-casee들) 의 구현체는 이제 Repository가 아닌 Activity에 의존하게 될 것이고, 거기에서 ViewModel을 가져가게 될 것입니다. (역주: 오직 한번만 호출하게 하는 것이 목적이기 때문에, ViewModel을 따로 생성하는 것도 안됩니다. 생성되어 있는 ViewModel을 가져가기 위해 Activity에 의존할 수 밖에 없습니다!) 그렇기 때문에, 해당 컴포넌트의 스코프는 상당히 제한될 것입니다.

 

우리는 하나의  GeoQuery Listener를 생성하고 싶고, View Layer에 Collector가 얼마나 많이 있든 간에 오직 그 Listner 하나만을 유지하고 싶습니다. 우리는 sharing을 통해 목표를 달성할 수 있습니다. 이 sharing을 통해 모든 Collector가 결과를 공유하게 할 수 있습니다.

 


SharedFlow to the rescue

SharedFlow is a Flow that allows for sharing itself between multiple collectors, so that only one flow is effectively run (materialized) for all of the simultaneous collectors. If you define a SharedFlow that accesses databases and it is collected by multiple collectors, the database access will only run once, and the resulting data will be shared to all collectors.

 

StateFlow can also be used to achieve the same behavior: it is a specialized SharedFlow with .value (it’s current state) and specific SharedFlow configurations (constraints). We’ll talk about those constraints later.

 

We have an operator for transforming any Flow into a SharedFlow :

 

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> (source)

 

도와줘요 SharedFlow

SharedFlow 는 Flow 종류 중 하나로서, 그 자신을 여러 개의 Collector에 공유하기 때문에 오직 하나의 Flow만을 실행 (materized) 하게 해줍니다. 아무리 많은 Collector가 있어도 말이죠. 만약 SharedFlow 한 개를 정의하고 데이터베이스 결과 값을 공유하게 한 뒤, 여러 개의 Collector를 달아 준다면 데이터베이스 접근은 오직 한 번만 일어날 것입니다. 그리고 그 결과는 모든 Collector 들에게 공유될 것입니다.

 

StateFlow도 같은 동작을 하기 위해 사용될 수 있습니다. 이것은 특수한 SharedFlow 로서, ".value" 를 가지고 있습니다. (현재 값 반환) 그리고 SharedFlow의 설정값을 정확하게 정의합니다 (제약사항). 이 제약사항에 대해서는 나중에 얘기하겠습니다. 

 

Flow를 SharedFlow로 변환할 수 있는 오퍼레이터는 아래와 같습니다.

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> (source)

 


 

Let's apply this to our Data Source.

 

The scope is where all computation for materializing the Flow will be done. As our Data Source is a @Singleton, we can use the application process’ LifecycleScope , which is a LifecycleCoroutineScope that is created upon process creation and is only destroyed upon process destruction.

 

 

이 코드를 우리의 Data Source에 적용해 봅시다.

 

이 동작의 스코프는 실행중인 (Materializing) 모든 계산 Flow가 완료될 때 까지입니다. 우리의 Data Source가 @Singleton이기 때문에, 우리는 어플리케이션 프로세스의 라이프 사이클 범위를 사용할 수 있습니다. 즉 LifecycleCoroutineScope은 프로세스가 생성될 때 함께 생성되고, 프로세스가 종료될 때만 함께 종료됩니다.

 


For the started parameter, we can use SharingStarted.WhileSubscribed(), which makes our Flow start sharing (materializing) only when the number of subscribers turns from 0 to 1, and stop sharing when the number of subscribers turns from 1 to 0. This is akin to the LiveData behavior we implemented earlier by adding the GeoQuery listener in the onActive() callback and removing the listener on the onInactive() callback. We could also configure it to be started eagerly (immediately materialized and never dematerialized) or lazily (materialized when first collected, and never dematerialized), but we do want it to stop upstream database collection when not being collected downstream.

Note on terminology: just as we use the term observer for LiveData and collector for cold flows, we use the term subscriber for SharedFlow.

For the replay parameter, we can use 1: new subscribers will get the last emitted value immediately upon subscription.

 

 

started 파라메터를 위해, 우리는 SharingStarted.WhileSubscribed()를 사용할 수 있는데, 이것은 Collector 가 0개 에서 1개가 되는 순간에 우리의 Flow의 공유를 시작할 수 있게 해 주며, (materializing) Collector가 0개가 되는 순간 공유를 멈춥니다. 이 동작은 LiveData의 동작과 유사한데, 우리가 GeoQuery에 추가했었던, onActive() 콜백을 받았을 때 리스너를 추가하고, onInactive()일 때 리스너를 제거하는 방식과 비슷합니다. 우리는 이것을 좀 더 적극적으로 시작시킬 수도 있고 (즉시 생성된 후 절대로 제거되지 않도록) 게으르게(lazy) (첫 번째로 결과값을 받았을 때 (collected) 생성되고, 절대 제거되지 않게) 설정할 수도 있습니다. 하지만 우리는 다운스트림이 Collect 되지 않았을 때 (역주: 결과 값을 받는 collector가 없을 때) 업스트림 데이터베이스 Collection을 멈추도록 (역주: 데이터베이스 요청을 하지 않도록) 설정하고 싶습니다.

 

용어 설명: LiveData의 결과 값을 받는 부분을 observer로 부르고, cold Flow의 결과 값을 받는 부분을 collector로 불렀습니다. 마찬가지로, SharedFlow의 결과 값을 받는 부분을 subscriber로 부르겠습니다.

replay 파라메터에는 1 값을 넣겠습니다. 새롭게 추가된 subscriber는 등록되는 즉시 직전 1번째에 받았던 값을 받게 됩니다.

 

@Singleton
class NearbyUsersDataSource @Inject constructor() {
    // Ideally, those should be constructor-injected.
    val geoFire = GeoFire(FirebaseDatabase.getInstance().getReference("geofire"))
    val geoLocation = GeoLocation(0.0, 0.0)
    val radius = 100.0
    
    val geoQuery = geoFire.queryAtLocation(geoLocation, radius)
    
    private fun GeoQuery.asFlow() = callbackFlow {
        val listener: GeoQueryEventListener = object : GeoQueryEventListener {
            val map = mutableMapOf<Key, GeoLocation>()
            override fun onKeyEntered(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onKeyExited(key: String) {
                map.remove(key)
            }
            override fun onKeyMoved(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onGeoQueryReady() {
                emit(State.Ready(map.toMap())
            }
            override fun onGeoQueryError(e: DatabaseError) {
                emit(State.Error(map.toMap(), e.toException())
            }
        }
        
        addGeoQueryEventListener(listener)
        
        awaitClose { removeGeoQueryEventListener(listener) }
    }.shareIn(
         ProcessLifecycleOwner.get().lifecycleScope,
         SharingStarted.WhileSubscribed(),
         1
    )

    val locations: Flow<State> = geoQuery.asFlow()
                     
    sealed class State(open val value: Map<Key, GeoLocation>) {
        data class Ready(
            override val value: Map<Key, GeoLocation>
        ) : State(value)
        
        data class Error(
            override val value: Map<Key, GeoLocation>,
            val exception: Exception
        ) : State(value)
    }
}

 

It may help to think of a SharedFlow as a flow collector itself, that materializes our cold flow upstream into a hot flow, and shares the collected values between the many collectors downstream. A man in the middle between the cold upstream flow and the multiple downstream collectors.

 

Now, we might be tempted to think our Activity needs no adjustment. Wrong! There is a gotcha: when collecting the flow in a coroutine launched with launchWhenStarted {} , the coroutine will be paused on onStop() and resumed on onStart() , but it will still be subscribed to the flow. For MutableSharedFlow<T>, it means MutableSharedFlow<T>.subscriptionCount will not change for paused coroutines. To leverage the power of SharingStarted.WhileSubscribed() , we need to actually unsubscribe on onStop() , and subscribe again on onStart(). This means cancelling the collection coroutine and recreating it.

 

(See this issue and this issue for more details).

 

Let's create a class for that general purpose:

 

 

이 코드를 읽어보면 SharedFlow를 flow collector 그 자체로 생각하는데 도움이 될 것입니다. 여기에서 SharedFlow는 cold flow를 hot flow로 바꿔주며, 이 스트림을 듣고 있는 많은 colletor에게 값을 공유해주고 있습니다. 위에서 내려오는 (upstream) cold flow를 아래에서 듣고 있는 (downstream) 많은 collector에게 전달해 주는, 중간자 적인 역할을 합니다.

 

이제, 아마 우리의 Activity는 아무런 수정이 필요 없다고 생각할 수도 있습니다. 아닙니다! 해야 할 것이 있는데요, launchWhenStarted{} 로 시작된 코루틴 flow를 collect 할 때는, 코루틴은 onStop() 에서 멈춰져야 하고 onStart() 에서 재시작되어야 하지만, 현재는 계속 flow를 subscribe 하고 있습니다. MutableSharedFlow<T>를 보시면, MutableSharedFlow<T>.subscriptionCount는 멈춰져 있는 코루틴에 대해서는 값이 변경되지 않습니다. SharingStarted.WhileSubscribed() 의 진정한 힘을 이용하기 위해서, 우리는 정확하게 onStop() 에서 unsubscribe 해야 하고, onStart()에서 다시 subscribe 해야 합니다. 이것은 collect 하는 코루틴을 캔슬하고, 다시 생성해야 한다는 것을 의미합니다.

 

(참고로 이 이슈 와 이 이슈를 보시면 더 자세한 내용을 확인하실 수 있습니다.)

 

범용적인 사용을 위한 클래스를 만들어 봅시다.

 

@PublishedApi
internal class ObserverImpl<T> (
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>,
    private val collector: suspend (T) -> Unit
) : DefaultLifecycleObserver {

    private var job: Job? = null

    override fun onStart(owner: LifecycleOwner) {
        job = owner.lifecycleScope.launch {
            flow.collect {
                collector(it)
            }
        }
    }

    override fun onStop(owner: LifecycleOwner) {
        job?.cancel()
        job = null
    }

    init {
        lifecycleOwner.lifecycle.addObserver(this)
    }
}

inline fun <reified T> Flow<T>.observe(
    lifecycleOwner: LifecycleOwner,
    noinline collector: suspend (T) -> Unit
) {
    ObserverImpl(lifecycleOwner, this, collector)
}

inline fun <reified T> Flow<T>.observeIn(
    lifecycleOwner: LifecycleOwner
) {
    ObserverImpl(lifecycleOwner, this, {})
}

Note: if you want to use this custom observer in your projects, you can use this library: https://github.com/psteiger/flow-lifecycle-observer

Now, we can adjust our Activity to use the .observeIn(LifecycleOwner) extension function we just created:

 

일러두기: 만약 이 커스텀 옵져버를 당신의 프로젝트에서 이용하고 싶다면, 이 라이브러리를 사용하시면 됩니다: https://github.com/psteiger/flow-lifecycle-observer

이제, 우리가 방금 만든 .observeIn(LifeCycleOwner) 를 Activity에서 사용하도록 수정해 보겠습니다.

 

@AndroidEntryPoint
class NearbyUsersActivity : AppCompatActivity() {
    
    private val viewModel: NearbyUsersViewModel by viewModels()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        viewModel
            .locations
            .onEach { /* new locations received */ }
            .observeIn(this)
    }
}

The collector coroutine created with observeIn(LifecycleOwner) will be destroyed when the LifecycleOwner 's Lifecycle reaches the CREATED state (right before onStop() call) and will be recreated once it reaches the STARTED state (after onStart() call).

Note: Why CREATED state? Shouldn’t it be STOPPED state? It sounds counterintuitive at first, but it makes perfect sense. Lifecycle.State only has the following states: CREATED, DESTROYED, INITIALIZED, RESUMED, STARTED. There are no STOPPED and PAUSED states. When lifecycle reaches onPause() , instead of going to a new state, it goes back to the STARTED state. When it reaches onStop() , it goes back to the CREATED state.

 

observeIn(LifecycleOwner) 를 통해 생성된 collector 코루틴은 LifecycleOwnerLifecycle CREATED 상태가 되었을 때 detroy 되고, (onStop()가 호출된 직후) LifecycleOwner Lifecycle STARTED 상태가 되었을 때 (onStart() 가 호출된 이후) 다시 생성됩니다.

일러두기: 왜 CREATED 상태인가요? STOPPED 상태일 때 호출되어야 하는게 아닌가요? 이것은 직관적인 것과는 정반대가 아닌가 하는 생각이 들겠지만, 이건 정확하게 맞는 말입니다. Lifecycle.State 은 다음과 같은 상태밖에 없습니다CREATED, DESTROYED, INITIALIZED, RESUMED, STARTED. 
여기에는
 STOPPED 또는 PAUSED 상태가 없습니다. lifecycle 이 onPause() 상태가 되면, 새로운 상태로 변하는 대신 STARTED 상태로 되돌아갑니다. onStop()가 되면, lifeCycle 상태는 CREATED 상태로 되돌아갑니다.

We now have a Data Source that materializes once, but shares its data to all its subscribers. Its upstream collection will stop as soon as there are no subscribers and will restart as soon as the first subscriber reappears. It has no dependency on the Android platform, and it is not tied to the main thread ( Flow transformations can happen in other threads by simply applying the .flowOn() operator: flowOn(Dispatchers.IO) or .flowOn(Dispatchers.Default)).

 

이제 우리는 한 번만 생성(materializes) 되고, 여러 subscriber에 데이터를 공유해 주는 Data Source를 가지게 되었습니다. subscriber가 없다면 데이터를 요청하지 않고, 하나 이상의 subscriber가 다시 추가되면 곧바로 다시 데이터 요청을 시작하게 됩니다. 안드로이드 플랫폼에 의존적이지 않고 메인 쓰레드에 묶여있지 않습니다. (Flow 변환은 다른 쓰레드에서 할 수 있는데, 간단하게 flowOn(Dispatchers.IO) 또는 .flowOn(Dispatchers.Default)같은  .flowOn() 오퍼레이터를 쓰면 됩니다)

 


But what if I need to eventually access the current state of the flow without collecting it?

하지만 만약 현재 Flow를 collect 하지 않으면서도 상태에 접근해야 할 때가 있다면?

 

If we really need to access the Flow state with .value just like we can do with LiveData , we can use StateFlow , which is a specialized, constricted SharedFlow .

 

Instead of applying the shareIn() operator to materialize the flow, we can apply stateIn() :

 

 

만약 우리가 정말 마치 LiveData에서와 같이 Flow의 상태에 접근해야 한다면, 우리는 StateFlow를 이용할 수 있는데, 제약사항이 가해진 SharedFlow 입니다.

 

Flow를 생성하기 위해서 shareIn() 오퍼레이터를 사용하는 대신, 우리는  stateIn() 오퍼레이터를 사용할 수 있습니다.

 

fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T> (source)

 

As we can see from the methods parameters, there are two basic differences between sharedIn() and stateIn():

 

1. stateIn() has no support for replay customization. StateFlow is a SharedFlow with a fixed replay=1 . That means new subscribers will immediately get the current state upon subscription.

 

2. stateIn() requires an initial value. That means if you don’t have an initial value at the time, you will need to either make the StateFlow<T> type T nullable, or use a sealed class to represent an empty initial value.

 

 

이 메소드 파라메터에서 볼 수 있는 것 처럼, sharedIn()stateIn() 사이에는 두 가지 기본적인 차이점이 있습니다.

 

1. stateIn() 은 replay 값을 설정 할 수 없습니다. StateFlow는 SharedFlow 에서 replay 값이 1로 고정된 것입니다. 이 말은 새로운 subscriber는 즉시 현재 상태 값을 받게 된다는 의미입니다.

 

2. stateIn()은 초기 값이 필요합니다. 이 말은 만약 당신이 처음에 설정해 줄 값이 없다면, StateFlow<T> 에서 T 에 들어갈 타입을 null 일 수 있는 타입으로 설정하거나, sealed class를 설정해서 "비어있는 초기 값"을 정의해 줘야 한다는 의미입니다.

 


 

From the documentation:

State flow is a shared flow

State flow is a special-purpose, high-performance, and efficient implementation of SharedFlow for the narrow, but widely used case of sharing a state. See the SharedFlow documentation for the basic rules, constraints, and operators that are applicable to all shared flows.

State flow always has an initial value, replays one most recent value to new subscribers, does not buffer any more values, but keeps the last emitted one, and does not support resetReplayCache. A state flow behaves identically to a shared flow when it is created with the following parameters and the distinctUntilChanged operator is applied to it:

// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

shared.tryEmit(initialValue) // emit the initial value

val state = shared.distinctUntilChanged() // get StateFlow-like behavior

Use SharedFlow when you need a StateFlow with tweaks in its behavior such as extra buffering, replaying more values, or omitting the initial value.

However, note the obvious compromise in choosing SharedFlow: you will lose StateFlow<T>.value .

 

 

 

 

// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

shared.tryEmit(initialValue) // emit the initial value

val state = shared.distinctUntilChanged() // get StateFlow-like behavior

 

SharedFlow 를 사용하기에 적합한 곳은 StateFlow 에서 추가 버퍼가 필요하거나, 여러 개의 최근 값을 반환해야 하거나, 초기값을 무시해야 하는 곳입니다.

하지만, SharedFlow를 선택할 때 명백하게 타협해야 할 부분이 하나 있는데, 그것은 바로 StateFlow<T>.value를 잃게 된다는 것입니다.

 


Which to choose, StateFlow or SharedFlow?

둘 중 어느 것을 선택해야 하나요? StateFlow 아니면 SharedFlow?

 

The easy way to answer this question is trying to answer a few other questions:

“Do I really need to access the flow's current state at any given time with myFlow.value ?”

If the answer to this question is no, you might consider SharedFlow.

“Do I need to support emitting and collecting repeated values?”

If the answer to this question is yes, you will need SharedFlow.

“Do I need to replay more than the latest value for new subscribers?”

If the answer to this question is yes, you will need SharedFlow.

As we can see, StateFlow for everything is not automatically the right answer.

1. It ignores (conflates) repeated values and this is non-configurable. Sometimes you need to not ignore repeated values, e.g.: a connection attempt that stores the attempt result in a flow, and needs to retry after each failure.

2. Also, it requires an initial value. Because SharedFlow does not have .value, it does not need to be instantiated with an initial value — collectors will just suspend until the first value appears, and no one will try to access .value before any value arrives. If you don’t have an initial value for StateFlow you’ll have to make the StateFlow type nullable T? and use null as the initial value (or declare a sealed class for a default no-value value).

3. Also, you might want to tweak the replay value. SharedFlow can replay the last n values for new subscribers. StateFlow has a fixed replay value of 1 — it only shares the current state value.

Both support the SharingStarted ( Eagerly, Lazily or WhileSubscribed()) configuration. I commonly use SharingStarted.WhileSubscribed() and destroy/recreate all my collectors on Activity onStart() / onStop(), so data source upstream collection will stop when the user is not actively using the app (this is akin to removing/re-adding listeners on LiveData onActive() / onInactive())

The constraints that the StateFlow impose on the SharedFlow might not be the best fit for you, you might want to tweak with the behavior and choose to use SharedFlow. Personally, I rarely ever need to access myFlow.value, and I enjoy SharedFlow's flexibility, so I usually choose SharedFlow.

Read more on StateFlow and SharedFlow on the official documentation.

 

 

이 문제에 대한 답을 찾는 쉬운 방법은 다른 질문을 몇 가지 해보는 것입니다.

 

"내가 정말 언제든지 myFlow.value 이런 방식으로 flow의 현재 상태에 접근해야만 하는가? "

 

만약 답이 아니오 라면, SharedFlow고려해도 좋을 것 같습니다.

 

"내가 반복되는 값을 발신하고, 그것을 collect 하는 동작을 지원해야 하는가?"

 

만약 답이 라면, SharedFlow고려하시기 바랍니다.

 

"내가 최신 값 1개가 아닌, 그 이전의 값까지 더 많은 값을 subscriber 에게 제공할 수 있어야 하는가?"

만약 답이  라면, SharedFlow고려하시기 바랍니다.

 

우리가 살펴본 것 처럼, StateFlow를 모든 곳에 사용하는 것은 올바른 답이 아닙니다.

 

1. StateFlow 반복되는 결과 값을 무시합니다. (conflates)그리고 무시하지 않도록 설정할 수도 없습니다. 가끔은 반복해서 들어오는 값을 무시하지 않아야 할 때가 있습니다. 예를 들면, 접속 시도 결과 값 같은 경우에는 매번 실패할 때 마다 재시도해야 할 수 있습니다. 

 

2. 또한, StateFlow는 초기값이 필요합니다. SharedFlow .value같은 것이 존재하지 않기 때문에, 초기 값을 넣어 초기화할 필요가 없습니다. collector는 단시 첫 번째 값이 올 때 까지 기다립니다. 값이 들어오기 전에는 그 누구도 그 값에 접근하려하지 않을 것입니다. 만약 당신이 StateFlow 를 쓰면서 초기값이 없다면, 당신은 StateFlow의 타입 T 를 nullable 하게, T? 와 같이 설정한 다음 초기 값으로 null을 넣어줘야 합니다. (아니면, sealed class 를 생성한 다음, 초기 값으로 값이 없는 상태를 지정해 줘야 합니다)

 

3. 또한, replay 값을 설정해 줘야 합니다. SharedFlow 는 최근 n개의 값을 새로운 subscriber에게 전달해 줄 수 있습니다. StateFlow는 고정된 replay 값 1을 가지고 있고, 이 말은 현재 상태만을 공유할 수 있다는 의미입니다.

 

이 둘은 모두 SharingStarted ( Eagerly, Lazily 또는 WhileSubscribed()) 설정을 지원합니다. 저는 일반적으로 SharingStarted.WhileSubscribed()를 사용하고, Activity 에서 onStart / onStop 될 때 모든 collecter를 파괴 / 재생성 하기 때문에, 사용자가 앱을 사용하지 않을 때 데이터소스가 데이터를 요청하지 않습니다. (이것은 LiveData에서 onActive / onInactive 일 때, 리스너를 제거/다시 추가하는 동작과 비슷합니다)

 

StateFlow가 SharedFlow 보다 더 많이 가지고 있는 제약사항은, 아마 당신의 사용 목적에 정확하게 일치하지는 않을지도 모릅니다. 아마 당신은 좀 더 많은 것을 설정하기를 원할 것이고, SharedFlow를 사용하는 것을 택할지도 모릅니다. 개인적으로는, 저는 거의 myFlow.value와 같이 직접 값에 접근하지 않고, SharedFlow의 유연성을 좋아하기 때문에 SharedFlow를 선택하는 경우가 많습니다.

 

공식 문서에서 StateFlow  SharedFlow 에 대해 더 찾아보세요.


A practical case where SharedFlow instead of StateFlow is needed

StateFlow가 아닌, SharedFlow가 필요한 실제 예제

 

Consider the following wrapper around Google's Billing Client library. We have a MutableSharedFlow billingClientStatus that stores the current connection status to the billing service. We set its initial value to be SERVICE_DISCONNECTED. We collect billingClientStatus, and when it is not OK, we try to startConnection() to the billing service. If the connection attempt fails, we will emit SERVICE_DISCONNECTED.

In that example, if billingClientStatus were a MutableStateFlow instead of a MutableSharedFlow, when its value is already SERVICE_DISCONNECTED and we try to set it to the same (connection retry failed), it would ignore the update, and consequently, it would not try to reconnect again.

 

아래와 같은, 구글 결제 클라이언트 라이브러리가 있다고 가정합시다. 우리는 MutableSharedFlow billingClientStatus 를 가지고 있는데, 이것은 현재 빌링 서비스의 연결 상태를 저장하고 있습니다. 우리는 이 변수의 초기값을 SERVICE_DISCONNECTED으로 설정합니다. 우리는 billingClientStatus 값을 받고, 결과 값이 OK라면 startConnection()를 빌링 서비스에 호출합니다. 연결시도가 실패하면, 우리는 SERVICE_DISCONNECTED를 발신합니다.

 

이 예제에서, billingClientStatus 가 MutableSharedFlow 가 아니라 MutableStateFlow  였다면, 초기 값이 SERVICE_DISCONNECTED 이고 우리가 시도한 요청의 결과 값 또한 동일하게 SERVICE_DISCONNECTED 이기 때문에 (요청이 실패했기 때문에) 업데이트 되지 않고, 결과적으로는 재접속 요청을 하지 않게 될 것 입니다.

 

@Singleton
class Biller @Inject constructor(
    @ApplicationContext private val context: Context,
) : PurchasesUpdatedListener, BillingClientStateListener {
    
    private var billingClient: BillingClient =
        BillingClient.newBuilder(context)
            .setListener(this)
            .enablePendingPurchases()
            .build()
        
    private val billingClientStatus = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    
    override fun onBillingSetupFinished(result: BillingResult) {
        billingClientStatus.tryEmit(result.responseCode)
    }

    override fun onBillingServiceDisconnected() {
        billingClientStatus.tryEmit(BillingClient.BillingResponseCode.SERVICE_DISCONNECTED)
    }
    
    // ...
    
    // Suspend until billingClientStatus == BillingClient.BillingResponseCode.OK
    private suspend fun requireBillingClientSetup(): Boolean =
        withTimeoutOrNull(TIMEOUT_MILLIS) {
            billingClientStatus.first { it == BillingClient.BillingResponseCode.OK }
            true
        } ?: false
   
    init {
        billingClientStatus.tryEmit(BillingClient.BillingResponseCode.SERVICE_DISCONNECTED)
        billingClientStatus.observe(ProcessLifecycleOwner.get()) {
            when (it) {
                BillingClient.BillingResponseCode.OK -> with (billingClient) {
                    updateSkuPrices()
                    handlePurchases()
                }
                else -> {
                    delay(RETRY_MILLIS)
                    billingClient.startConnection(this@Biller)
                }
            }
        }
    }

    private companion object {
        private const val TIMEOUT_MILLIS = 2000L
        private const val RETRY_MILLIS = 3000L
    }
}

 

In that case, we need to use a SharedFlow, which supports emitting sequential repeated values.

 

이러한 경우, 우리는 SharedFlow를 써야하고, 이것은 동일한 값이 반복해서 들어오더라도 무시하지 않고 처리합니다.

On the GeoFire use-case

GeoFire 에서 사용했던 사례

If you have practical need to work with GeoFire, I have developed a library, geofire-ktx, that allows for readily converting a GeoQuery object to a Flow . It also supports fetching DataSnapshot located in other DatabaseReference root with the same child key as the GeoFire root, as this is a common use-case with GeoQuery. It also supports fetching this data as an instance of a class instead of a DataSnapshot . This is done through Flow transformations. The library source code completes the examples given in this article.

 

For other Android libraries, check out https://github.com/psteiger.

 

당신이 GeoFire 를 사용할 필요가 있다면, 제가 만든 라이브러리인 geofire-ktx 를 써보시기 바랍니다. 이 라이브러리를 이용하면 GeoQuery의 결과를 Flow로 받을 수 있습니다. 또한 DataSnapshot 요청을 다른 DatabaseReference 루트에 GeoFire 루트와 동일한 child key로 요청할 수 있습니다. 마치 이것이 GeoQuery의 일반적인 사용법인 것 처럼 말입니다. 그리고 결과 값을 DataSnapshot 타입이 아닌 다른 클래스로도 받을 수 있습니다. 이것은 Flow 변환을 통해 제공됩니다. 라이브러리의 source code 는 이 글에 있는 예제의 완전한 버전입니다.

 

다른 안드로이드 라이브러리를 https://github.com/psteiger 에서 확인하실 수 있습니다.