Multiple Pipelines

Hello,

I’ve managed to implement a pipeline using Azure ASR with our custom TF models for NLU and wakeword. So far so good! My next objective was to add an intent, “MakeANote”, which should begin recording the user’s audio until they stop speaking for a specified timeout. I made a second pipeline, noteRecorder, built with withoutTts, withoutWakeword, and withoutNlu and using the VADTriggerAzureASR profile. Both pipelines are instantiated together with build().
When “MakeANote” is triggered, spokestack.stop() is called, then noteRecorder.activate(). From there, I want to record the audio to WAV, save the transcript via the context, stop noteRecorder, and resume the spokestack pipeline. But noteRecorder never recognizes, partially recognizes, or throws any errors; it just activates and stays active forever. Am I following the intended program flow?


Trying to juggle system audio resources among multiple pipelines is bound to cause headaches. Spokestack was designed to run only one pipeline at a time: when you call start(), the pipeline takes control of the microphone (in most cases; it depends on your pipeline’s input class), and Android doesn’t make sharing it easy.

For your use case, here’s what I’d suggest:

  • Use one pipeline
  • Instead of withoutNlu, build it using withoutAutoClassification
  • Use an internal flag somewhere to represent whether you’re currently transcribing a note (whether the last intent the system recognized was MakeANote)
  • In your speech event handler, when you get a RECOGNIZE event, check that flag: if you’re not currently transcribing a note, send the utterance to the NLU for classification (setting the flag if the recognized intent is MakeANote); if you are, send the note wherever it should go and reset the flag (see the sketch below)
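
Here’s a minimal sketch of that handler. It only illustrates the flag logic; classifyWithNlu(...) and handleNote(...) are placeholders for however your app already sends utterances to the NLU and stores notes, not Spokestack APIs.

```kotlin
import io.spokestack.spokestack.OnSpeechEventListener
import io.spokestack.spokestack.SpeechContext

class NoteAwareListener : OnSpeechEventListener {
    // true while the last recognized intent was MakeANote
    @Volatile var makingNote = false

    override fun onEvent(event: SpeechContext.Event, context: SpeechContext) {
        if (event != SpeechContext.Event.RECOGNIZE) return
        val utterance = context.transcript
        if (makingNote) {
            handleNote(utterance)  // send the note wherever it should go
            makingNote = false     // then reset the flag
        } else {
            // normal flow: classify the utterance; set makingNote = true
            // when the NLU comes back with the MakeANote intent
            classifyWithNlu(utterance)
        }
    }

    private fun handleNote(text: String) { /* app-specific: save the note */ }
    private fun classifyWithNlu(text: String) { /* app-specific: existing NLU call */ }
}
```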

Thank you for the prompt response; I’m able to follow this program flow! How would you recommend I convert these audio-frame Deque buffers into a WAV file? Is the output from this Deque interface usable with Android’s AudioTrack (is it PCM), and how would this be handled on the iOS side?

Recording audio is outside the scope of Spokestack. Since privacy is a core priority for the library, Spokestack only processes audio; it doesn’t store it for later use.

To record audio, you’d want to add a new pipeline stage that uses its process() method to either send audio frames as events to an external component or append them directly to a file, based on certain criteria (whether the pipeline is active, whether app logic dictates that it should be recording the current audio, etc.). Spokestack reads directly from the device microphone via the AudioRecord interface. By default, each frame represents 20ms of audio.
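
For illustration, a stripped-down stage along these lines might look like the sketch below (not Spokestack code; FrameRecorder, noteState.recording, and noteState.outputPath are placeholder names for your own app logic). It writes header-less PCM, so a WAV header would still need to be added before most players will open the result.

```kotlin
import io.spokestack.spokestack.SpeechConfig
import io.spokestack.spokestack.SpeechContext
import io.spokestack.spokestack.SpeechProcessor
import java.io.FileOutputStream
import java.nio.ByteBuffer

// Stand-in for your app's shared state; replace with your own flag/path handling.
object noteState {
    @Volatile var recording = false
    var outputPath: String = ""
}

// Pipeline stages are constructed with a SpeechConfig, so keep that constructor.
class FrameRecorder(config: SpeechConfig) : SpeechProcessor {
    private var out: FileOutputStream? = null

    override fun process(context: SpeechContext, frame: ByteBuffer?) {
        if (noteState.recording && frame != null) {
            // lazily open the output file on the first recorded frame
            if (out == null) out = FileOutputStream(noteState.outputPath)
            val bytes = ByteArray(frame.remaining())
            frame.get(bytes)   // copy this 20ms frame out of the ByteBuffer
            out?.write(bytes)
        } else {
            close()            // recording flag is off: flush and close the file
        }
    }

    override fun reset() = close()

    override fun close() {
        out?.close()
        out = null
    }
}
```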

You can find instructions online for going from AudioRecord samples to a .wav file; keep in mind that you’ll probably need to write the WAV header for each file manually unless you find a library that takes care of it for you.

For iOS, @noelweichbrodt would be better able to answer specifics.


Thank you for the info! I’ll look into making a dedicated stage for sending the frames. Just for further clarification: when I obtain the buffer via context.getBuffer(), it contains a Deque of ByteBuffer elements. Is this queue updated with each frame, with each frame’s ByteBuffer inserted as it arrives? And should I be able to extract the final context.getBuffer() once the ASR fires its RECOGNIZE event?

That buffer is a ring buffer—it can’t grow without bound as audio is being processed, or it could eventually become a memory problem if the timeout was set too high. Frames are inserted into it sequentially, but when it’s full, it starts overwriting frames.

This is why I suggested doing the work inside a pipeline stage—the stages receive each frame of audio as a ByteBuffer as it is read from the microphone, so you don’t need to deal with the deque. context.getBuffer() is unlikely to contain the entire utterance, so you want to record each frame as it comes in so you don’t miss any.


Thank you for the amazing support! This solves my questions.

Happy to help.

A brief correction: I wrote the last response a little too quickly. The deque in the speech context isn’t exactly a ring buffer (those are used in other components), but it is bounded in size, so the rest of my advice still applies. Just wanted to update in case I accidentally caused any confusion there.

Handling audio on iOS is a big topic! To start, I’d recommend becoming familiar with the concepts of AudioSession, AudioComponent, and AudioUnit: Audio - Apple Developer has some good WWDC sessions that provide important background for iOS audio. You can also see how Spokestack handles iOS audio by examining AudioController.swift. Hope this helps.

Hi Josh, I’ve implemented the buffer recorder using your suggestion to append directly to a file; I do this with a FileOutputStream writing to a .pcm file (I’ll add headers later on). When I open the file, the audio isn’t correct. I’ve tried both rewinding the buffer and not rewinding it. Could you please check whether my speech processor is handling these buffers correctly, and whether rewinding the buffers is necessary?

```kotlin
import com.elementa.elementa.RecordWaveTask

import android.content.Context
import android.os.SystemClock
import android.media.AudioFormat;
import android.media.AudioRecord;
import android.media.MediaRecorder.AudioSource;
import androidx.lifecycle.*;
import androidx.lifecycle.Observer;
import android.util.Log
import io.spokestack.spokestack.SpeechConfig
import io.spokestack.spokestack.SpeechContext
import io.spokestack.spokestack.SpeechPipeline
import io.spokestack.spokestack.SpeechProcessor
import java.nio.ByteBuffer
import java.nio.Buffer;
import java.io.FileOutputStream
import java.nio.channels.FileChannel
import java.nio.ByteOrder
import java.io.File
import java.io.FileNotFoundException
import java.io.IOException
/*================ CUSTOM SPEECH PROCESSOR ===========================*/
public class NoteProcessor(speechConfig: SpeechConfig) : SpeechProcessor {
    // private val request: ApiStreamObserver<StreamingRecognizeRequest>? = null
    val logTag = javaClass.simpleName
    var fileWriter: FileOutputStream? = null;
    var fileChannel : FileChannel? = null;
    var frameCount: Int = 0;
    var lastFrame : ByteBuffer? = null;
    var totalBuffer = mutableListOf<ByteBuffer>()
    var rawFilePath: File? = null;
    var filepath : String? = null;
    init {
    }
    //@TODO: MainActivity should call this when makingNote starts
    public fun startRecording(context: Context) {
        Log.i(logTag,"[NOTE PROCESSOR] Starting Recording");
        filepath = context.getExternalFilesDir(null)?.getAbsolutePath() + "/recording_" + (System.currentTimeMillis() / 1000).toString() + ".pcm";
        try {
            fileWriter = FileOutputStream(filepath, true); //true = Append
            fileChannel = fileWriter?.getChannel();
        } catch (e: FileNotFoundException) {
            Log.i(logTag, "[NOTE PROCESSOR] FileNotFound")
            e.printStackTrace()
        }
        noteState.startRecording = false;
    }
    public fun stopRecording(){
        Log.i(logTag,"[NOTE PROCESSOR] Stopped Recording with: " + frameCount * 20 + "ms of audio");
        noteState.stopRecording = false;
        frameCount = 0;
        Log.i(logTag, "[NOTE PROCESSOR] Notifying change in audioPath");
        noteState.audioPath.postValue(filepath);
    //    fileWriter?.write(totalBuffer.toByteArray())
        Log.i(logTag, "[NOTE PROCESSOR] Closing file writer.");
        try {
            fileChannel?.close();
            fileWriter?.close();
        } catch (e: IOException) {
            e.printStackTrace()
            Log.i(logTag, "[NOTE PROCESSOR] Error trying to close file writer.");
        }
        fileWriter = null;
        //noteState.buffer.postValue(convertToOne(totalBuffer));
        //Log.i(logTag, "Buffer length: " + noteState.buffer);
        //noteState.justTriggered = false;
        //Reset file path to avoid collision
        //Delegates control back to Main Activity for Buffer Handling
    }
    @Throws(Exception::class)
    /*
    * Each time context is updated, extract the byteBuffer
    * & add to our increasing totalBuffer
    * */
    override fun process(context: SpeechContext, frame: ByteBuffer?) {
        if(noteState.startRecording == true && context.getAndroidContext() != null){
            startRecording(context.getAndroidContext()!!);
            noteState.makingNote = true;
            Log.i(logTag, "[NOTE PROCESSOR] Making Note = false");
        }
        if(noteState.stopRecording == true){
            stopRecording();
            noteState.makingNote = false;
            Log.i(logTag, "[NOTE PROCESSOR] Making Note = false");
        }
        //Check if frame is valid and not already written to disk. Compare buffers last to preserve resources
        if(noteState.makingNote && frame != null) {
            frameCount++;
            //Log.i(logTag,"[NOTE PROCESSOR] Frame (" + frameCount + ") isDirect:" + frame.isDirect());
            //Write the buffer from index 0 till its position. pos = number of bytes written
            //Log.i(logTag, "[NOTE PROCESSOR] Writing Buffer. Size in Bytes: " + frame.array().size);
        
            frame.rewind();
            fileChannel?.write(frame);
           // fileWriter?.write(frame.array(), 0, frame.array().size);
            lastFrame = frame;
        }
    }
    override fun close(){
    }
    /*
    * Convert to ByteBuffer, Stream into Byte Array
    * Returns NEW value for noteState.buffer
    * Complexity: O(N^2) //@TODO: Post-Release: Optimize Buffer Concat
    * Input: ArrayList of Bytebuffers
    * Returns: Combined ByteBuffer*/
    fun convertToOne(src: MutableList<ByteBuffer>): ByteBuffer {
    Log.i(logTag, "Converting List<ByteBuffer> of size: " + src.size);
        var bufferSize = 0; var i=0; var j=0;
        while(i < src.size){
            bufferSize += src[i].limit()
            i++;
        }
        var newBuffer : ByteBuffer = ByteBuffer.allocateDirect(bufferSize);

        Log.i(logTag, "Creating new ByteBuffer of size: " + bufferSize);
        while(j < src.size) {
            newBuffer.put(src[j]);
            j++;
        }
        Log.i(logTag, "Merged " + j + "Buffers into one");
        //Rewind the buffer for re-reading
        //newBuffer.rewind();
        Log.i(logTag, "Rewinding buffer for read (size: " + bufferSize + ")");
        //Send to NoteRecorder
       // noteRecorder.setInputBuffer(totalBuffer);
       // noteRecorder.recordWavStart();

        noteState.bufferSize = bufferSize;
        return newBuffer;
    }
    private fun getByteArrayFromByteBuffer(byteBuffer: ByteBuffer): ByteArray? {
        Log.i(logTag, "getByteArray called.");
        val bytesArray = ByteArray(byteBuffer.remaining())
        Log.i(logTag, "bytesarray assigned");
        byteBuffer.get(bytesArray, 0, bytesArray.size)
        Log.i(logTag, "byteBuffer.get called");
        return bytesArray
    }
    override fun reset() {
        //Reset the buffer for new voice data input
        Log.i(logTag, "Buffer Reset");
        totalBuffer.clear();
    }

}
```

First off, yes; the frame should be rewound—but this is already done by the speech pipeline before sending it to each processing stage.

Outside of that, the code’s a little difficult to evaluate—the helper functions at the bottom aren’t being called, so I’m not sure what the final version of the code is, and if you’re not writing the WAV header, I can’t be sure how you’re evaluating the audio after recording it to know it’s not correct (are you listening to it in a program that doesn’t require the WAV header to open a file?).

It turns out, though, that there’s an old speech processor still in the repository that we used for debugging early on: SpeechSampler. Apologies for completely forgetting about it earlier; I think it should be straightforward to repurpose that component for your use case. Note the sample-log-path property that it expects from the configuration; that’s the path to the directory where you’ll find the files recorded by this processor. You can even mount your phone’s filesystem via adb to listen to the recordings directly on your computer with a minimum of fuss.
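
If you’re building the pipeline yourself, wiring the sampler in might look roughly like the sketch below. The stage list and the sampler’s fully qualified class name are assumptions, so match them to your existing profile and to wherever SpeechSampler actually lives in the repo; appContext and listener stand in for your Android Context and speech event listener.

```kotlin
import android.content.Context
import io.spokestack.spokestack.OnSpeechEventListener
import io.spokestack.spokestack.SpeechPipeline

fun buildPipeline(appContext: Context, listener: OnSpeechEventListener): SpeechPipeline {
    // directory where the sampler will write its .wav files
    val noteDir = appContext.getExternalFilesDir(null)!!.absolutePath
    return SpeechPipeline.Builder()
        .setAndroidContext(appContext)
        .setInputClass("io.spokestack.spokestack.android.MicrophoneInput")
        .addStageClass("io.spokestack.spokestack.webrtc.VoiceActivityDetector")
        .addStageClass("io.spokestack.spokestack.SpeechSampler")  // assumed class path
        .setProperty("sample-log-path", noteDir)
        .addOnSpeechEventListener(listener)
        .build()
}
```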


Thank you Josh! After implementing the code provided in SpeechSampler, we’re able to obtain WAVs that play correctly in Slack and VLC, but they don’t work with the stock Android player, so we can’t use them in the app. (I also tried the flutter_sound and Flutter audioplayer plugins.) Did you also face this issue, or were there some extra steps you had to take?

```kotlin
/*================ CUSTOM SPEECH PROCESSOR ===========================*/
public class NoteProcessor(config: SpeechConfig) : SpeechProcessor {
    public final var DEFAULT_SAMPLE_MAX = 50; //Max number of notes that can be recorded
    // private val request: ApiStreamObserver<StreamingRecognizeRequest>? = null
    val logTag = javaClass.simpleName
    private var sampleMax = 0
    private var sampleId = 0
    private lateinit var noteDirectory: String;
    var stream : FileOutputStream? = null;
    var frameCount: Int = 0;
    var filepath : String? = null;
    var fileName : String = "initial";
    private var header: ByteBuffer? = null
    init {
        this.noteDirectory = config.getString("sample-log-path")
        this.sampleMax = config.getInteger(
                "sample-log-max-files",
                DEFAULT_SAMPLE_MAX)
        // create the log path if it doesn't exist
        File(noteDirectory).mkdirs()
        // create the wav file header
        val sampleRate: Int = config.getInteger("sample-rate")
        this.header = ByteBuffer
                .allocate(44)
                .order(ByteOrder.LITTLE_ENDIAN)
        // riff/wave header
        val charset = Charsets.US_ASCII
        this.header!!.put("RIFF".toByteArray(charset))
        this.header!!.putInt(Integer.MAX_VALUE)
        this.header!!.put("WAVE".toByteArray(charset))
        // format chunk
        this.header!!.put("fmt ".toByteArray(charset))
        this.header!!.putInt(16) // size of format chunk
        this.header!!.putShort(1.toShort()) // pcm
        this.header!!.putShort(1.toShort()) // channels
        this.header!!.putInt(sampleRate) // sample rate
        this.header!!.putInt(sampleRate * 2) // byte rate
        this.header!!.putShort(2.toShort()) // block align
        this.header!!.putShort(16.toShort()) // bits per sample
        // data chunk
        this.header!!.put("data".toByteArray(charset))
        this.header!!.putInt(Integer.MAX_VALUE) // size of data chunk

    }

    @Throws(Exception::class)
    /*
    * Each time context is updated, extract the byteBuffer
    * & add to our increasing totalBuffer
    * */
    override fun process(context: SpeechContext, frame: ByteBuffer?) {
        if (context.isSpeech() && this.stream == null && noteState.startRecording) {
            //First instance of note speech recognized
            noteState.makingNote = true;
            noteState.startRecording = false;
            fileName = String.format("%05d.wav", this.sampleId++ % this.sampleMax)
            val file = File(this.noteDirectory, fileName)
            this.stream = FileOutputStream(file)
            this.stream!!.write(header?.array())
            Log.i(logTag, "[NOTE PROCESSOR] Initial buffer zone setup. ");
        } else if (!context.isSpeech() && this.stream != null) {
            /* The stream is null when we're not making a note, so
             * this.stream != null implies noteState.makingNote == true.
             * Speech falling edge: flush changes and close the file. */
            this.stream!!.close()
            this.stream = null;
            noteState.audioPath.postValue(this.noteDirectory + "/" + fileName);
            fileName = "notRecording";
            noteState.makingNote = false;
            Log.i(logTag, "[NOTE PROCESSOR] Note finished with (" + frameCount * 20 + ") frames");
            frameCount = 0;
        }
        // write the current audio frame to the wav file if we still have valid speech audio
        if (this.stream != null && frame != null) {
            val data = ByteArray(frame.remaining())
            frame.get(data)
            this.stream!!.write(data)
            frameCount++;
        }
    }
    override fun close(){
        // close() may be called when no note is active, so guard against a null stream
        this.stream?.close();
        this.stream = null;
    }
    override fun reset() {
        //Reset the buffer for new voice data input
        close();
    }

}
```

Since the original purpose of that component was verifying/debugging the speech pipeline, we probably didn’t play the clips on a device, since a computer’s more convenient for that. It looks like you’re not the only one who’s had trouble playing wavs on their native audio player, though the format is supported.

I can’t really speak for the capabilities of Flutter audio plugins, but if you’re having trouble finding one that handles wavs, you might just look into recording the audio in a different format.

We use ExoPlayer for TTS playback, and they claim to have added wav support a few years back.
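
For what it’s worth, playing one of the recorded files with ExoPlayer only takes a few lines. This is a sketch assuming a recent ExoPlayer 2.x (the MediaItem API, 2.12+) and that wavPath points at a file written by the sampler stage:

```kotlin
import android.content.Context
import android.net.Uri
import com.google.android.exoplayer2.MediaItem
import com.google.android.exoplayer2.SimpleExoPlayer
import java.io.File

fun playRecording(context: Context, wavPath: String) {
    val player = SimpleExoPlayer.Builder(context).build()
    player.setMediaItem(MediaItem.fromUri(Uri.fromFile(File(wavPath))))
    player.prepare()
    player.play()
    // call player.release() when playback is finished
}
```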
