Task to ensure aggregation of all poll responses when receiving ending poll event

This commit is contained in:
Maxime NATUREL 2022-12-14 11:26:17 +01:00
parent 8c88140b3c
commit 8b7c8e3351
4 changed files with 170 additions and 2 deletions

View File

@ -388,7 +388,13 @@ fun Event.isLocationMessage(): Boolean {
}
}
fun Event.isPoll(): Boolean = getClearType() in EventType.POLL_START.values || getClearType() in EventType.POLL_END.values
fun Event.isPoll(): Boolean = isPollStart() || isPollEnd()
fun Event.isPollStart(): Boolean = getClearType() in EventType.POLL_START.values
fun Event.isPollResponse(): Boolean = getClearType() in EventType.POLL_RESPONSE.values
fun Event.isPollEnd(): Boolean = getClearType() in EventType.POLL_END.values
fun Event.isSticker(): Boolean = getClearType() == EventType.STICKER

View File

@ -99,6 +99,8 @@ import org.matrix.android.sdk.internal.session.room.relation.DefaultUpdateQuickR
import org.matrix.android.sdk.internal.session.room.relation.FetchEditHistoryTask
import org.matrix.android.sdk.internal.session.room.relation.FindReactionEventForUndoTask
import org.matrix.android.sdk.internal.session.room.relation.UpdateQuickReactionTask
import org.matrix.android.sdk.internal.session.room.relation.poll.DefaultFetchPollResponseEventsTask
import org.matrix.android.sdk.internal.session.room.relation.poll.FetchPollResponseEventsTask
import org.matrix.android.sdk.internal.session.room.relation.threads.DefaultFetchThreadSummariesTask
import org.matrix.android.sdk.internal.session.room.relation.threads.DefaultFetchThreadTimelineTask
import org.matrix.android.sdk.internal.session.room.relation.threads.FetchThreadSummariesTask
@ -354,4 +356,7 @@ internal abstract class RoomModule {
@Binds
abstract fun bindRedactLiveLocationShareTask(task: DefaultRedactLiveLocationShareTask): RedactLiveLocationShareTask
@Binds
abstract fun bindFetchPollResponseEventsTask(task: DefaultFetchPollResponseEventsTask): FetchPollResponseEventsTask
}

View File

@ -17,6 +17,7 @@
package org.matrix.android.sdk.internal.session.room.aggregation.poll
import io.realm.Realm
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.events.model.Event
@ -40,9 +41,14 @@ import org.matrix.android.sdk.internal.database.model.PollResponseAggregatedSumm
import org.matrix.android.sdk.internal.database.query.create
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.session.room.relation.poll.FetchPollResponseEventsTask
import org.matrix.android.sdk.internal.task.TaskExecutor
import javax.inject.Inject
class DefaultPollAggregationProcessor @Inject constructor() : PollAggregationProcessor {
internal class DefaultPollAggregationProcessor @Inject constructor(
private val taskExecutor: TaskExecutor,
private val fetchPollResponseEventsTask: FetchPollResponseEventsTask,
) : PollAggregationProcessor {
override fun handlePollStartEvent(realm: Realm, event: Event): Boolean {
val content = event.getClearContent()?.toModel<MessagePollContent>()
@ -174,6 +180,10 @@ class DefaultPollAggregationProcessor @Inject constructor() : PollAggregationPro
aggregatedPollSummaryEntity.sourceEvents.add(event.eventId)
}
if (!isLocalEcho) {
ensurePollIsFullyAggregated(roomId, pollEventId)
}
return true
}
@ -200,4 +210,21 @@ class DefaultPollAggregationProcessor @Inject constructor() : PollAggregationPro
eventAnnotationsSummaryEntity.pollResponseSummary = it
}
}
// TODO add unit tests
/**
* Check that all related votes to a given poll are all retrieved and aggregated.
*/
private fun ensurePollIsFullyAggregated(
roomId: String,
pollEventId: String
) {
taskExecutor.executorScope.launch {
val params = FetchPollResponseEventsTask.Params(
roomId = roomId,
startPollEventId = pollEventId,
)
fetchPollResponseEventsTask.execute(params)
}
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright (c) 2022 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.relation.poll
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.RelationType
import org.matrix.android.sdk.api.session.events.model.isPollResponse
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.EventDecryptor
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.relation.RelationsResponse
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.util.time.Clock
import javax.inject.Inject
private const val FETCH_RELATED_EVENTS_LIMIT = 50
/**
* Task to fetch all the vote events to ensure full aggregation for a given poll.
*/
internal interface FetchPollResponseEventsTask : Task<FetchPollResponseEventsTask.Params, Result<Unit>> {
data class Params(
val roomId: String,
val startPollEventId: String,
)
}
// TODO add unit tests
internal class DefaultFetchPollResponseEventsTask @Inject constructor(
private val roomAPI: RoomAPI,
private val globalErrorReceiver: GlobalErrorReceiver,
@SessionDatabase private val monarchy: Monarchy,
private val clock: Clock,
private val eventDecryptor: EventDecryptor,
) : FetchPollResponseEventsTask {
override suspend fun execute(params: FetchPollResponseEventsTask.Params): Result<Unit> = runCatching {
var nextBatch: String? = fetchAndProcessRelatedEventsFrom(params)
while (nextBatch?.isNotEmpty() == true) {
nextBatch = fetchAndProcessRelatedEventsFrom(params, from = nextBatch)
}
}
private suspend fun fetchAndProcessRelatedEventsFrom(params: FetchPollResponseEventsTask.Params, from: String? = null): String? {
val response = getRelatedEvents(params, from)
val filteredEvents = response.chunks
.map { decryptEventIfNeeded(it) }
.filter { it.isPollResponse() }
addMissingEventsInDB(params.roomId, filteredEvents)
return response.nextBatch
}
private suspend fun getRelatedEvents(params: FetchPollResponseEventsTask.Params, from: String? = null): RelationsResponse {
return executeRequest(globalErrorReceiver, canRetry = true) {
roomAPI.getRelations(
roomId = params.roomId,
eventId = params.startPollEventId,
relationType = RelationType.REFERENCE,
from = from,
limit = FETCH_RELATED_EVENTS_LIMIT,
)
}
}
private suspend fun addMissingEventsInDB(roomId: String, events: List<Event>) {
monarchy.awaitTransaction { realm ->
val eventIdsToCheck = events.mapNotNull { it.eventId }.filter { it.isNotEmpty() }
val existingIds = EventEntity.where(realm, eventIdsToCheck).findAll().toList().map { it.eventId }
events.filterNot { it.eventId in existingIds }
.map {
val ageLocalTs = clock.epochMillis() - (it.unsignedData?.age ?: 0)
it.toEntity(roomId = roomId, sendState = SendState.SYNCED, ageLocalTs = ageLocalTs)
}
.forEach { it.copyToRealmOrIgnore(realm, EventInsertType.PAGINATION) }
}
}
private suspend fun decryptEventIfNeeded(event: Event): Event {
// TODO move into a reusable task
if (event.isEncrypted()) {
tryOrNull(message = "Unable to decrypt the event") {
eventDecryptor.decryptEvent(event, "")
}
?.let { result ->
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain,
isSafe = result.isSafe
)
}
}
event.ageLocalTs = clock.epochMillis() - (event.unsignedData?.age ?: 0)
return event
}
}