Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.google.firebase.ai.common.util
import android.media.AudioRecord
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.yield

/**
Expand All @@ -35,20 +37,18 @@ internal val AudioRecord.minBufferSize: Int
*
* Will yield when this instance is not recording.
*/
internal fun AudioRecord.readAsFlow() = flow {
internal fun AudioRecord.readAsFlow() = callbackFlow {
val buffer = ByteArray(minBufferSize)

while (true) {
while (isActive) {
if (recordingState != AudioRecord.RECORDSTATE_RECORDING) {
// TODO(vguthal): Investigate if both yield and delay are required.
delay(10.milliseconds)
yield()
delay(0)
continue
}
val bytesRead = read(buffer, 0, buffer.size)
if (bytesRead > 0) {
emit(buffer.copyOf(bytesRead))
send(buffer.copyOf(bytesRead))
}
yield()
delay(0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readBytes
import kotlinx.coroutines.CoroutineName
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
Expand All @@ -49,7 +51,6 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
Expand Down Expand Up @@ -120,7 +121,6 @@ internal constructor(
functionCallHandler: ((FunctionCallPart) -> FunctionResponsePart)? = null,
enableInterruptions: Boolean = false,
) {

val context = firebaseApp.applicationContext
if (
ContextCompat.checkSelfPermission(context, RECORD_AUDIO) != PackageManager.PERMISSION_GRANTED
Expand All @@ -137,8 +137,8 @@ internal constructor(
)
return@catchAsync
}

scope = CoroutineScope(blockingDispatcher + childJob())
// TODO: maybe it should be THREAD_PRIORITY_AUDIO anyways for playback and recording (not network though)
scope = CoroutineScope(blockingDispatcher + childJob() + CoroutineName("LiveSession Scope"))
audioHelper = AudioHelper.build()

recordUserAudio()
Expand Down Expand Up @@ -201,7 +201,7 @@ internal constructor(
)
}
?.let { emit(it.toPublic()) }
yield()
delay(0)
}
}
.onCompletion { stopAudioConversation() }
Expand Down Expand Up @@ -326,7 +326,10 @@ internal constructor(
?.listenToRecording()
?.buffer(UNLIMITED)
?.accumulateUntil(MIN_BUFFER_SIZE)
?.onEach { sendMediaStream(listOf(MediaData(it, "audio/pcm"))) }
?.onEach {
sendMediaStream(listOf(MediaData(it, "audio/pcm")))
delay(0)
}
?.catch { throw FirebaseAIException.from(it) }
?.launchIn(scope)
}
Expand Down Expand Up @@ -374,7 +377,7 @@ internal constructor(
if (it.interrupted) {
playBackQueue.clear()
} else {
println("Sending audio parts")
println("Queuing audio parts from model")
val audioParts = it.content?.parts?.filterIsInstance<InlineDataPart>().orEmpty()
for (part in audioParts) {
playBackQueue.add(part.inlineData)
Expand All @@ -390,7 +393,7 @@ internal constructor(
}
}
}
.launchIn(CoroutineScope(Dispatchers.IO))
.launchIn(scope)
}

/**
Expand All @@ -401,7 +404,7 @@ internal constructor(
* Launched asynchronously on [scope].
*/
private fun listenForModelPlayback(enableInterruptions: Boolean = false) {
CoroutineScope(Dispatchers.IO).launch {
scope.launch {
while (isActive) {
val playbackData = playBackQueue.poll()
if (playbackData == null) {
Expand All @@ -410,8 +413,9 @@ internal constructor(
if (!enableInterruptions) {
audioHelper?.resumeRecording()
}
yield()
delay(0)
} else {
println("Playing audio data")
/**
* We pause the recording while the model is speaking to avoid interrupting it because of
* no echo cancellation
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ benchmarkMacro = "1.3.4"
browser = "1.3.0"
cardview = "1.0.0"
constraintlayout = "2.1.4"
coroutines = "1.9.0"
coroutines = "1.10.2"
dagger = "2.51" # Don't bump above 2.51 as it causes a bug in AppDistro FeedbackSender JPEG code
datastore = "1.1.7"
dexmaker = "2.28.1"
Expand Down
Loading