Sending: remove events from Worker params by fetching in db instead

This commit is contained in:
ganfra 2020-09-17 18:37:33 +02:00
parent 144d0e56cc
commit b227dc3e5c
18 changed files with 181 additions and 129 deletions

View File

@ -20,6 +20,10 @@ package org.matrix.android.sdk.internal.crypto.store.db
import androidx.lifecycle.LiveData
import androidx.lifecycle.Transformations
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.Sort
import io.realm.kotlin.where
import org.matrix.android.sdk.api.auth.data.Credentials
import org.matrix.android.sdk.api.session.crypto.crosssigning.MXCrossSigningInfo
import org.matrix.android.sdk.api.session.events.model.Event
@ -85,10 +89,6 @@ import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.di.CryptoDatabase
import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.session.SessionScope
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.Sort
import io.realm.kotlin.where
import org.matrix.olm.OlmAccount
import org.matrix.olm.OlmException
import timber.log.Timber
@ -541,7 +541,7 @@ internal class RealmCryptoStore @Inject constructor(
deviceId = it.deviceId
)
}
monarchy.writeAsync { realm ->
doRealmTransactionAsync(realmConfiguration) { realm ->
realm.where<MyDeviceLastSeenInfoEntity>().findAll().deleteAllFromRealm()
entities.forEach {
realm.insertOrUpdate(it)
@ -1191,7 +1191,7 @@ internal class RealmCryptoStore @Inject constructor(
.findAll()
.mapNotNull { entity ->
when (entity.type) {
GossipRequestType.KEY -> {
GossipRequestType.KEY -> {
IncomingRoomKeyRequest(
userId = entity.otherUserId,
deviceId = entity.otherDeviceId,

View File

@ -23,8 +23,8 @@ import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.internal.crypto.tasks.SendVerificationMessageTask
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
@ -42,13 +42,16 @@ internal class SendVerificationMessageWorker(context: Context,
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val event: Event,
val eventId: String,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@Inject
lateinit var sendVerificationMessageTask: SendVerificationMessageTask
@Inject
lateinit var localEchoRepository: LocalEchoRepository
@Inject
lateinit var cryptoService: CryptoService
@ -63,16 +66,18 @@ internal class SendVerificationMessageWorker(context: Context,
Timber.e("Unknown Session, cannot send message, sessionId: ${params.sessionId}")
}
sessionComponent.inject(this)
val localId = params.event.eventId ?: ""
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId) ?: return Result.success(errorOutputData)
return try {
val eventId = sendVerificationMessageTask.execute(
SendVerificationMessageTask.Params(
event = params.event,
event = localEvent,
cryptoService = cryptoService
)
)
Result.success(Data.Builder().putString(localId, eventId).build())
Result.success(Data.Builder().putString(params.eventId, eventId).build())
} catch (exception: Throwable) {
if (exception.shouldBeRetried()) {
Result.retry()

View File

@ -22,6 +22,9 @@ import androidx.work.Data
import androidx.work.ExistingWorkPolicy
import androidx.work.Operation
import androidx.work.WorkInfo
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.matrix.android.sdk.R
import org.matrix.android.sdk.api.session.crypto.verification.CancelCode
import org.matrix.android.sdk.api.session.crypto.verification.ValidVerificationInfoRequest
@ -52,9 +55,6 @@ import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.StringProvider
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import timber.log.Timber
import java.util.UUID
import java.util.concurrent.TimeUnit
@ -87,7 +87,7 @@ internal class VerificationTransportRoomMessage(
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId,
event = event
eventId = event.eventId?:""
))
val enqueueInfo = enqueueSendWork(workerParams)
@ -174,7 +174,7 @@ internal class VerificationTransportRoomMessage(
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId,
event = event
eventId = event.eventId ?: ""
))
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<SendVerificationMessageWorker>()
@ -229,7 +229,7 @@ internal class VerificationTransportRoomMessage(
)
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId,
event = event
eventId = event.eventId ?: ""
))
enqueueSendWork(workerParams)
}
@ -249,7 +249,7 @@ internal class VerificationTransportRoomMessage(
)
val workerParams = WorkerParamsFactory.toData(SendVerificationMessageWorker.Params(
sessionId = sessionId,
event = event
eventId = event.eventId ?: ""
))
val enqueueInfo = enqueueSendWork(workerParams)

View File

@ -24,7 +24,6 @@ import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.extensions.tryThis
import org.matrix.android.sdk.api.session.content.ContentAttachmentData
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent
@ -34,9 +33,13 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageImageContent
import org.matrix.android.sdk.api.session.room.model.message.MessageVideoContent
import org.matrix.android.sdk.internal.crypto.attachments.MXEncryptedAttachments
import org.matrix.android.sdk.internal.crypto.model.rest.EncryptedFileInfo
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.network.ProgressRequestBody
import org.matrix.android.sdk.internal.session.DefaultFileService
import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker
import org.matrix.android.sdk.internal.session.room.send.LocalEchoIdentifiers
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
@ -61,7 +64,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val events: List<Event>,
val localEchoIds: List<LocalEchoIdentifiers>,
val attachment: ContentAttachmentData,
val isEncrypted: Boolean,
val compressBeforeSending: Boolean,
@ -73,6 +76,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
@Inject lateinit var fileService: DefaultFileService
@Inject lateinit var cancelSendTracker: CancelSendTracker
@Inject lateinit var imageCompressor: ImageCompressor
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
@ -100,7 +104,7 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
val allCancelled = params.events.all { cancelSendTracker.isCancelRequestedFor(it.eventId, it.roomId) }
val allCancelled = params.localEchoIds.all { cancelSendTracker.isCancelRequestedFor(it.eventId, it.roomId) }
if (allCancelled) {
// there is no point in uploading the image!
return Result.success(inputData)
@ -289,46 +293,48 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
)
}
private fun handleSuccess(params: Params,
attachmentUrl: String,
encryptedFileInfo: EncryptedFileInfo?,
thumbnailUrl: String?,
thumbnailEncryptedFileInfo: EncryptedFileInfo?,
newImageAttributes: NewImageAttributes?): Result {
private suspend fun handleSuccess(params: Params,
attachmentUrl: String,
encryptedFileInfo: EncryptedFileInfo?,
thumbnailUrl: String?,
thumbnailEncryptedFileInfo: EncryptedFileInfo?,
newImageAttributes: NewImageAttributes?): Result {
notifyTracker(params) { contentUploadStateTracker.setSuccess(it) }
params.localEchoIds.forEach {
updateEvent(it.eventId, attachmentUrl, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo, newImageAttributes)
}
val updatedEvents = params.events
.map {
updateEvent(it, attachmentUrl, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo, newImageAttributes)
}
val sendParams = MultipleEventSendingDispatcherWorker.Params(params.sessionId, updatedEvents, params.isEncrypted)
val sendParams = MultipleEventSendingDispatcherWorker.Params(
sessionId = params.sessionId,
localEchoIds = params.localEchoIds,
isEncrypted = params.isEncrypted
)
return Result.success(WorkerParamsFactory.toData(sendParams)).also {
Timber.v("## handleSuccess $attachmentUrl, work is stopped $isStopped")
}
}
private fun updateEvent(event: Event,
url: String,
encryptedFileInfo: EncryptedFileInfo?,
thumbnailUrl: String? = null,
thumbnailEncryptedFileInfo: EncryptedFileInfo?,
newImageAttributes: NewImageAttributes?): Event {
val messageContent: MessageContent = event.content.toModel() ?: return event
val updatedContent = when (messageContent) {
is MessageImageContent -> messageContent.update(url, encryptedFileInfo, newImageAttributes)
is MessageVideoContent -> messageContent.update(url, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo)
is MessageFileContent -> messageContent.update(url, encryptedFileInfo)
is MessageAudioContent -> messageContent.update(url, encryptedFileInfo)
else -> messageContent
private suspend fun updateEvent(eventId: String,
url: String,
encryptedFileInfo: EncryptedFileInfo?,
thumbnailUrl: String? = null,
thumbnailEncryptedFileInfo: EncryptedFileInfo?,
newImageAttributes: NewImageAttributes?) {
localEchoRepository.updateEcho(eventId) { _, event ->
val messageContent: MessageContent? = event.asDomain().content.toModel()
val updatedContent = when (messageContent) {
is MessageImageContent -> messageContent.update(url, encryptedFileInfo, newImageAttributes)
is MessageVideoContent -> messageContent.update(url, encryptedFileInfo, thumbnailUrl, thumbnailEncryptedFileInfo)
is MessageFileContent -> messageContent.update(url, encryptedFileInfo)
is MessageAudioContent -> messageContent.update(url, encryptedFileInfo)
else -> messageContent
}
event.content = ContentMapper.map(updatedContent.toContent())
}
return event.copy(content = updatedContent.toContent())
}
private fun notifyTracker(params: Params, function: (String) -> Unit) {
params.events
.mapNotNull { it.eventId }
.forEach { eventId -> function.invoke(eventId) }
params.localEchoIds.forEach { function.invoke(it.eventId) }
}
private fun MessageImageContent.update(url: String,

View File

@ -24,7 +24,6 @@ import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import org.matrix.android.sdk.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject
/**
@ -44,7 +43,6 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)

View File

@ -23,6 +23,7 @@ import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.task.Task
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.internal.util.awaitTransaction
import timber.log.Timber
import javax.inject.Inject
@ -39,7 +40,7 @@ internal class DefaultRefreshUserThreePidsTask @Inject constructor(private val p
Timber.d("Get ${accountThreePidsResponse.threePids?.size} threePids")
// Store the list in DB
monarchy.writeAsync { realm ->
monarchy.awaitTransaction { realm ->
realm.where(UserThreePidEntity::class.java).findAll().deleteAllFromRealm()
accountThreePidsResponse.threePids?.forEach {
val entity = UserThreePidEntity(

View File

@ -202,13 +202,13 @@ internal class DefaultRelationService @AssistedInject constructor(
private fun createEncryptEventWork(event: Event, keepKeys: List<String>?): OneTimeWorkRequest {
// Same parameter
val params = EncryptEventWorker.Params(sessionId, event, keepKeys)
val params = EncryptEventWorker.Params(sessionId, event.eventId!!, keepKeys)
val sendWorkData = WorkerParamsFactory.toData(params)
return timeLineSendEventWorkCommon.createWork<EncryptEventWorker>(sendWorkData, true)
}
private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, event = event)
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timeLineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)
}

View File

@ -28,6 +28,7 @@ import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent
import org.matrix.android.sdk.api.session.room.model.relation.ReactionInfo
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.SendResponse
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
@ -42,18 +43,18 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
internal data class Params(
override val sessionId: String,
val roomId: String,
val event: Event,
val eventId: String,
val relationType: String? = null,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var eventBus: EventBus
@Inject lateinit var localEchoRepository: LocalEchoRepository
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
if (params.lastFailureMessage != null) {
// Transmit the error
@ -64,8 +65,8 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
val localEvent = params.event
if (localEvent.eventId == null) {
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent?.eventId == null) {
return Result.failure()
}
val relationContent = localEvent.content.toModel<ReactionContent>()

View File

@ -336,7 +336,7 @@ internal class DefaultSendService @AssistedInject constructor(
private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
// Same parameter
return EncryptEventWorker.Params(sessionId, event)
return EncryptEventWorker.Params(sessionId, event.eventId ?: "")
.let { WorkerParamsFactory.toData(it) }
.let {
workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
@ -360,7 +360,10 @@ internal class DefaultSendService @AssistedInject constructor(
attachment: ContentAttachmentData,
isRoomEncrypted: Boolean,
compressBeforeSending: Boolean): OneTimeWorkRequest {
val uploadMediaWorkerParams = UploadContentWorker.Params(sessionId, allLocalEchos, attachment, isRoomEncrypted, compressBeforeSending)
val localEchoIds = allLocalEchos.map {
LocalEchoIdentifiers(it.roomId!!, it.eventId!!)
}
val uploadMediaWorkerParams = UploadContentWorker.Params(sessionId, localEchoIds, attachment, isRoomEncrypted, compressBeforeSending)
val uploadWorkData = WorkerParamsFactory.toData(uploadMediaWorkerParams)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<UploadContentWorker>()

View File

@ -24,11 +24,13 @@ import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.util.awaitCallback
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
@ -46,7 +48,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val event: Event,
val eventId: String,
/** Do not encrypt these keys, keep them as is in encrypted content (e.g. m.relates_to) */
val keepKeys: List<String>? = null,
override val lastFailureMessage: String? = null
@ -60,20 +62,19 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
Timber.v("Start Encrypt work")
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
Timber.v("## SendEvent: Start Encrypt work for event ${params.event.eventId}")
if (params.lastFailureMessage != null) {
// Transmit the error
return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
}
Timber.v("## SendEvent: Start Encrypt work for event ${params.eventId}")
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
val localEvent = params.event
if (localEvent.eventId == null) {
val localEvent = localEchoRepository.getUpToDateEcho(params.eventId)
if (localEvent?.eventId == null) {
return Result.success()
}
@ -106,15 +107,10 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
modifiedContent[toKeep] = it
}
}
val safeResult = result.copy(eventContent = modifiedContent)
val encryptedEvent = localEvent.copy(
type = safeResult.eventType,
content = safeResult.eventContent
)
// Better handling of local echo, to avoid decrypting transition on remote echo
// Should I only do it for text messages?
if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) {
val decryptionLocalEcho = MXEventDecryptionResult(
val decryptionLocalEcho = if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) {
MXEventDecryptionResult(
clearEvent = Event(
type = localEvent.type,
content = localEvent.content,
@ -124,10 +120,18 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
senderCurve25519Key = result.eventContent["sender_key"] as? String,
claimedEd25519Key = crypto.getMyDevice().fingerprint()
)
localEchoRepository.updateEncryptedEcho(localEvent.eventId, safeResult.eventContent, decryptionLocalEcho)
} else {
null
}
localEchoRepository.updateEcho(localEvent.eventId) { _, localEcho ->
localEcho.type = EventType.ENCRYPTED
localEcho.content = ContentMapper.map(modifiedContent)
decryptionLocalEcho?.also {
localEcho.setDecryptionResult(it)
}
}
val nextWorkerParams = SendEventWorker.Params(sessionId = params.sessionId, event = encryptedEvent)
val nextWorkerParams = SendEventWorker.Params(sessionId = params.sessionId, eventId = params.eventId)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
} else {
val sendState = when (error) {
@ -138,7 +142,7 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
// always return success, or the chain will be stuck for ever!
val nextWorkerParams = SendEventWorker.Params(
sessionId = params.sessionId,
event = localEvent,
eventId = localEvent.eventId,
lastFailureMessage = error?.localizedMessage ?: "Error"
)
return Result.success(WorkerParamsFactory.toData(nextWorkerParams))

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send
import com.squareup.moshi.JsonClass
/**
* This is used as a holder to pass necessary data to some workers params.
*/
@JsonClass(generateAdapter = true)
data class LocalEchoIdentifiers(val roomId: String, val eventId: String)

View File

@ -18,8 +18,8 @@
package org.matrix.android.sdk.internal.session.room.send
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel
@ -27,11 +27,11 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.model.message.MessageType
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.database.RealmSessionProvider
import org.matrix.android.sdk.internal.database.asyncTransaction
import org.matrix.android.sdk.internal.database.helper.nextId
import org.matrix.android.sdk.internal.database.mapper.ContentMapper
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertEntity
@ -44,11 +44,13 @@ import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberHelper
import org.matrix.android.sdk.internal.session.room.summary.RoomSummaryUpdater
import org.matrix.android.sdk.internal.session.room.timeline.DefaultTimeline
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.awaitTransaction
import timber.log.Timber
import javax.inject.Inject
internal class LocalEchoRepository @Inject constructor(@SessionDatabase private val monarchy: Monarchy,
private val taskExecutor: TaskExecutor,
private val realmSessionProvider: RealmSessionProvider,
private val roomSummaryUpdater: RoomSummaryUpdater,
private val eventBus: EventBus,
@ -76,12 +78,12 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
}
val timelineEvent = timelineEventMapper.map(timelineEventEntity)
eventBus.post(DefaultTimeline.OnLocalEchoCreated(roomId = roomId, timelineEvent = timelineEvent))
monarchy.writeAsync { realm ->
taskExecutor.executorScope.asyncTransaction(monarchy) { realm ->
val eventInsertEntity = EventInsertEntity(event.eventId, event.type).apply {
this.insertType = EventInsertType.LOCAL_ECHO
}
realm.insert(eventInsertEntity)
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return@writeAsync
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return@asyncTransaction
roomEntity.sendingTimelineEvents.add(0, timelineEventEntity)
roomSummaryUpdater.updateSendingInformation(realm, roomId)
}
@ -89,30 +91,41 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
fun updateSendState(eventId: String, sendState: SendState) {
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Update local state of $eventId to ${sendState.name}")
monarchy.writeAsync { realm ->
updateEchoAsync(eventId) { realm, sendingEventEntity ->
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
// If already synced, do not put as sent
} else {
sendingEventEntity.sendState = sendState
}
roomSummaryUpdater.updateSendingInformation(realm, sendingEventEntity.roomId)
}
}
suspend fun updateEcho(eventId: String, block: (realm: Realm, eventEntity: EventEntity) -> Unit) {
monarchy.awaitTransaction { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) {
if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) {
// If already synced, do not put as sent
} else {
sendingEventEntity.sendState = sendState
}
roomSummaryUpdater.updateSendingInformation(realm, sendingEventEntity.roomId)
block(realm, sendingEventEntity)
}
}
}
fun updateEncryptedEcho(eventId: String, encryptedContent: Content, mxEventDecryptionResult: MXEventDecryptionResult) {
monarchy.writeAsync { realm ->
fun updateEchoAsync(eventId: String, block: (realm: Realm, eventEntity: EventEntity) -> Unit) {
taskExecutor.executorScope.asyncTransaction(monarchy) { realm ->
val sendingEventEntity = EventEntity.where(realm, eventId).findFirst()
if (sendingEventEntity != null) {
sendingEventEntity.type = EventType.ENCRYPTED
sendingEventEntity.content = ContentMapper.map(encryptedContent)
sendingEventEntity.setDecryptionResult(mxEventDecryptionResult)
block(realm, sendingEventEntity)
}
}
}
suspend fun getUpToDateEcho(eventId: String): Event? {
// We are using awaitTransaction here to make sure this executes after other transactions
return monarchy.awaitTransaction { realm ->
EventEntity.where(realm, eventId).findFirst()?.asDomain()
}
}
suspend fun deleteFailedEcho(roomId: String, localEcho: TimelineEvent) {
deleteFailedEcho(roomId, localEcho.eventId)
}
@ -150,7 +163,7 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private
return getAllEventsWithStates(roomId, SendState.HAS_FAILED_STATES)
}
fun getAllEventsWithStates(roomId: String, states : List<SendState>): List<TimelineEvent> {
fun getAllEventsWithStates(roomId: String, states: List<SendState>): List<TimelineEvent> {
return realmSessionProvider.withRealm { realm ->
TimelineEventEntity
.findAllInRoomWithSendStates(realm, roomId, states)

View File

@ -23,7 +23,6 @@ import androidx.work.CoroutineWorker
import androidx.work.OneTimeWorkRequest
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.content.UploadContentWorker
@ -48,7 +47,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
@JsonClass(generateAdapter = true)
internal data class Params(
override val sessionId: String,
val events: List<Event>,
val localEchoIds: List<LocalEchoIdentifiers>,
val isEncrypted: Boolean,
override val lastFailureMessage: String? = null
) : SessionWorkerParams
@ -61,42 +60,42 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
Timber.v("## SendEvent: Start dispatch sending multiple event work")
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
if (params.lastFailureMessage != null) {
params.events.forEach { event ->
event.eventId?.let { localEchoRepository.updateSendState(it, SendState.UNDELIVERED) }
params.localEchoIds.forEach { localEchoIds ->
localEchoRepository.updateSendState(localEchoIds.eventId, SendState.UNDELIVERED)
}
// Transmit the error if needed?
return Result.success(inputData)
.also { Timber.e("## SendEvent: Work cancelled due to input error from parent ${params.lastFailureMessage}") }
}
// Create a work for every event
params.events.forEach { event ->
params.localEchoIds.forEach { localEchoIds ->
val roomId = localEchoIds.roomId
val eventId = localEchoIds.eventId
if (params.isEncrypted) {
localEchoRepository.updateSendState(event.eventId ?: "", SendState.ENCRYPTING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event ${event.eventId}")
val encryptWork = createEncryptEventWork(params.sessionId, event, true)
localEchoRepository.updateSendState(eventId, SendState.ENCRYPTING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event $eventId")
val encryptWork = createEncryptEventWork(params.sessionId, eventId, true)
// Note that event will be replaced by the result of the previous work
val sendWork = createSendEventWork(params.sessionId, event, false)
timelineSendEventWorkCommon.postSequentialWorks(event.roomId!!, encryptWork, sendWork)
val sendWork = createSendEventWork(params.sessionId, eventId, false)
timelineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, sendWork)
} else {
localEchoRepository.updateSendState(event.eventId ?: "", SendState.SENDING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event ${event.eventId}")
val sendWork = createSendEventWork(params.sessionId, event, true)
timelineSendEventWorkCommon.postWork(event.roomId!!, sendWork)
localEchoRepository.updateSendState(eventId, SendState.SENDING)
Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event $eventId")
val sendWork = createSendEventWork(params.sessionId, eventId, true)
timelineSendEventWorkCommon.postWork(roomId, sendWork)
}
}
return Result.success()
}
private fun createEncryptEventWork(sessionId: String, event: Event, startChain: Boolean): OneTimeWorkRequest {
val params = EncryptEventWorker.Params(sessionId, event)
private fun createEncryptEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val params = EncryptEventWorker.Params(sessionId, eventId)
val sendWorkData = WorkerParamsFactory.toData(params)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
@ -107,8 +106,8 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
.build()
}
private fun createSendEventWork(sessionId: String, event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, event = event)
private fun createSendEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = eventId)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)

View File

@ -52,7 +52,6 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
if (params.lastFailureMessage != null) {
// Transmit the error

View File

@ -56,7 +56,7 @@ internal class RoomEventSender @Inject constructor(
private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
// Same parameter
val params = EncryptEventWorker.Params(sessionId, event)
val params = EncryptEventWorker.Params(sessionId, event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(params)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<EncryptEventWorker>()
@ -68,7 +68,7 @@ internal class RoomEventSender @Inject constructor(
}
private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest {
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, event = event)
val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!)
val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams)
return timelineSendEventWorkCommon.createWork<SendEventWorker>(sendWorkData, startChain)

View File

@ -21,11 +21,12 @@ import android.content.Context
import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass
import io.realm.RealmConfiguration
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.failure.shouldBeRetried
import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
@ -48,29 +49,25 @@ internal class SendEventWorker(context: Context,
internal data class Params(
override val sessionId: String,
override val lastFailureMessage: String? = null,
val event: Event? = null,
// Keep for compat at the moment, will be removed later
val eventId: String? = null
val eventId: String
) : SessionWorkerParams
@Inject lateinit var localEchoRepository: LocalEchoRepository
@Inject lateinit var roomAPI: RoomAPI
@Inject lateinit var eventBus: EventBus
@Inject lateinit var cancelSendTracker: CancelSendTracker
@SessionDatabase @Inject lateinit var realmConfiguration: RealmConfiguration
override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
.also { Timber.e("## SendEvent: Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this)
val event = params.event
val event = localEchoRepository.getUpToDateEcho(params.eventId)
if (event?.eventId == null || event.roomId == null) {
// Old way of sending
if (params.eventId != null) {
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
}
localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED)
return Result.success()
.also { Timber.e("Work cancelled due to bad input data") }
}

View File

@ -18,9 +18,9 @@
package org.matrix.android.sdk.internal.util
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.internal.database.awaitTransaction
import io.realm.Realm
import io.realm.RealmModel
import org.matrix.android.sdk.internal.database.awaitTransaction
import java.util.concurrent.atomic.AtomicReference
internal suspend fun <T> Monarchy.awaitTransaction(transaction: suspend (realm: Realm) -> T): T {

View File

@ -18,6 +18,7 @@
package org.matrix.android.sdk.internal.worker
import androidx.work.Data
import org.matrix.android.sdk.api.extensions.tryThis
import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.network.parsing.CheckNumberType
@ -41,7 +42,7 @@ internal object WorkerParamsFactory {
return Data.Builder().putString(KEY, json).build()
}
inline fun <reified T> fromData(data: Data): T? {
inline fun <reified T> fromData(data: Data): T? = tryThis("Unable to parse work parameters") {
val json = data.getString(KEY)
return if (json == null) {
null