Spracherkennung für: .kt vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]
package ch.threema.app.webrtc
import android.annotation.SuppressLint
import androidx.annotation.AnyThread
import ch.threema.android.createDelayedExecutor
import ch.threema.app.voip.groupcall.GroupCallException
import ch.threema.base.utils.getThreemaLogger
import java.nio.ByteBuffer
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.CompletableDeferred
import org.webrtc.*
import org.webrtc.CameraVideoCapturer
@SuppressLint("LoggerName")
private val logger = getThreemaLogger("GroupCall.PeerConnectionObserver")
/**
* Implements the interface but ignores all observed events unless overridden.
*/
@AnyThread
open class DefaultNoopPeerConnectionObserver : PeerConnection.Observer {
override fun onRenegotiationNeeded() {
/* noop */
}
override fun onSignalingChange(state: PeerConnection.SignalingState?) {
/* noop */
}
override fun onIceGatheringChange(state: PeerConnection.IceGatheringState?) {
/* noop */
}
override fun onIceConnectionChange(state: PeerConnection.IceConnectionState?) {
/* noop */
}
override fun onIceConnectionReceivingChange(receiving: Boolean) {
/* noop */
}
override fun onIceCandidate(candidate: IceCandidate?) {
/* noop */
}
override fun onIceCandidatesRemoved(candidatesRemoved: Array<out IceCandidate>?) {
/* noop */
}
override fun onAddStream(stream: MediaStream?) {
/* noop */
}
override fun onRemoveStream(stream: MediaStream?) {
/* noop */
}
override fun onDataChannel(dataChannel: DataChannel?) {
/* noop */
}
}
/**
* Maps the PeerConnection.Observer into a more sane structure with non-nullable types and some
* non-standard and irrelevant events removed.
*/
@AnyThread
interface SanePeerConnectionObserver {
fun onDetach() {
/* noop */
}
fun onRenegotiationNeeded() {
/* noop */
}
fun onSignalingChange(state: PeerConnection.SignalingState) {
/* noop */
}
fun onConnectionChange(state: PeerConnection.PeerConnectionState) {
/* noop */
}
fun onIceGatheringChange(state: PeerConnection.IceGatheringState) {
/* noop */
}
fun onIceConnectionChange(state: PeerConnection.IceConnectionState) {
/* noop */
}
fun onIceCandidate(candidate: IceCandidate) {
/* noop */
}
fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent) {
/* noop */
}
fun onDataChannel(channel: DataChannel) {
/* noop */
}
fun onTransceiver(transceiver: RtpTransceiver) {
/* noop */
}
}
/**
* Wraps a PeerConnection.Observer to dispatch to the more SanePeerConnectionObserver.
*
* If no observer is attached, all events will be buffered and flushed once an observer is being
* attached.
*
* Since libwebrtc has a... weird thread model, applies a ReentrantLock every time one of
* the events fires.
*/
@AnyThread
class WrappedPeerConnectionObserver(
private var observer: SanePeerConnectionObserver? = null,
) : DefaultNoopPeerConnectionObserver() {
private val lock = ReentrantLock()
private val events = mutableListOf<Any>()
fun replace(newObserver: SanePeerConnectionObserver?) {
lock.withLock {
observer?.onDetach()
observer = newObserver
if (newObserver == null) {
return@withLock
}
events.forEach {
when (it) {
is Unit -> newObserver.onRenegotiationNeeded()
is PeerConnection.SignalingState -> newObserver.onSignalingChange(it)
is PeerConnection.PeerConnectionState -> newObserver.onConnectionChange(it)
is PeerConnection.IceGatheringState -> newObserver.onIceGatheringChange(it)
is PeerConnection.IceConnectionState -> newObserver.onIceConnectionChange(it)
is IceCandidate -> newObserver.onIceCandidate(it)
is CandidatePairChangeEvent -> newObserver.onSelectedCandidatePairChanged(it)
is DataChannel -> newObserver.onDataChannel(it)
is RtpTransceiver -> newObserver.onTransceiver(it)
else -> throw Error("Unexpected peer connection event type: ${it.javaClass}")
}
}
events.clear()
}
}
override fun onRenegotiationNeeded() = lock.withLock {
if (observer.let { it?.onRenegotiationNeeded() == null }) {
events.add(Unit)
}
}
override fun onSignalingChange(state: PeerConnection.SignalingState?) {
checkNotNull(state)
lock.withLock {
if (observer.let { it?.onSignalingChange(state) == null }) {
events.add(state)
}
}
}
override fun onConnectionChange(state: PeerConnection.PeerConnectionState?) {
checkNotNull(state)
lock.withLock {
if (observer.let { it?.onConnectionChange(state) == null }) {
events.add(state)
}
}
}
override fun onIceGatheringChange(state: PeerConnection.IceGatheringState?) {
checkNotNull(state)
lock.withLock {
if (observer.let { it?.onIceGatheringChange(state) == null }) {
events.add(state)
}
}
}
override fun onIceConnectionChange(state: PeerConnection.IceConnectionState?) {
checkNotNull(state)
lock.withLock {
if (observer.let { it?.onIceConnectionChange(state) == null }) {
events.add(state)
}
}
}
override fun onIceCandidate(candidate: IceCandidate?) {
checkNotNull(candidate)
lock.withLock {
if (observer.let { it?.onIceCandidate(candidate) == null }) {
events.add(candidate)
}
}
}
override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) {
checkNotNull(event)
lock.withLock {
if (observer.let { it?.onSelectedCandidatePairChanged(event) == null }) {
events.add(event)
}
}
}
override fun onDataChannel(dataChannel: DataChannel?) {
checkNotNull(dataChannel)
lock.withLock {
if (observer.let { it?.onDataChannel(dataChannel) == null }) {
events.add(dataChannel)
}
}
}
override fun onTrack(transceiver: RtpTransceiver?) {
checkNotNull(transceiver)
lock.withLock {
if (observer.let { it?.onTransceiver(transceiver) == null }) {
events.add(transceiver)
}
}
}
}
internal class PeerConnectionObserver(
private val addTransceiver: ((transceiver: RtpTransceiver) -> Unit),
private val failedSignal: CompletableDeferred<*>,
) : SanePeerConnectionObserver {
private val lock = ReentrantLock()
private var iceFailedSignal: CompletableFuture<Unit>? = null
override fun onDetach() = cancelIceFailedTimer()
// region Event handlers
override fun onRenegotiationNeeded() = logger.trace("Negotiation needed")
override fun onSignalingChange(state: PeerConnection.SignalingState) =
logger.trace("Signaling state: {}", state.name)
override fun onConnectionChange(state: PeerConnection.PeerConnectionState) =
logger.debug("Connection state: {}", state.name)
override fun onIceGatheringChange(state: PeerConnection.IceGatheringState) =
logger.trace("ICE gathering state: {}", state.name)
override fun onIceCandidate(candidate: IceCandidate) =
logger.trace("ICE candidate: {}", candidate.sdp)
override fun onIceConnectionChange(state: PeerConnection.IceConnectionState) = lock.withLock {
logger.debug("ICE connection state: {}", state.name)
when (state) {
PeerConnection.IceConnectionState.CONNECTED ->
cancelIceFailedTimer()
PeerConnection.IceConnectionState.DISCONNECTED ->
scheduleIceFailedTimer()
PeerConnection.IceConnectionState.FAILED ->
iceFailed(GroupCallException("ICE failed explicitly"))
else -> {
/* noop */
}
}
}
override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent) =
logger.debug("Selected candidate: {} -> {}", event.local.sdp, event.remote.sdp)
override fun onDataChannel(channel: DataChannel) =
logger.warn("Unexpected data channel (label='{}')", channel.label())
override fun onTransceiver(transceiver: RtpTransceiver) = lock.withLock {
logger.trace(
"New transceiver (kind='{}', mid='{}')",
transceiver.mediaType.name,
transceiver.mid,
)
addTransceiver(transceiver)
}
// endregion
// region ICE failed timer
private fun cancelIceFailedTimer() {
if (iceFailedSignal?.cancel(true) == true) {
logger.trace("ICE failed timer cancelled")
}
iceFailedSignal = null
}
private fun scheduleIceFailedTimer() {
cancelIceFailedTimer()
iceFailedSignal = CompletableFuture.supplyAsync<Unit>({
throw Error("ICE remained disconnected for 10 seconds")
}, createDelayedExecutor(10.seconds))
.exceptionally { error ->
when (error) {
is CancellationException -> {
/* noop */
}
else -> {
logger.trace("ICE failed timer expired")
iceFailed(GroupCallException("ICE failed due to timeout"))
}
}
}
logger.trace("ICE failed timer started")
}
private fun iceFailed(error: GroupCallException) {
failedSignal.completeExceptionally(error)
}
// endregion
}
/**
* Sane DataChannelObserver.
*/
@AnyThread
interface SaneDataChannelObserver {
fun onDetach() {
/* noop */
}
fun onBufferedAmountChange(bufferedAmount: ULong) {
/* noop */
}
fun onStateChange(state: DataChannel.State) {
/* noop */
}
fun onMessage(buffer: DataChannel.Buffer) {
/* noop */
}
}
/**
* Wraps a DataChannel.Observer to a SaneDataChannelObserver.
*
* If no observer is attached, all events will be buffered and flushed once an observer is being
* attached.
*
* Since libwebrtc has a... weird thread model, applies a ReentrantLock every time one of
* the events fires.
*/
@AnyThread
class WrappedDataChannelObserver(
private val state: () -> DataChannel.State,
private var observer: SaneDataChannelObserver? = null,
) : DataChannel.Observer {
private val lock = ReentrantLock()
private val events = mutableListOf<Any>()
fun replace(newObserver: SaneDataChannelObserver?) = lock.withLock {
observer?.onDetach()
observer = newObserver
if (newObserver == null) {
return
}
events.forEach {
when (it) {
is ULong -> newObserver.onBufferedAmountChange(it)
is DataChannel.State -> newObserver.onStateChange(it)
is DataChannel.Buffer -> newObserver.onMessage(it)
else -> throw Error("Unexpected data channel event type: ${it.javaClass}")
}
}
events.clear()
}
override fun onBufferedAmountChange(bufferedAmountLong: Long) {
val bufferedAmount = bufferedAmountLong.toULong()
lock.withLock {
if (observer.let { it?.onBufferedAmountChange(bufferedAmount) == null }) {
events.add(bufferedAmount)
}
}
}
override fun onStateChange() = lock.withLock {
val state = state()
if (observer.let { it?.onStateChange(state) == null }) {
events.add(state)
}
}
override fun onMessage(message: DataChannel.Buffer?) {
checkNotNull(message)
lock.withLock {
if (observer.let { it?.onMessage(message) == null }) {
// Copy the message since the ByteBuffer will be reused immediately
val copy = with(ByteBuffer.allocate(message.data.remaining())) {
put(message.data)
flip()
this
}
events.add(DataChannel.Buffer(copy, message.binary))
}
}
}
}
suspend fun PeerConnection.setLocalDescription(description: SessionDescription) {
val future = CompletableDeferred<Unit>()
this.setLocalDescription(
object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {
future.completeExceptionally(Error("Unexpected onCreateSuccess event"))
}
override fun onSetSuccess() {
future.complete(Unit)
}
override fun onCreateFailure(reason: String?) {
future.completeExceptionally(Error("Unexpected onCreateFailure event"))
}
override fun onSetFailure(reason: String?) {
checkNotNull(reason)
future.completeExceptionally(Error("Setting remote description failed, reason: $reason"))
}
},
description,
)
return future.await()
}
suspend fun PeerConnection.setRemoteDescription(description: SessionDescription) {
val future = CompletableDeferred<Unit>()
this.setRemoteDescription(
object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {
future.completeExceptionally(GroupCallException("Unexpected onCreateSuccess event"))
}
override fun onSetSuccess() {
future.complete(Unit)
}
override fun onCreateFailure(reason: String?) {
future.completeExceptionally(GroupCallException("Unexpected onCreateFailure event"))
}
override fun onSetFailure(reason: String?) {
checkNotNull(reason)
future.completeExceptionally(GroupCallException("Setting remote description failed, reason: $reason"))
}
},
description,
)
return future.await()
}
suspend fun PeerConnection.createAnswer(constraints: MediaConstraints? = null): SessionDescription {
val future = CompletableDeferred<SessionDescription>()
this.createAnswer(
object : SdpObserver {
override fun onCreateSuccess(description: SessionDescription?) {
checkNotNull(description)
future.complete(description)
}
override fun onSetSuccess() {
future.completeExceptionally(Error("Unexpected onSetSuccess event"))
}
override fun onCreateFailure(reason: String?) {
future.completeExceptionally(Error("Creating answer failed, reason: $reason"))
}
override fun onSetFailure(reason: String?) {
future.completeExceptionally(Error("Unexpected onSetFailure event"))
}
},
constraints ?: MediaConstraints(),
)
return future.await()
}
/**
* Add an ICE candidate and maps the result to a CompletableFuture.
*/
suspend fun PeerConnection.addIceCandidateAsync(candidate: IceCandidate) {
val future = CompletableDeferred<Unit>()
this.addIceCandidate(
candidate,
object : AddIceObserver {
override fun onAddSuccess() {
future.complete(Unit)
}
override fun onAddFailure(reason: String?) {
checkNotNull(reason)
future.completeExceptionally(Error("Unable to add ICE candidate: '$reason'"))
}
},
)
return future.await()
}
/**
* Maps the PeerConnection.Observer into a more sane structure with non-nullable types.
*/
interface SaneCameraEventsHandler {
fun onCameraError(error: String) {
/* noop */
}
fun onCameraDisconnected() {
/* noop */
}
fun onCameraFreeze(error: String) {
/* noop */
}
fun onCameraOpening(cameraName: String) {
/* noop */
}
fun onFirstFrameAvailable() {
/* noop */
}
fun onCameraClosed() {
/* noop */
}
}
/**
* Wraps a CameraVideoCapturer.CameraEventsHandler to a SaneCameraEventsHandler.
*/
class WrappedCameraEventsHandler(
private val observer: SaneCameraEventsHandler,
) : CameraVideoCapturer.CameraEventsHandler {
override fun onCameraError(error: String?) {
observer.onCameraError(checkNotNull(error))
}
override fun onCameraDisconnected() {
observer.onCameraDisconnected()
}
override fun onCameraFreezed(error: String?) {
observer.onCameraFreeze(checkNotNull(error))
}
override fun onCameraOpening(cameraName: String?) {
observer.onCameraOpening(checkNotNull(cameraName))
}
override fun onFirstFrameAvailable() {
observer.onFirstFrameAvailable()
}
override fun onCameraClosed() {
observer.onCameraClosed()
}
}
[Dauer der Verarbeitung: 0.23 Sekunden, vorverarbeitet 2026-04-27]