VoIP: small refacts and cleaning

This commit is contained in:
ganfra 2020-12-07 20:34:29 +01:00
parent b998718142
commit bf6f60c7e5
7 changed files with 356 additions and 243 deletions

View File

@ -71,7 +71,6 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real
return@forEach
}
val domainEvent = event.asDomain()
// decryptIfNeeded(domainEvent)
processors.filter {
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
}.forEach {
@ -83,6 +82,7 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real
.findAll()
.deleteAllFromRealm()
}
processors.forEach { it.onPostProcess() }
}
}

View File

@ -25,4 +25,12 @@ internal interface EventInsertLiveProcessor {
fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean
suspend fun process(realm: Realm, event: Event)
/**
* Called after transaction.
* Maybe you prefer to process the events outside of the realm transaction.
*/
suspend fun onPostProcess() {
// Noop by default
}
}

View File

@ -16,19 +16,16 @@
package org.matrix.android.sdk.internal.session.call
import io.realm.Realm
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal class CallEventProcessor @Inject constructor(
@UserId private val userId: String,
private val callService: DefaultCallSignalingService
) : EventInsertLiveProcessor {
internal class CallEventProcessor @Inject constructor(private val callSignalingHandler: CallSignalingHandler)
: EventInsertLiveProcessor {
private val allowedTypes = listOf(
EventType.CALL_ANSWER,
@ -41,6 +38,8 @@ internal class CallEventProcessor @Inject constructor(
EventType.ENCRYPTED
)
private val eventsToPostProcess = mutableListOf<Event>()
override fun shouldProcess(eventId: String, eventType: String, insertType: EventInsertType): Boolean {
if (insertType != EventInsertType.INCREMENTAL_SYNC) {
return false
@ -49,10 +48,17 @@ internal class CallEventProcessor @Inject constructor(
}
override suspend fun process(realm: Realm, event: Event) {
update(realm, event)
eventsToPostProcess.add(event)
}
private fun update(realm: Realm, event: Event) {
override suspend fun onPostProcess() {
eventsToPostProcess.forEach {
dispatchToCallSignalingHandlerIfNeeded(it)
}
eventsToPostProcess.clear()
}
private fun dispatchToCallSignalingHandlerIfNeeded(event: Event) {
val now = System.currentTimeMillis()
// TODO might check if an invite is not closed (hangup/answsered) in the same event batch?
event.roomId ?: return Unit.also {
@ -63,10 +69,6 @@ internal class CallEventProcessor @Inject constructor(
// To old to ring?
return
}
event.ageLocalTs
if (EventType.isCallEvent(event.getClearType())) {
callService.onCallEvent(event)
}
Timber.v("$realm : $userId")
callSignalingHandler.onCallEvent(event)
}
}

View File

@ -0,0 +1,206 @@
/*
* Copyright (c) 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.call
import org.matrix.android.sdk.api.session.call.CallListener
import org.matrix.android.sdk.api.session.call.CallState
import org.matrix.android.sdk.api.session.call.MxCall
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.call.CallAnswerContent
import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent
import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent
import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent
import org.matrix.android.sdk.api.session.room.model.call.CallNegotiateContent
import org.matrix.android.sdk.api.session.room.model.call.CallRejectContent
import org.matrix.android.sdk.api.session.room.model.call.CallSelectAnswerContent
import org.matrix.android.sdk.api.session.room.model.call.CallSignallingContent
import org.matrix.android.sdk.api.util.Optional
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.SessionScope
import timber.log.Timber
import java.math.BigDecimal
import javax.inject.Inject
@SessionScope
internal class CallSignalingHandler @Inject constructor(private val activeCallHandler: ActiveCallHandler,
private val mxCallFactory: MxCallFactory,
@UserId private val userId: String) {
private val callListeners = mutableSetOf<CallListener>()
private val callListenersDispatcher = CallListenersDispatcher(callListeners)
fun addCallListener(listener: CallListener) {
callListeners.add(listener)
}
fun removeCallListener(listener: CallListener) {
callListeners.remove(listener)
}
fun onCallEvent(event: Event) {
when (event.getClearType()) {
EventType.CALL_ANSWER -> {
handleCallAnswerEvent(event)
}
EventType.CALL_INVITE -> {
handleCallInviteEvent(event)
}
EventType.CALL_HANGUP -> {
handleCallHangupEvent(event)
}
EventType.CALL_REJECT -> {
handleCallRejectEvent(event)
}
EventType.CALL_CANDIDATES -> {
handleCallCandidatesEvent(event)
}
EventType.CALL_SELECT_ANSWER -> {
handleCallSelectAnswerEvent(event)
}
EventType.CALL_NEGOTIATE -> {
handleCallNegotiateEvent(event)
}
}
}
private fun handleCallNegotiateEvent(event: Event) {
val content = event.getClearContent().toModel<CallNegotiateContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
callListenersDispatcher.onCallNegotiateReceived(content)
}
private fun handleCallSelectAnswerEvent(event: Event) {
val content = event.getClearContent().toModel<CallSelectAnswerContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
if (call.isOutgoing) {
Timber.v("Got selectAnswer for an outbound call: ignoring")
return
}
val selectedPartyId = content.selectedPartyId
if (selectedPartyId == null) {
Timber.w("Got nonsensical select_answer with null selected_party_id: ignoring")
return
}
callListenersDispatcher.onCallSelectAnswerReceived(content)
}
private fun handleCallCandidatesEvent(event: Event) {
val content = event.getClearContent().toModel<CallCandidatesContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
if (call.opponentPartyId != null && !call.partyIdsMatches(content)) {
Timber.v("Ignoring candidates from party ID ${content.partyId} we have chosen party ID ${call.opponentPartyId}")
return
}
callListenersDispatcher.onCallIceCandidateReceived(call, content)
}
private fun handleCallRejectEvent(event: Event) {
val content = event.getClearContent().toModel<CallRejectContent>() ?: return
val call = content.getCall() ?: return
activeCallHandler.removeCall(content.callId)
// No need to check party_id for reject because if we'd received either
// an answer or reject, we wouldn't be in state InviteSent
if (call.state != CallState.Dialing) {
return
}
callListenersDispatcher.onCallRejectReceived(content)
}
private fun handleCallHangupEvent(event: Event) {
val content = event.getClearContent().toModel<CallHangupContent>() ?: return
val call = content.getCall() ?: return
// party ID must match (our chosen partner hanging up the call) or be undefined (we haven't chosen
// a partner yet but we're treating the hangup as a reject as per VoIP v0)
if (call.opponentPartyId != null && !call.partyIdsMatches(content)) {
Timber.v("Ignoring hangup from party ID ${content.partyId} we have chosen party ID ${call.opponentPartyId}")
return
}
if (call.state != CallState.Terminated) {
activeCallHandler.removeCall(content.callId)
callListenersDispatcher.onCallHangupReceived(content)
}
}
private fun handleCallInviteEvent(event: Event) {
if (event.senderId == userId) {
// ignore invites you send
return
}
if (event.roomId == null || event.senderId == null) {
return
}
val content = event.getClearContent().toModel<CallInviteContent>() ?: return
val incomingCall = mxCallFactory.createIncomingCall(
roomId = event.roomId,
senderId = event.senderId,
content = content
) ?: return
activeCallHandler.addCall(incomingCall)
callListenersDispatcher.onCallInviteReceived(incomingCall, content)
}
private fun handleCallAnswerEvent(event: Event) {
val content = event.getClearContent().toModel<CallAnswerContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
if (event.senderId == userId) {
// discard current call, it's answered by another of my session
callListenersDispatcher.onCallManagedByOtherSession(content.callId)
} else {
if (call.opponentPartyId != null) {
Timber.v("Ignoring answer from party ID ${content.partyId} we already have an answer from ${call.opponentPartyId}")
return
}
call.apply {
opponentPartyId = Optional.from(content.partyId)
opponentVersion = content.version?.let { BigDecimal(it).intValueExact() } ?: MxCall.VOIP_PROTO_VERSION
}
callListenersDispatcher.onCallAnswerReceived(content)
}
}
private fun MxCall.partyIdsMatches(contentSignallingContent: CallSignallingContent): Boolean {
return opponentPartyId?.getOrNull() == contentSignallingContent.partyId
}
private fun CallSignallingContent.getCall(): MxCall? {
val currentCall = callId?.let {
activeCallHandler.getCallWithId(it)
}
if (currentCall == null) {
Timber.v("Call for content: $this is null")
}
return currentCall
}
}

View File

@ -16,116 +16,46 @@
package org.matrix.android.sdk.internal.session.call
import android.os.SystemClock
import kotlinx.coroutines.Dispatchers
import org.matrix.android.sdk.api.MatrixCallback
import org.matrix.android.sdk.api.session.call.CallListener
import org.matrix.android.sdk.api.session.call.CallSignalingService
import org.matrix.android.sdk.api.session.call.CallState
import org.matrix.android.sdk.api.session.call.MxCall
import org.matrix.android.sdk.api.session.call.TurnServerResponse
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.call.CallAnswerContent
import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent
import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent
import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent
import org.matrix.android.sdk.api.session.room.model.call.CallNegotiateContent
import org.matrix.android.sdk.api.session.room.model.call.CallRejectContent
import org.matrix.android.sdk.api.session.room.model.call.CallSelectAnswerContent
import org.matrix.android.sdk.api.session.room.model.call.CallSignallingContent
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.api.util.NoOpCancellable
import org.matrix.android.sdk.api.util.Optional
import org.matrix.android.sdk.internal.di.DeviceId
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.session.call.model.MxCallImpl
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.configureWith
import org.matrix.android.sdk.internal.task.launchToCallback
import timber.log.Timber
import java.math.BigDecimal
import java.util.UUID
import javax.inject.Inject
@SessionScope
internal class DefaultCallSignalingService @Inject constructor(
@UserId
private val userId: String,
@DeviceId
private val deviceId: String?,
private val callSignalingHandler: CallSignalingHandler,
private val mxCallFactory: MxCallFactory,
private val activeCallHandler: ActiveCallHandler,
private val localEchoEventFactory: LocalEchoEventFactory,
private val eventSenderProcessor: EventSenderProcessor,
private val taskExecutor: TaskExecutor,
private val turnServerTask: GetTurnServerTask
private val turnServerDataSource: TurnServerDataSource
) : CallSignalingService {
private val callListeners = mutableSetOf<CallListener>()
private val callListenersDispatcher = CallListenersDispatcher(callListeners)
private val cachedTurnServerResponse = object {
// Keep one minute safe to avoid considering the data is valid and then actually it is not when effectively using it.
private val MIN_TTL = 60
private val now = { SystemClock.elapsedRealtime() / 1000 }
private var expiresAt: Long = 0
var data: TurnServerResponse? = null
get() = if (expiresAt > now()) field else null
set(value) {
expiresAt = now() + (value?.ttl ?: 0) - MIN_TTL
field = value
}
}
override fun getTurnServer(callback: MatrixCallback<TurnServerResponse>): Cancelable {
if (cachedTurnServerResponse.data != null) {
cachedTurnServerResponse.data?.let { callback.onSuccess(it) }
return NoOpCancellable
return taskExecutor.executorScope.launchToCallback(Dispatchers.Default, callback) {
turnServerDataSource.getTurnServer()
}
return turnServerTask
.configureWith(GetTurnServerTask.Params) {
this.callback = object : MatrixCallback<TurnServerResponse> {
override fun onSuccess(data: TurnServerResponse) {
cachedTurnServerResponse.data = data
callback.onSuccess(data)
}
override fun onFailure(failure: Throwable) {
callback.onFailure(failure)
}
}
}
.executeBy(taskExecutor)
}
override fun createOutgoingCall(roomId: String, otherUserId: String, isVideoCall: Boolean): MxCall {
val call = MxCallImpl(
callId = UUID.randomUUID().toString(),
isOutgoing = true,
roomId = roomId,
userId = userId,
ourPartyId = deviceId ?: "",
opponentUserId = otherUserId,
isVideoCall = isVideoCall,
localEchoEventFactory = localEchoEventFactory,
eventSenderProcessor = eventSenderProcessor
)
activeCallHandler.addCall(call).also {
return call
return mxCallFactory.createOutgoingCall(roomId, otherUserId, isVideoCall).also {
activeCallHandler.addCall(it)
}
}
override fun addCallListener(listener: CallListener) {
callListeners.add(listener)
callSignalingHandler.addCallListener(listener)
}
override fun removeCallListener(listener: CallListener) {
callListeners.remove(listener)
callSignalingHandler.removeCallListener(listener)
}
override fun getCallWithId(callId: String): MxCall? {
@ -137,154 +67,6 @@ internal class DefaultCallSignalingService @Inject constructor(
return activeCallHandler.getActiveCallsLiveData().value?.isNotEmpty() == true
}
internal fun onCallEvent(event: Event) {
when (event.getClearType()) {
EventType.CALL_ANSWER -> {
handleCallAnswerEvent(event)
}
EventType.CALL_INVITE -> {
handleCallInviteEvent(event)
}
EventType.CALL_HANGUP -> {
handleCallHangupEvent(event)
}
EventType.CALL_REJECT -> {
handleCallRejectEvent(event)
}
EventType.CALL_CANDIDATES -> {
handleCallCandidatesEvent(event)
}
EventType.CALL_SELECT_ANSWER -> {
handleCallSelectAnswerEvent(event)
}
EventType.CALL_NEGOTIATE -> {
handleCallNegotiateEvent(event)
}
}
}
private fun handleCallNegotiateEvent(event: Event) {
val content = event.getClearContent().toModel<CallNegotiateContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
callListenersDispatcher.onCallNegotiateReceived(content)
}
private fun handleCallSelectAnswerEvent(event: Event) {
val content = event.getClearContent().toModel<CallSelectAnswerContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
if (call.isOutgoing) {
Timber.v("Got selectAnswer for an outbound call: ignoring")
return
}
val selectedPartyId = content.selectedPartyId
if (selectedPartyId == null) {
Timber.w("Got nonsensical select_answer with null selected_party_id: ignoring")
return
}
callListenersDispatcher.onCallSelectAnswerReceived(content)
}
private fun handleCallCandidatesEvent(event: Event) {
val content = event.getClearContent().toModel<CallCandidatesContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
if (call.opponentPartyId != Optional.from(content.partyId)) {
Timber.v("Ignoring candidates from party ID ${content.partyId} we have chosen party ID ${call.opponentPartyId}")
return
}
callListenersDispatcher.onCallIceCandidateReceived(call, content)
}
private fun handleCallRejectEvent(event: Event) {
val content = event.getClearContent().toModel<CallRejectContent>() ?: return
val call = content.getCall() ?: return
activeCallHandler.removeCall(content.callId)
// No need to check party_id for reject because if we'd received either
// an answer or reject, we wouldn't be in state InviteSent
if (call.state != CallState.Dialing) {
return
}
callListenersDispatcher.onCallRejectReceived(content)
}
private fun handleCallHangupEvent(event: Event) {
val content = event.getClearContent().toModel<CallHangupContent>() ?: return
val call = content.getCall() ?: return
if (call.state != CallState.Terminated) {
// Need to check for party_id?
activeCallHandler.removeCall(content.callId)
callListenersDispatcher.onCallHangupReceived(content)
}
}
private fun handleCallInviteEvent(event: Event) {
if (event.senderId == userId) {
// ignore invites you send
return
}
val content = event.getClearContent().toModel<CallInviteContent>() ?: return
val incomingCall = MxCallImpl(
callId = content.callId ?: return,
isOutgoing = false,
roomId = event.roomId ?: return,
userId = userId,
ourPartyId = deviceId ?: "",
opponentUserId = event.senderId ?: return,
isVideoCall = content.isVideo(),
localEchoEventFactory = localEchoEventFactory,
eventSenderProcessor = eventSenderProcessor
).apply {
opponentPartyId = Optional.from(content.partyId)
opponentVersion = content.version?.let { BigDecimal(it).intValueExact() } ?: MxCall.VOIP_PROTO_VERSION
}
activeCallHandler.addCall(incomingCall)
callListenersDispatcher.onCallInviteReceived(incomingCall, content)
}
private fun handleCallAnswerEvent(event: Event) {
val content = event.getClearContent().toModel<CallAnswerContent>() ?: return
val call = content.getCall() ?: return
if (call.ourPartyId == content.partyId) {
// Ignore remote echo
return
}
if (event.senderId == userId) {
// discard current call, it's answered by another of my session
callListenersDispatcher.onCallManagedByOtherSession(content.callId)
} else {
if (call.opponentPartyId != null) {
Timber.v("Ignoring answer from party ID ${content.partyId} we already have an answer from ${call.opponentPartyId}")
return
}
call.apply {
opponentPartyId = Optional.from(content.partyId)
opponentVersion = content.version?.let { BigDecimal(it).intValueExact() } ?: MxCall.VOIP_PROTO_VERSION
}
callListenersDispatcher.onCallAnswerReceived(content)
}
}
private fun CallSignallingContent.getCall(): MxCall? {
val currentCall = callId?.let {
activeCallHandler.getCallWithId(it)
}
if (currentCall == null) {
Timber.v("Call for content: $this is null")
}
return currentCall
}
companion object {
const val CALL_TIMEOUT_MS = 120_000
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.call
import org.matrix.android.sdk.api.session.call.MxCall
import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent
import org.matrix.android.sdk.api.util.Optional
import org.matrix.android.sdk.internal.di.DeviceId
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.call.model.MxCallImpl
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import java.math.BigDecimal
import java.util.UUID
import javax.inject.Inject
internal class MxCallFactory @Inject constructor(
@DeviceId private val deviceId: String?,
private val localEchoEventFactory: LocalEchoEventFactory,
private val eventSenderProcessor: EventSenderProcessor,
@UserId private val userId: String
) {
fun createIncomingCall(roomId: String, senderId: String, content: CallInviteContent): MxCall? {
if (content.callId == null) return null
return MxCallImpl(
callId = content.callId,
isOutgoing = false,
roomId = roomId,
userId = userId,
ourPartyId = deviceId ?: "",
opponentUserId = senderId,
isVideoCall = content.isVideo(),
localEchoEventFactory = localEchoEventFactory,
eventSenderProcessor = eventSenderProcessor
).apply {
opponentPartyId = Optional.from(content.partyId)
opponentVersion = content.version?.let { BigDecimal(it).intValueExact() } ?: MxCall.VOIP_PROTO_VERSION
}
}
fun createOutgoingCall(roomId: String, otherUserId: String, isVideoCall: Boolean): MxCall {
return MxCallImpl(
callId = UUID.randomUUID().toString(),
isOutgoing = true,
roomId = roomId,
userId = userId,
ourPartyId = deviceId ?: "",
opponentUserId = otherUserId,
isVideoCall = isVideoCall,
localEchoEventFactory = localEchoEventFactory,
eventSenderProcessor = eventSenderProcessor
)
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.call
import android.os.SystemClock
import org.matrix.android.sdk.api.session.call.TurnServerResponse
import javax.inject.Inject
internal class TurnServerDataSource @Inject constructor(private val turnServerTask: GetTurnServerTask) {
private val cachedTurnServerResponse = object {
// Keep one minute safe to avoid considering the data is valid and then actually it is not when effectively using it.
private val MIN_TTL = 60
private val now = { SystemClock.elapsedRealtime() / 1000 }
private var expiresAt: Long = 0
var data: TurnServerResponse? = null
get() = if (expiresAt > now()) field else null
set(value) {
expiresAt = now() + (value?.ttl ?: 0) - MIN_TTL
field = value
}
}
suspend fun getTurnServer(): TurnServerResponse {
return cachedTurnServerResponse.data ?: turnServerTask.execute(GetTurnServerTask.Params).also {
cachedTurnServerResponse.data = it
}
}
}