diff --git a/changelog.d/6154.bugfix b/changelog.d/6154.bugfix new file mode 100644 index 0000000000..5c64eb2879 --- /dev/null +++ b/changelog.d/6154.bugfix @@ -0,0 +1 @@ +Fixed /upgraderoom command not doing anything diff --git a/vector/src/main/java/im/vector/app/core/utils/DataSource.kt b/vector/src/main/java/im/vector/app/core/utils/DataSource.kt index f83eda68e9..60ad91272b 100644 --- a/vector/src/main/java/im/vector/app/core/utils/DataSource.kt +++ b/vector/src/main/java/im/vector/app/core/utils/DataSource.kt @@ -25,6 +25,7 @@ interface DataSource { } interface MutableDataSource : DataSource { + fun post(value: T) } @@ -49,10 +50,12 @@ open class BehaviorDataSource(private val defaultValue: T? = null) : MutableD /** * This datasource only emits all subsequent observed values to each subscriber. + * + * bufferSize - number of buffered items before it starts dropping oldest. Should be at least 1 */ -open class PublishDataSource : MutableDataSource { +open class PublishDataSource(bufferSize: Int = 10) : MutableDataSource { - private val mutableFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val mutableFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = bufferSize, onBufferOverflow = BufferOverflow.DROP_OLDEST) override fun stream(): Flow { return mutableFlow diff --git a/vector/src/test/java/im/vector/app/core/utils/DataSourceTest.kt b/vector/src/test/java/im/vector/app/core/utils/DataSourceTest.kt new file mode 100644 index 0000000000..46c4406c8c --- /dev/null +++ b/vector/src/test/java/im/vector/app/core/utils/DataSourceTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2022 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.app.core.utils + +import im.vector.app.test.test +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.test.TestCoroutineScheduler +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext +import org.amshove.kluent.shouldContainSame +import org.junit.Test + +class DataSourceTest { + + @Test + fun `given PublishDataSource, when posting values before observing, then no value is observed`() = runTest { + val publishDataSource = PublishDataSource() + publishDataSource.post(0) + publishDataSource.post(1) + + publishDataSource.stream() + .test(this) + .assertNoValues() + .finish() + } + + @Test + fun `given PublishDataSource with a large enough buffer size, when posting values after observing, then all values are observed`() = runTest { + val valuesToPost = listOf(2, 3, 4, 5, 6, 7, 8, 9) + val publishDataSource = PublishDataSource(bufferSize = valuesToPost.size) + publishDataSource.test(testScheduler, valuesToPost, valuesToPost) + } + + @Test + fun `given PublishDataSource with a too small buffer size, when posting values after observing, then we are missing some values`() = runTest { + val valuesToPost = listOf(2, 3, 4, 5, 6, 7, 8, 9) + val expectedValues = listOf(2, 9) + val publishDataSource = PublishDataSource(bufferSize = 1) + publishDataSource.test(testScheduler, valuesToPost, expectedValues) + } + + private suspend fun PublishDataSource.test(testScheduler: TestCoroutineScheduler, valuesToPost: List, expectedValues: List) { + val values = ArrayList() + val job = stream() + .onEach { + // Artificial delay to make consumption longer than production + delay(10) + values.add(it) + } + .launchIn(CoroutineScope(UnconfinedTestDispatcher(testScheduler))) + + valuesToPost.forEach { + post(it) + } + withContext(Dispatchers.Default) { + delay(11L * valuesToPost.size) + } + job.cancel() + + values shouldContainSame expectedValues + } +}