Filter and store poll events

This commit is contained in:
Maxime NATUREL 2023-01-18 15:24:09 +01:00
parent e3a2000e29
commit 7ca532a5f6
4 changed files with 114 additions and 51 deletions

View File

@ -59,6 +59,8 @@ import org.matrix.android.sdk.internal.session.room.directory.DefaultSetRoomDire
import org.matrix.android.sdk.internal.session.room.directory.GetPublicRoomTask
import org.matrix.android.sdk.internal.session.room.directory.GetRoomDirectoryVisibilityTask
import org.matrix.android.sdk.internal.session.room.directory.SetRoomDirectoryVisibilityTask
import org.matrix.android.sdk.internal.session.room.event.DefaultFilterAndStoreEventsTask
import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask
import org.matrix.android.sdk.internal.session.room.location.CheckIfExistingActiveLiveTask
import org.matrix.android.sdk.internal.session.room.location.DefaultCheckIfExistingActiveLiveTask
import org.matrix.android.sdk.internal.session.room.location.DefaultGetActiveBeaconInfoForUserTask
@ -369,4 +371,7 @@ internal abstract class RoomModule {
@Binds
abstract fun bindGetLoadedPollsStatusTask(task: DefaultGetLoadedPollsStatusTask): GetLoadedPollsStatusTask
@Binds
abstract fun bindFilterAndStoreEventsTask(task: DefaultFilterAndStoreEventsTask): FilterAndStoreEventsTask
}

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2023 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.event
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.EventDecryptor
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.util.time.Clock
import javax.inject.Inject
internal interface FilterAndStoreEventsTask : Task<FilterAndStoreEventsTask.Params, Unit> {
data class Params(
val roomId: String,
val events: List<Event>,
val filterPredicate: (Event) -> Boolean,
)
}
internal class DefaultFilterAndStoreEventsTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val clock: Clock,
private val eventDecryptor: EventDecryptor,
) : FilterAndStoreEventsTask {
override suspend fun execute(params: FilterAndStoreEventsTask.Params) {
val filteredEvents = params.events
.map { decryptEventIfNeeded(it) }
.filter { params.filterPredicate(it) }
addMissingEventsInDB(params.roomId, filteredEvents)
}
private suspend fun addMissingEventsInDB(roomId: String, events: List<Event>) {
monarchy.awaitTransaction { realm ->
val eventIdsToCheck = events.mapNotNull { it.eventId }.filter { it.isNotEmpty() }
if (eventIdsToCheck.isNotEmpty()) {
val existingIds = EventEntity.where(realm, eventIdsToCheck).findAll().toList().map { it.eventId }
events.filterNot { it.eventId in existingIds }
.map { it.toEntity(roomId = roomId, sendState = SendState.SYNCED, ageLocalTs = computeLocalTs(it)) }
.forEach { it.copyToRealmOrIgnore(realm, EventInsertType.PAGINATION) }
}
}
}
private suspend fun decryptEventIfNeeded(event: Event): Event {
if (event.isEncrypted()) {
eventDecryptor.decryptEventAndSaveResult(event, timeline = "")
}
event.ageLocalTs = computeLocalTs(event)
return event
}
private fun computeLocalTs(event: Event) = clock.epochMillis() - (event.unsignedData?.age ?: 0)
}

View File

@ -17,6 +17,8 @@
package org.matrix.android.sdk.internal.session.room.poll
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.api.session.events.model.isPoll
import org.matrix.android.sdk.api.session.events.model.isPollResponse
import org.matrix.android.sdk.api.session.room.poll.LoadedPollsStatus
import org.matrix.android.sdk.internal.database.model.PollHistoryStatusEntity
import org.matrix.android.sdk.internal.database.query.getOrCreate
@ -24,6 +26,7 @@ import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask
import org.matrix.android.sdk.internal.session.room.poll.PollConstants.MILLISECONDS_PER_DAY
import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import org.matrix.android.sdk.internal.session.room.timeline.PaginationResponse
@ -44,6 +47,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val roomAPI: RoomAPI,
private val globalErrorReceiver: GlobalErrorReceiver,
private val filterAndStoreEventsTask: FilterAndStoreEventsTask,
) : LoadMorePollsTask {
override suspend fun execute(params: LoadMorePollsTask.Params): LoadedPollsStatus {
@ -53,9 +57,10 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
currentPollHistoryStatus = fetchMorePollEventsBackward(params, currentPollHistoryStatus)
}
// TODO
// unmock and check how it behaves when cancelling the process: it should resume where it was stopped
// check how it behaves when cancelling the process: it should resume where it was stopped
// check the network calls done using Flipper
// check forward of error in case of call api failure
// test on large room
return LoadedPollsStatus(
canLoadMore = currentPollHistoryStatus.isEndOfPollsBackward.not(),
@ -89,7 +94,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
params: LoadMorePollsTask.Params,
status: PollHistoryStatusEntity
): PollHistoryStatusEntity {
val chunk = executeRequest(globalErrorReceiver) {
val response = executeRequest(globalErrorReceiver) {
roomAPI.getRoomMessagesFrom(
roomId = params.roomId,
from = status.tokenEndBackward,
@ -99,9 +104,18 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
)
}
// TODO decrypt events and filter in only polls to store them in local: see to mutualize with FetchPollResponseEventsTask
filterAndStorePollEvents(roomId = params.roomId, paginationResponse = response)
return updatePollHistoryStatus(roomId = params.roomId, paginationResponse = chunk)
return updatePollHistoryStatus(roomId = params.roomId, paginationResponse = response)
}
private suspend fun filterAndStorePollEvents(roomId: String, paginationResponse: PaginationResponse) {
val filterTaskParams = FilterAndStoreEventsTask.Params(
roomId = roomId,
events = paginationResponse.events,
filterPredicate = { it.isPoll() || it.isPollResponse() }
)
filterAndStoreEventsTask.execute(filterTaskParams)
}
private suspend fun updatePollHistoryStatus(roomId: String, paginationResponse: PaginationResponse): PollHistoryStatusEntity {
@ -124,7 +138,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
// start of the timeline is reached, there are no more events
status.isEndOfPollsBackward = true
status.oldestTimestampReachedMs = oldestEventTimestamp
} else if(oldestEventTimestamp != null && currentTargetTimestamp != null && oldestEventTimestamp <= currentTargetTimestamp) {
} else if (oldestEventTimestamp != null && currentTargetTimestamp != null && oldestEventTimestamp <= currentTargetTimestamp) {
// target has been reached
status.oldestTimestampReachedMs = oldestEventTimestamp
status.tokenEndBackward = paginationResponse.end

View File

@ -17,25 +17,14 @@
package org.matrix.android.sdk.internal.session.room.relation.poll
import androidx.annotation.VisibleForTesting
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.RelationType
import org.matrix.android.sdk.api.session.events.model.isPollResponse
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.EventDecryptor
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask
import org.matrix.android.sdk.internal.session.room.relation.RelationsResponse
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.util.time.Clock
import javax.inject.Inject
@VisibleForTesting
@ -54,10 +43,9 @@ internal interface FetchPollResponseEventsTask : Task<FetchPollResponseEventsTas
internal class DefaultFetchPollResponseEventsTask @Inject constructor(
private val roomAPI: RoomAPI,
private val globalErrorReceiver: GlobalErrorReceiver,
@SessionDatabase private val monarchy: Monarchy,
private val clock: Clock,
private val eventDecryptor: EventDecryptor,
) : FetchPollResponseEventsTask {
private val filterAndStoreEventsTask: FilterAndStoreEventsTask,
) : FetchPollResponseEventsTask {
override suspend fun execute(params: FetchPollResponseEventsTask.Params): Result<Unit> = runCatching {
var nextBatch: String? = fetchAndProcessRelatedEventsFrom(params)
@ -70,11 +58,12 @@ internal class DefaultFetchPollResponseEventsTask @Inject constructor(
private suspend fun fetchAndProcessRelatedEventsFrom(params: FetchPollResponseEventsTask.Params, from: String? = null): String? {
val response = getRelatedEvents(params, from)
val filteredEvents = response.chunks
.map { decryptEventIfNeeded(it) }
.filter { it.isPollResponse() }
addMissingEventsInDB(params.roomId, filteredEvents)
val filterTaskParams = FilterAndStoreEventsTask.Params(
roomId = params.roomId,
events = response.chunks,
filterPredicate = { it.isPollResponse() }
)
filterAndStoreEventsTask.execute(filterTaskParams)
return response.nextBatch
}
@ -90,29 +79,4 @@ internal class DefaultFetchPollResponseEventsTask @Inject constructor(
)
}
}
private suspend fun addMissingEventsInDB(roomId: String, events: List<Event>) {
monarchy.awaitTransaction { realm ->
val eventIdsToCheck = events.mapNotNull { it.eventId }.filter { it.isNotEmpty() }
if (eventIdsToCheck.isNotEmpty()) {
val existingIds = EventEntity.where(realm, eventIdsToCheck).findAll().toList().map { it.eventId }
events.filterNot { it.eventId in existingIds }
.map { it.toEntity(roomId = roomId, sendState = SendState.SYNCED, ageLocalTs = computeLocalTs(it)) }
.forEach { it.copyToRealmOrIgnore(realm, EventInsertType.PAGINATION) }
}
}
}
private suspend fun decryptEventIfNeeded(event: Event): Event {
if (event.isEncrypted()) {
eventDecryptor.decryptEventAndSaveResult(event, timeline = "")
}
event.ageLocalTs = computeLocalTs(event)
return event
}
private fun computeLocalTs(event: Event) = clock.epochMillis() - (event.unsignedData?.age ?: 0)
}