Align with web implementation

This commit is contained in:
David Langley 2022-03-25 22:51:34 +00:00
parent 21e46c5840
commit 7e930472e8
7 changed files with 66 additions and 122 deletions

View File

@ -21,13 +21,9 @@ import org.matrix.android.sdk.api.util.JsonDict
interface LiveEventListener {
fun onLiveEvent(roomId: String, event: Event)
fun onEventDecrypted(event: Event)
fun onPaginatedEvent(roomId: String, event: Event)
fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict)
fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable)
fun onEventDecryptionError(event: Event, throwable: Throwable)
fun onLiveToDeviceEvent(event: Event)

View File

@ -113,7 +113,7 @@ internal class MXMegolmDecryption(private val userId: String,
forwardingCurve25519KeyChain = olmDecryptionResult.forwardingCurve25519KeyChain
.orEmpty()
).also {
liveEventManager.get().dispatchLiveEventDecrypted(event, it)
liveEventManager.get().dispatchLiveEventDecrypted(event)
}
} else {
throw MXCryptoError.Base(MXCryptoError.ErrorType.MISSING_FIELDS, MXCryptoError.MISSING_FIELDS_REASON)

View File

@ -42,36 +42,12 @@ internal class StreamEventsManager @Inject constructor() {
listeners.remove(listener)
}
fun dispatchLiveEventReceived(event: Event, roomId: String, initialSync: Boolean) {
Timber.v("## dispatchLiveEventReceived ${event.eventId}")
coroutineScope.launch {
if (!initialSync) {
listeners.forEach {
tryOrNull {
it.onLiveEvent(roomId, event)
}
}
}
}
}
fun dispatchPaginatedEventReceived(event: Event, roomId: String) {
Timber.v("## dispatchPaginatedEventReceived ${event.eventId}")
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onPaginatedEvent(roomId, event)
}
}
}
}
fun dispatchLiveEventDecrypted(event: Event, result: MXEventDecryptionResult) {
fun dispatchLiveEventDecrypted(event: Event) {
Timber.v("## dispatchLiveEventDecrypted ${event.eventId}")
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onEventDecrypted(event.eventId ?: "", event.roomId ?: "", result.clearEvent)
it.onEventDecrypted(event)
}
}
}
@ -82,7 +58,7 @@ internal class StreamEventsManager @Inject constructor() {
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onEventDecryptionError(event.eventId ?: "", event.roomId ?: "", error)
it.onEventDecryptionError(event, error)
}
}
}

View File

@ -186,7 +186,7 @@ internal class TokenChunkEventPersistor @Inject constructor(
}
roomMemberContentsByUser[event.stateKey] = contentToUse.toModel<RoomMemberContent>()
}
liveEventManager.get().dispatchPaginatedEventReceived(event, roomId)
currentChunk.addTimelineEvent(
roomId = roomId,
eventEntity = eventEntity,

View File

@ -382,7 +382,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
eventIds.add(event.eventId)
liveEventService.get().dispatchLiveEventReceived(event, roomId, insertType == EventInsertType.INITIAL_SYNC)
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC

View File

@ -17,23 +17,31 @@
package im.vector.app
import android.content.SharedPreferences
import androidx.lifecycle.asFlow
import im.vector.app.core.di.ActiveSessionHolder
import im.vector.app.features.rageshake.BugReporter
import im.vector.app.features.rageshake.ReportType
import im.vector.app.features.session.coroutineScope
import im.vector.app.features.settings.VectorPreferences
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.subscribe
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.initsync.SyncStatusService
import org.matrix.android.sdk.api.session.sync.SyncState
import org.matrix.android.sdk.flow.flow
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
@ -62,10 +70,11 @@ class AutoRageShaker @Inject constructor(
private val e2eDetectedFlow = MutableSharedFlow<E2EMessageDetected>(replay = 0)
private val matchingRSRequestFlow = MutableSharedFlow<Event>(replay = 0)
var hasSynced = false
var preferenceEnabled = false
fun initialize() {
observeActiveSession()
enable(vectorPreferences.labsAutoReportUISI())
preferenceEnabled = vectorPreferences.labsAutoReportUISI()
// It's a singleton...
vectorPreferences.subscribeToChanges(this)
@ -74,7 +83,7 @@ class AutoRageShaker @Inject constructor(
e2eDetectedFlow
.onEach {
sendRageShake(it)
delay(2_000)
delay(60_000)
}
.catch { cause ->
Timber.w(cause, "Failed to RS")
@ -84,7 +93,7 @@ class AutoRageShaker @Inject constructor(
matchingRSRequestFlow
.onEach {
sendMatchingRageShake(it)
delay(2_000)
delay(60_000)
}
.catch { cause ->
Timber.w(cause, "Failed to send matching rageshake")
@ -93,14 +102,7 @@ class AutoRageShaker @Inject constructor(
}
override fun onSharedPreferenceChanged(sharedPreferences: SharedPreferences?, key: String?) {
enable(vectorPreferences.labsAutoReportUISI())
}
var _enabled = false
fun enable(enabled: Boolean) {
if (enabled == _enabled) return
_enabled = enabled
detector.enabled = enabled
preferenceEnabled = vectorPreferences.labsAutoReportUISI()
}
private fun observeActiveSession() {
@ -115,7 +117,6 @@ class AutoRageShaker @Inject constructor(
}
fun decryptionErrorDetected(target: E2EMessageDetected) {
if (target.source == UISIEventSource.INITIAL_SYNC) return
if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return
val shouldSendRS = synchronized(alreadyReportedUisi) {
val reportInfo = ReportInfo(target.roomId, target.sessionId)
@ -148,7 +149,6 @@ class AutoRageShaker @Inject constructor(
append("\"room_id\": \"${target.roomId}\",")
append("\"sender_key\": \"${target.senderKey}\",")
append("\"device_id\": \"${target.senderDeviceId}\",")
append("\"source\": \"${target.source}\",")
append("\"user_id\": \"${target.senderUserId}\",")
append("\"session_id\": \"${target.sessionId}\"")
append("}")
@ -245,6 +245,9 @@ class AutoRageShaker @Inject constructor(
override val reciprocateToDeviceEventType: String
get() = AUTO_RS_REQUEST
override val enabled: Boolean
get() = this@AutoRageShaker.preferenceEnabled && this@AutoRageShaker.hasSynced
override fun uisiDetected(source: E2EMessageDetected) {
decryptionErrorDetected(source)
}
@ -261,7 +264,14 @@ class AutoRageShaker @Inject constructor(
return
}
this.currentActiveSessionId = sessionId
this.detector.enabled = _enabled
hasSynced = session.hasAlreadySynced()
session.getSyncStatusLive()
.asFlow()
.onEach {
hasSynced = it !is SyncStatusService.Status.Progressing
}
.launchIn(session.coroutineScope)
activeSessionIds.add(sessionId)
session.addListener(this)
session.addEventStreamListener(detector)

View File

@ -16,6 +16,7 @@
package im.vector.app
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.LiveEventListener
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.toModel
@ -26,23 +27,17 @@ import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.Executors
enum class UISIEventSource {
INITIAL_SYNC,
INCREMENTAL_SYNC,
PAGINATION
}
data class E2EMessageDetected(
val eventId: String,
val roomId: String,
val senderUserId: String,
val senderDeviceId: String,
val senderKey: String,
val sessionId: String,
val source: UISIEventSource) {
val sessionId: String
) {
companion object {
fun fromEvent(event: Event, roomId: String, source: UISIEventSource): E2EMessageDetected {
fun fromEvent(event: Event, roomId: String): E2EMessageDetected {
val encryptedContent = event.content.toModel<EncryptedEventContent>()
return E2EMessageDetected(
@ -51,8 +46,7 @@ data class E2EMessageDetected(
senderUserId = event.senderId ?: "",
senderDeviceId = encryptedContent?.deviceId ?: "",
senderKey = encryptedContent?.senderKey ?: "",
sessionId = encryptedContent?.sessionId ?: "",
source = source
sessionId = encryptedContent?.sessionId ?: ""
)
}
}
@ -61,6 +55,7 @@ data class E2EMessageDetected(
class UISIDetector : LiveEventListener {
interface UISIDetectorCallback {
val enabled: Boolean
val reciprocateToDeviceEventType: String
fun uisiDetected(source: E2EMessageDetected)
fun uisiReciprocateRequest(source: Event)
@ -68,30 +63,16 @@ class UISIDetector : LiveEventListener {
var callback: UISIDetectorCallback? = null
private val trackedEvents = mutableListOf<Pair<E2EMessageDetected, TimerTask>>()
private val trackedEvents = mutableMapOf<String, TimerTask>()
private val executor = Executors.newSingleThreadExecutor()
private val timer = Timer()
private val timeoutMillis = 30_000L
var enabled = false
val enabled: Boolean get() = callback?.enabled.orFalse()
override fun onLiveEvent(roomId: String, event: Event) {
if (!enabled) return
if (!event.isEncrypted()) return
executor.execute {
handleEventReceived(E2EMessageDetected.fromEvent(event, roomId, UISIEventSource.INCREMENTAL_SYNC))
}
}
override fun onPaginatedEvent(roomId: String, event: Event) {
if (!enabled) return
if (!event.isEncrypted()) return
executor.execute {
handleEventReceived(E2EMessageDetected.fromEvent(event, roomId, UISIEventSource.PAGINATION))
}
}
override fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict) {
if (!enabled) return
override fun onEventDecrypted(event: Event) {
val eventId = event.eventId
val roomId = event.roomId
if (!enabled || eventId == null || roomId == null) return
executor.execute {
unTrack(eventId, roomId)
}
@ -104,57 +85,39 @@ class UISIDetector : LiveEventListener {
}
}
override fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable) {
if (!enabled) return
executor.execute {
unTrack(eventId, roomId)?.let {
triggerUISI(it)
}
// if (throwable is MXCryptoError.OlmError) {
// if (throwable.olmException.message == "UNKNOWN_MESSAGE_INDEX") {
// unTrack(eventId, roomId)?.let {
// triggerUISI(it)
// }
// }
// }
}
}
override fun onEventDecryptionError(event: Event, throwable: Throwable) {
val eventId = event.eventId
val roomId = event.roomId
if (!enabled || eventId == null || roomId == null) return
private fun handleEventReceived(detectorEvent: E2EMessageDetected) {
if (!enabled) return
if (trackedEvents.any { it.first == detectorEvent }) {
Timber.w("## UISIDetector: Event ${detectorEvent.eventId} is already tracked")
} else {
// track it and start timer
val timeoutTask = object : TimerTask() {
override fun run() {
executor.execute {
unTrack(detectorEvent.eventId, detectorEvent.roomId)
Timber.v("## UISIDetector: Timeout on ${detectorEvent.eventId} ")
triggerUISI(detectorEvent)
}
val trackerId: String = trackerId(eventId, roomId)
if (trackedEvents.containsKey(trackerId)) {
Timber.w("## UISIDetector: Event $eventId is already tracked")
return
}
// track it and start timer
val timeoutTask = object : TimerTask() {
override fun run() {
executor.execute {
unTrack(eventId, roomId)
Timber.v("## UISIDetector: Timeout on $eventId")
triggerUISI(E2EMessageDetected.fromEvent(event, roomId))
}
}
trackedEvents.add(detectorEvent to timeoutTask)
timer.schedule(timeoutTask, timeoutMillis)
}
trackedEvents[trackerId] = timeoutTask
timer.schedule(timeoutTask, timeoutMillis)
}
private fun trackerId(eventId: String, roomId: String): String = "$roomId-$eventId"
private fun triggerUISI(source: E2EMessageDetected) {
if (!enabled) return
Timber.i("## UISIDetector: Unable To Decrypt $source")
callback?.uisiDetected(source)
}
private fun unTrack(eventId: String, roomId: String): E2EMessageDetected? {
val index = trackedEvents.indexOfFirst { it.first.eventId == eventId && it.first.roomId == roomId }
return if (index != -1) {
trackedEvents.removeAt(index).let {
it.second.cancel()
it.first
}
} else {
null
}
private fun unTrack(eventId: String, roomId: String) {
trackedEvents.remove(trackerId(eventId, roomId))?.cancel()
}
}