Rebranch timeline + continue clean up strategy

This commit is contained in:
ganfra 2020-07-03 21:11:54 +02:00
parent 3648d6292a
commit 283f32479d
8 changed files with 169 additions and 108 deletions

View File

@ -0,0 +1,98 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database
import im.vector.matrix.android.internal.database.helper.nextDisplayIndex
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntityFields
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.session.SessionLifecycleObserver
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import im.vector.matrix.android.internal.task.TaskExecutor
import io.realm.Realm
import io.realm.RealmConfiguration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
private const val MAX_NUMBER_OF_EVENTS = 35_000L
private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300
/**
* This class makes sure to stay under a maximum number of events as it makes Realm to be unusable when listening to events
* when the database is getting too big.
*/
internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor) : SessionLifecycleObserver {
override fun onStart() {
taskExecutor.executorScope.launch(Dispatchers.Default) {
awaitTransaction(realmConfiguration) { realm ->
val allRooms = realm.where(RoomEntity::class.java).findAll()
Timber.v("There are ${allRooms.size} rooms in this session")
cleanUp(realm, MAX_NUMBER_OF_EVENTS / 2L)
}
}
}
private suspend fun cleanUp(realm: Realm, threshold: Long) {
val numberOfEvents = realm.where(EventEntity::class.java).findAll().size
val numberOfTimelineEvents = realm.where(TimelineEventEntity::class.java).findAll().size
Timber.v("Number of events in db: $numberOfEvents | Number of timeline events in db: $numberOfTimelineEvents")
if (threshold <= MIN_NUMBER_OF_EVENTS_BY_CHUNK || numberOfTimelineEvents < MAX_NUMBER_OF_EVENTS) {
Timber.v("Db is low enough")
} else {
val thresholdChunks = realm.where(ChunkEntity::class.java)
.greaterThan(ChunkEntityFields.NUMBER_OF_TIMELINE_EVENTS, threshold)
.findAll()
Timber.v("There are ${thresholdChunks.size} chunks to clean with more than $threshold events")
for (chunk in thresholdChunks) {
val maxDisplayIndex = chunk.nextDisplayIndex(PaginationDirection.FORWARDS)
val thresholdDisplayIndex = maxDisplayIndex - threshold
val eventsToRemove = chunk.timelineEvents.where().lessThan(TimelineEventEntityFields.DISPLAY_INDEX, thresholdDisplayIndex).findAll()
Timber.v("There are ${eventsToRemove.size} events to clean in chunk: ${chunk.identifier()} from room ${chunk.room?.first()?.roomId}")
chunk.numberOfTimelineEvents = chunk.numberOfTimelineEvents - eventsToRemove.size
eventsToRemove.forEach {
val canDeleteRoot = it.root?.stateKey == null
if (canDeleteRoot) {
it.root?.deleteFromRealm()
}
it.readReceipts?.readReceipts?.deleteAllFromRealm()
it.readReceipts?.deleteFromRealm()
it.annotations?.apply {
editSummary?.deleteFromRealm()
pollResponseSummary?.deleteFromRealm()
referencesSummaryEntity?.deleteFromRealm()
reactionsSummary.deleteAllFromRealm()
}
it.annotations?.deleteFromRealm()
it.readReceipts?.deleteFromRealm()
it.deleteFromRealm()
}
// We reset the prevToken so we will need to fetch again.
chunk.prevToken = null
}
cleanUp(realm, (threshold / 1.5).toLong())
}
}
}

View File

@ -164,49 +164,6 @@ internal class DefaultSession @Inject constructor(
}
eventBus.register(this)
timelineEventDecryptor.start()
taskExecutor.executorScope.launch(Dispatchers.Default) {
awaitTransaction(realmConfiguration) { realm ->
val allRooms = realm.where(RoomEntity::class.java).findAll()
val numberOfEvents = realm.where(EventEntity::class.java).findAll().size
val numberOfTimelineEvents = realm.where(TimelineEventEntity::class.java).findAll().size
Timber.v("Number of events in db: $numberOfEvents | Number of timeline events in db: $numberOfTimelineEvents")
Timber.v("Number of rooms in db: ${allRooms.size}")
if (numberOfTimelineEvents < 30_000L) {
Timber.v("Db is low enough")
} else {
val hugeChunks = realm.where(ChunkEntity::class.java).greaterThan(ChunkEntityFields.NUMBER_OF_TIMELINE_EVENTS, 250).findAll()
Timber.v("There are ${hugeChunks.size} chunks to clean")
/*
for (chunk in hugeChunks) {
val maxDisplayIndex = chunk.nextDisplayIndex(PaginationDirection.FORWARDS)
val thresholdDisplayIndex = maxDisplayIndex - 250
val eventsToRemove = chunk.timelineEvents.where().lessThan(TimelineEventEntityFields.DISPLAY_INDEX, thresholdDisplayIndex).findAll()
Timber.v("There are ${eventsToRemove.size} events to clean in chunk: ${chunk.identifier()} from room ${chunk.room?.first()?.roomId}")
chunk.numberOfTimelineEvents = chunk.numberOfTimelineEvents - eventsToRemove.size
eventsToRemove.forEach {
val canDeleteRoot = it.root?.stateKey == null
if (canDeleteRoot) {
it.root?.deleteFromRealm()
}
it.readReceipts?.readReceipts?.deleteAllFromRealm()
it.readReceipts?.deleteFromRealm()
it.annotations?.apply {
editSummary?.deleteFromRealm()
pollResponseSummary?.deleteFromRealm()
referencesSummaryEntity?.deleteFromRealm()
reactionsSummary.deleteAllFromRealm()
}
it.annotations?.deleteFromRealm()
it.readReceipts?.deleteFromRealm()
it.deleteFromRealm()
}
}
*/
}
}
}
}
override fun requireBackgroundSync() {

View File

@ -40,6 +40,7 @@ import im.vector.matrix.android.api.session.typing.TypingUsersTracker
import im.vector.matrix.android.internal.crypto.crosssigning.ShieldTrustUpdater
import im.vector.matrix.android.internal.crypto.secrets.DefaultSharedSecretStorageService
import im.vector.matrix.android.internal.crypto.verification.VerificationMessageProcessor
import im.vector.matrix.android.internal.database.DatabaseCleaner
import im.vector.matrix.android.internal.database.EventInsertLiveObserver
import im.vector.matrix.android.internal.database.SessionRealmConfigurationFactory
import im.vector.matrix.android.internal.di.Authenticated
@ -340,6 +341,10 @@ internal abstract class SessionModule {
@IntoSet
abstract fun bindIdentityService(observer: DefaultIdentityService): SessionLifecycleObserver
@Binds
@IntoSet
abstract fun bindDatabaseCleaner(observer: DatabaseCleaner): SessionLifecycleObserver
@Binds
abstract fun bindInitialSyncProgressService(service: DefaultInitialSyncProgressService): InitialSyncProgressService

View File

@ -62,10 +62,10 @@ import im.vector.matrix.android.internal.session.room.tags.AddTagToRoomTask
import im.vector.matrix.android.internal.session.room.tags.DefaultAddTagToRoomTask
import im.vector.matrix.android.internal.session.room.tags.DefaultDeleteTagFromRoomTask
import im.vector.matrix.android.internal.session.room.tags.DeleteTagFromRoomTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultFetchNextTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultFetchTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultGetContextOfEventTask
import im.vector.matrix.android.internal.session.room.timeline.DefaultPaginationTask
import im.vector.matrix.android.internal.session.room.timeline.FetchNextTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.FetchTokenAndPaginateTask
import im.vector.matrix.android.internal.session.room.timeline.GetContextOfEventTask
import im.vector.matrix.android.internal.session.room.timeline.PaginationTask
import im.vector.matrix.android.internal.session.room.typing.DefaultSendTypingTask
@ -176,7 +176,7 @@ internal abstract class RoomModule {
abstract fun bindPaginationTask(task: DefaultPaginationTask): PaginationTask
@Binds
abstract fun bindFetchNextTokenAndPaginateTask(task: DefaultFetchNextTokenAndPaginateTask): FetchNextTokenAndPaginateTask
abstract fun bindFetchNextTokenAndPaginateTask(task: DefaultFetchTokenAndPaginateTask): FetchTokenAndPaginateTask
@Binds
abstract fun bindFetchEditHistoryTask(task: DefaultFetchEditHistoryTask): FetchEditHistoryTask

View File

@ -29,17 +29,14 @@ import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
import im.vector.matrix.android.api.session.room.timeline.TimelineSettings
import im.vector.matrix.android.api.util.CancelableBag
import im.vector.matrix.android.internal.database.mapper.TimelineEventMapper
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntityFields
import im.vector.matrix.android.internal.database.query.TimelineEventFilter
import im.vector.matrix.android.internal.database.query.findAllInRoomWithSendStates
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereInRoom
import im.vector.matrix.android.internal.database.query.whereRoomId
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
@ -72,7 +69,7 @@ internal class DefaultTimeline(
private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor,
private val contextOfEventTask: GetContextOfEventTask,
private val fetchNextTokenAndPaginateTask: FetchNextTokenAndPaginateTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val paginationTask: PaginationTask,
private val timelineEventMapper: TimelineEventMapper,
private val settings: TimelineSettings,
@ -98,9 +95,7 @@ internal class DefaultTimeline(
private lateinit var nonFilteredEvents: RealmResults<TimelineEventEntity>
private lateinit var filteredEvents: RealmResults<TimelineEventEntity>
private lateinit var eventRelations: RealmResults<EventAnnotationsSummaryEntity>
private var roomEntity: RoomEntity? = null
private lateinit var sendingEvents: RealmResults<TimelineEventEntity>
private var prevDisplayIndex: Int? = null
private var nextDisplayIndex: Int? = null
@ -119,23 +114,10 @@ internal class DefaultTimeline(
if (!results.isLoaded || !results.isValid) {
return@OrderedRealmCollectionChangeListener
}
results.createSnapshot()
handleUpdates(results, changeSet)
}
private val relationsListener = OrderedRealmCollectionChangeListener<RealmResults<EventAnnotationsSummaryEntity>> { collection, changeSet ->
var hasChange = false
(changeSet.insertions + changeSet.changes).forEach {
val eventRelations = collection[it]
if (eventRelations != null) {
hasChange = rebuildEvent(eventRelations.eventId) { te ->
te.copy(annotations = eventRelations.asDomain())
} || hasChange
}
}
if (hasChange) postSnapshot()
}
// Public methods ******************************************************************************
override fun paginate(direction: Timeline.Direction, count: Int) {
@ -173,15 +155,23 @@ internal class DefaultTimeline(
val realm = Realm.getInstance(realmConfiguration)
backgroundRealm.set(realm)
roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()
?: throw IllegalStateException("Can't open a timeline without a room")
sendingEvents = roomEntity.sendingTimelineEvents.where().filterEventsWithSettings().findAll()
sendingEvents.addChangeListener { events ->
// Remove in memory as soon as they are known by database
events.forEach { te ->
inMemorySendingEvents.removeAll { te.eventId == it.eventId }
}
postSnapshot()
}
nonFilteredEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll()
filteredEvents = nonFilteredEvents.where()
.filterEventsWithSettings()
.findAll()
filteredEvents.addChangeListener(eventsChangeListener)
handleInitialLoad()
eventRelations = EventAnnotationsSummaryEntity.whereInRoom(realm, roomId)
.findAllAsync()
if (settings.shouldHandleHiddenReadReceipts()) {
hiddenReadReceipts.start(realm, filteredEvents, nonFilteredEvents, this)
}
@ -202,9 +192,8 @@ internal class DefaultTimeline(
cancelableBag.cancel()
BACKGROUND_HANDLER.removeCallbacksAndMessages(null)
BACKGROUND_HANDLER.post {
roomEntity?.sendingTimelineEvents?.removeAllChangeListeners()
if (this::eventRelations.isInitialized) {
eventRelations.removeAllChangeListeners()
if (this::sendingEvents.isInitialized) {
sendingEvents.removeAllChangeListeners()
}
if (this::nonFilteredEvents.isInitialized) {
nonFilteredEvents.removeAllChangeListeners()
@ -303,7 +292,7 @@ internal class DefaultTimeline(
listeners.clear()
}
// TimelineHiddenReadReceipts.Delegate
// TimelineHiddenReadReceipts.Delegate
override fun rebuildEvent(eventId: String, readReceipts: List<ReadReceipt>): Boolean {
return rebuildEvent(eventId) { te ->
@ -336,7 +325,7 @@ internal class DefaultTimeline(
}
}
// Private methods *****************************************************************************
// Private methods *****************************************************************************
private fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent): Boolean {
return builtEventsIdMap[eventId]?.let { builtIndex ->
@ -400,20 +389,16 @@ internal class DefaultTimeline(
}
private fun buildSendingEvents(): List<TimelineEvent> {
val sendingEvents = ArrayList<TimelineEvent>()
val builtSendingEvents = ArrayList<TimelineEvent>()
if (hasReachedEnd(Timeline.Direction.FORWARDS) && !hasMoreInCache(Timeline.Direction.FORWARDS)) {
sendingEvents.addAll(inMemorySendingEvents.filterEventsWithSettings())
roomEntity?.sendingTimelineEvents
?.where()
?.filterEventsWithSettings()
?.findAll()
?.forEach { timelineEventEntity ->
if (sendingEvents.find { it.eventId == timelineEventEntity.eventId } == null) {
sendingEvents.add(timelineEventMapper.map(timelineEventEntity))
}
}
builtSendingEvents.addAll(inMemorySendingEvents.filterEventsWithSettings())
sendingEvents.forEach { timelineEventEntity ->
if (builtSendingEvents.find { it.eventId == timelineEventEntity.eventId } == null) {
builtSendingEvents.add(timelineEventMapper.map(timelineEventEntity))
}
}
}
return sendingEvents
return builtSendingEvents
}
private fun canPaginate(direction: Timeline.Direction): Boolean {
@ -514,19 +499,25 @@ internal class DefaultTimeline(
val currentChunk = getLiveChunk()
val token = if (direction == Timeline.Direction.BACKWARDS) currentChunk?.prevToken else currentChunk?.nextToken
if (token == null) {
if (direction == Timeline.Direction.FORWARDS && currentChunk?.hasBeenALastForwardChunk().orFalse()) {
// We are in the case that next event exists, but we do not know the next token.
// Fetch (again) the last event to get a nextToken
val lastKnownEventId = nonFilteredEvents.firstOrNull()?.eventId
if (direction == Timeline.Direction.BACKWARDS ||
(direction == Timeline.Direction.FORWARDS && currentChunk?.hasBeenALastForwardChunk().orFalse())) {
// We are in the case where event exists, but we do not know the token.
// Fetch (again) the last event to get a token
val lastKnownEventId = if (direction == Timeline.Direction.FORWARDS) {
nonFilteredEvents.firstOrNull()?.eventId
} else {
nonFilteredEvents.lastOrNull()?.eventId
}
if (lastKnownEventId == null) {
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
} else {
val params = FetchNextTokenAndPaginateTask.Params(
val params = FetchTokenAndPaginateTask.Params(
roomId = roomId,
limit = limit,
direction = direction.toPaginationDirection(),
lastKnownEventId = lastKnownEventId
)
cancelableBag += fetchNextTokenAndPaginateTask
cancelableBag += fetchTokenAndPaginateTask
.configureWith(params) {
this.callback = createPaginationCallback(limit, direction)
}
@ -755,7 +746,7 @@ internal class DefaultTimeline(
}
}
// Extension methods ***************************************************************************
// Extension methods ***************************************************************************
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {
return if (this == Timeline.Direction.BACKWARDS) PaginationDirection.BACKWARDS else PaginationDirection.FORWARDS

View File

@ -43,7 +43,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
private val contextOfEventTask: GetContextOfEventTask,
private val eventDecryptor: TimelineEventDecryptor,
private val paginationTask: PaginationTask,
private val fetchNextTokenAndPaginateTask: FetchNextTokenAndPaginateTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper
) : TimelineService {
@ -66,7 +66,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
hiddenReadReceipts = TimelineHiddenReadReceipts(readReceiptsSummaryMapper, roomId, settings),
eventBus = eventBus,
eventDecryptor = eventDecryptor,
fetchNextTokenAndPaginateTask = fetchNextTokenAndPaginateTask
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask
)
}

View File

@ -28,38 +28,48 @@ import im.vector.matrix.android.internal.util.awaitTransaction
import org.greenrobot.eventbus.EventBus
import javax.inject.Inject
internal interface FetchNextTokenAndPaginateTask : Task<FetchNextTokenAndPaginateTask.Params, TokenChunkEventPersistor.Result> {
internal interface FetchTokenAndPaginateTask : Task<FetchTokenAndPaginateTask.Params, TokenChunkEventPersistor.Result> {
data class Params(
val roomId: String,
val lastKnownEventId: String,
val direction: PaginationDirection,
val limit: Int
)
}
internal class DefaultFetchNextTokenAndPaginateTask @Inject constructor(
internal class DefaultFetchTokenAndPaginateTask @Inject constructor(
private val roomAPI: RoomAPI,
@SessionDatabase private val monarchy: Monarchy,
private val filterRepository: FilterRepository,
private val paginationTask: PaginationTask,
private val eventBus: EventBus
) : FetchNextTokenAndPaginateTask {
) : FetchTokenAndPaginateTask {
override suspend fun execute(params: FetchNextTokenAndPaginateTask.Params): TokenChunkEventPersistor.Result {
override suspend fun execute(params: FetchTokenAndPaginateTask.Params): TokenChunkEventPersistor.Result {
val filter = filterRepository.getRoomFilter()
val response = executeRequest<EventContextResponse>(eventBus) {
apiCall = roomAPI.getContextOfEvent(params.roomId, params.lastKnownEventId, 0, filter)
}
if (response.end == null) {
throw IllegalStateException("No next token found")
val fromToken = if (params.direction == PaginationDirection.FORWARDS) {
response.end
} else {
response.start
}
monarchy.awaitTransaction {
ChunkEntity.findIncludingEvent(it, params.lastKnownEventId)?.nextToken = response.end
?: throw IllegalStateException("No token found")
monarchy.awaitTransaction { realm ->
val chunkToUpdate = ChunkEntity.findIncludingEvent(realm, params.lastKnownEventId)
if (params.direction == PaginationDirection.FORWARDS) {
chunkToUpdate?.nextToken = fromToken
} else {
chunkToUpdate?.prevToken = fromToken
}
}
val paginationParams = PaginationTask.Params(
roomId = params.roomId,
from = response.end,
direction = PaginationDirection.FORWARDS,
from = fromToken,
direction = params.direction,
limit = params.limit
)
return paginationTask.execute(paginationParams)

View File

@ -125,7 +125,7 @@ internal class TimelineHiddenReadReceipts constructor(private val readReceiptsSu
.isNotEmpty(ReadReceiptsSummaryEntityFields.READ_RECEIPTS.`$`)
.filterReceiptsWithSettings()
.findAllAsync()
//.also { it.addChangeListener(hiddenReadReceiptsListener) }
.also { it.addChangeListener(hiddenReadReceiptsListener) }
}
/**