azuretext-to-speechlarge-language-model

How to parse Azure TTS streaming text input response with websocket?


I want to use this feature to build my app, but since I'm coding with JS and there is no streaming text input in JS sdk, so I'm tring to made it on my own, I find some way to get the response from the Azure service, it looks like:

X-RequestId:0db80383ac5d418397fc63f49be940a9
Content-Type:application/json; charset=utf-8
Path:response

{"context":{"serviceTag":.....
X-RequestId:0db80383ac5d418397fc63f49be940a9
Content-Type:audio/mpeg
X-StreamId:BD89C03A20734E14BD8E12EDCEA7A129
Path:audio
[binary content]
...other chunks

Then I tried to write all Path:audio chunks binary content body to a file, but the file can not open with audio player.

What should I do with these response chunks?


Solution

  • Finally I find my way to make it happen, so I'm here to put my solution, is case someone is facing the same problem as me.

    Because we need to send some special headers to Azure service when create the websoket connection, so we need a proxy server (native Websocket in browser cannot send coustom headers).

    server.ts:

    import http from "http";
    import * as WebSocket from "ws";
    import crypto from "crypto";
    import fs from "fs";
    import path from "path";
    
    // Azure tts
    const URL =
      "wss://<your_azure_service_origin>.tts.speech.microsoft.com/cognitiveservices/websocket/v2";
    const KEY = "your_azure_service_key";
    
    const server = http.createServer((req, res) => {
      res.end("Server is Running");
    });
    
    server.on("upgrade", (req, socket, head) => {
      const remote = new WebSocket.WebSocket(URL, {
        headers: {
          "ocp-apim-subscription-key": KEY,
          "x-connectionid": crypto.randomUUID().replace(/-/g, ""),
        },
      });
      remote.on("open", () => {
        console.log("remote open");
    
        const requestId = crypto.randomUUID().replace(/-/g, "");
        const now = new Date().toISOString();
        // send speech.config
        remote.send(
          [
            `X-Timestamp:${now}`,
            "Path:speech.config",
            "",
            `${JSON.stringify({})}`,
          ].join("\r\n"),
        );
    
        // send synthesis.context
        remote.send(
          [
            `X-Timestamp:${now}`,
            "Path:synthesis.context",
            `X-RequestId:${requestId}`,
            "",
            `${JSON.stringify({
              synthesis: {
                audio: {
                  // outputFormat: "audio-16khz-32kbitrate-mono-mp3",
                  outputFormat: "raw-16khz-16bit-mono-pcm",
                  metadataOptions: {
                    visemeEnabled: false,
                    bookmarkEnabled: false,
                    wordBoundaryEnabled: false,
                    punctuationBoundaryEnabled: false,
                    sentenceBoundaryEnabled: false,
                    sessionEndEnabled: true,
                  },
                },
                language: { autoDetection: false },
                input: {
                  bidirectionalStreamingMode: true,
                  voiceName: "zh-CN-YunxiNeural",
                  language: "",
                },
              },
            })}`,
          ].join("\r\n"),
        );
    
        const client = new WebSocket.WebSocketServer({ noServer: true });
        client.handleUpgrade(req, socket, head, (clientWs) => {
          clientWs.on("message", (data: Buffer) => {
            const json = JSON.parse(data.toString("utf8")) as {
              type: "data" | "end";
              data?: string;
            };
            console.log("Client:", json);
            remote.send(
              [
                `X-Timestamp:${new Date().toISOString()}`,
                `Path:text.${json.type === "data" ? "piece" : "end"}`,
                "Content-Type:text/plain",
                `X-RequestId:${requestId}`,
                "", // empty line
                json.data || "",
              ].join("\r\n"),
            );
          });
    
          const file = createWAVFile(`speech/${Date.now()}.wav`);
          remote.on("message", (data: Buffer, isBinary) => {
            // console.log("Remote, isBinary:", isBinary);
            const { headers, content } = parseChunk(data);
            console.log({ headers });
            if (isBinary) {
              if (headers.Path === "audio") {
                // why we need to skip the first byte
                const audioContent = content.subarray(1);
                if (audioContent.length) {
                  file.write(audioContent);
                  clientWs.send(audioContent);
                }
              }
            } else if (headers.Path === "turn.end") {
              file.end();
            }
          });
    
          clientWs.on("close", () => {
            console.log("client close");
            remote.close();
          });
          clientWs.on("error", (error) => {
            console.log("client error", error);
          });
        });
        remote.on("close", (code, reason) => {
          console.log("remote close", reason.toString());
        });
        remote.on("error", (error) => {
          console.log("remote error", error);
        });
      });
    });
    
    function parseChunk(buffer: Buffer) {
      const len = buffer.length;
      const headers: string[][] = [];
      // skip first 2 bytes
      //? what means the first 2 bytes?
      let i = 2;
      let temp: number[] = [];
      let curr: string[] = [];
      let contentPosition: number;
      for (; i < len; i++) {
        if (buffer[i] === 0x3a) {
          // :
          curr.push(Buffer.from(temp).toString());
          temp = [];
        } else if (buffer[i] === 0x0d && buffer[i + 1] === 0x0a) {
          // \r\n
          // maybe empty line
          if (temp.length) {
            curr.push(Buffer.from(temp).toString());
            temp = [];
            headers.push(curr);
            curr = [];
          }
          i += 1; // skip \n
          contentPosition = i;
          if (headers.at(-1)?.[0] === "Path") {
            // if we get `Path`
            break;
          }
        } else {
          temp.push(buffer[i]);
        }
      }
    
      const obj: Record<string, string> = {};
      for (const [key, value] of headers) {
        obj[key] = value;
      }
    
      const content = buffer.subarray(contentPosition!);
    
      return { headers: obj, content };
    }
    
    // for test
    function createWAVFile(
      filename: string,
      sampleRate = 16000,
      bitDepth = 16,
      channels = 1,
    ) {
      let dataLength = 0;
      let data = Buffer.alloc(0);
      return {
        write(chunk: Buffer) {
          dataLength += chunk.length;
          data = Buffer.concat([data, chunk]);
        },
        end() {
          const byteRate = sampleRate * (bitDepth / 8) * channels;
          const blockAlign = (bitDepth / 8) * channels;
    
          // WAV head
          const buffer = Buffer.alloc(44);
          buffer.write("RIFF", 0); // ChunkID
          buffer.writeUInt32LE(36 + dataLength, 4); // ChunkSize
          buffer.write("WAVE", 8); // Format
          buffer.write("fmt ", 12); // Subchunk1ID
          buffer.writeUInt32LE(16, 16); // Subchunk1Size (16 for PCM)
          buffer.writeUInt16LE(1, 20); // AudioFormat (1 = PCM)
          buffer.writeUInt16LE(channels, 22); // Channels
          buffer.writeUInt32LE(sampleRate, 24); // SampleRate
          buffer.writeUInt32LE(byteRate, 28); // ByteRate
          buffer.writeUInt16LE(blockAlign, 32); // BlockAlign
          buffer.writeUInt16LE(bitDepth, 34); // BitsPerSample
          buffer.write("data", 36); // Subchunk2ID
          buffer.writeUInt32LE(dataLength, 40); // Subchunk2Size
    
          const stream = fs.createWriteStream(filename);
          stream.write(buffer);
          stream.write(data);
          stream.end();
          console.log(`write to file ${filename}`);
        },
      };
    }
    
    server.listen(8080);
    

    player.ts:

    type StreamingAudioPlayerOptions = {
      autoPlay: boolean;
    };
    
    export class StreamingAudioPlayer {
      private context = new AudioContext();
      private chunks: Blob[] = [];
      private decodeChunkIndex = 0;
      private buffers: AudioBuffer[] = [];
      private duration = 0;
      private decoding = false;
      private scheduleIndex = 0;
      private currentDuration = 0; // 粗略记录已播放时长,用于展示,不可用于播放控制
      private state: "play" | "stop" = "stop";
      private isPlaying = false; // 是否真的在播放
      // 跟踪下一个缓冲区的预定播放时间
      private nextScheduledTime = 0;
      // 跟踪已创建的音频源
      private activeSources: AudioBufferSourceNode[] = [];
      private sourceSchedule = new WeakMap<AudioBufferSourceNode, [number]>();
      private beginOffset = 0;
      private timer: number | null;
    
      constructor(private readonly options: StreamingAudioPlayerOptions) {}
    
      private async decodeAudioChunks() {
        if (this.decoding || this.chunks.length === 0) {
          return;
        }
    
        this.decoding = true;
        while (this.decodeChunkIndex < this.chunks.length) {
          const originBuffer =
            await this.chunks[this.decodeChunkIndex].arrayBuffer();
    
          // Step 1: 转成 Int16
          const int16 = new Int16Array(originBuffer);
    
          // Step 2: 转成 Float32
          const float32 = new Float32Array(int16.length);
          for (let i = 0; i < int16.length; i++) {
            float32[i] = int16[i] / 32768; // Normalize to [-1.0, 1.0]
          }
    
          // Step 3: 创建 AudioBuffer (单声道)
          const audioBuffer = this.context.createBuffer(
            1, // mono
            float32.length,
            16000, // sampleRate
          );
    
          audioBuffer.copyToChannel(float32, 0);
          this.buffers.push(audioBuffer);
          this.duration += audioBuffer.duration;
          console.log(
            `chunk ${this.decodeChunkIndex} decoded, total buffer duration: ${this.duration}`,
          );
          this.decodeChunkIndex++;
    
          if (this.state === "play" && !this.isPlaying) {
            console.log("ready to play");
            this._play();
          } else if (this.state === "stop" && this.options.autoPlay) {
            this.play();
          }
        }
        this.decoding = false;
      }
    
      async append(chunk: Blob) {
        this.chunks.push(chunk);
        if (!this.decoding) {
          this.decodeAudioChunks();
        }
      }
    
      private scheduleBuffers() {
        while (this.scheduleIndex < this.buffers.length) {
          if (this.nextScheduledTime - this.context.currentTime > 10) {
            // 缓冲控制在 10s 左右
            break;
          }
          const buffer = this.buffers[this.scheduleIndex];
          const source = this.context.createBufferSource();
          source.buffer = buffer;
          // 记录并更新预定时间
          const startTime = this.nextScheduledTime;
          this.nextScheduledTime += buffer.duration;
    
          source.connect(this.context.destination);
          if (this.beginOffset !== 0) {
            source.start(startTime, this.beginOffset);
            this.beginOffset = 0;
          } else {
            source.start(startTime);
          }
          this.sourceSchedule.set(source, [startTime]);
          console.log(`schedule chunk ${this.scheduleIndex}`);
          this.activeSources.push(source);
          const index = this.scheduleIndex;
          this.scheduleIndex++;
    
          // 监听播放结束来维护状态
          source.addEventListener("ended", () => {
            // 移除已结束的源
            this.activeSources = this.activeSources.filter((s) => s !== source);
            if (this.state !== "play") {
              return;
            }
            console.log(`chunk ${index} play finish`);
            if (this.scheduleIndex < this.buffers.length) {
              // 继续安排未播放的切片
              this.scheduleBuffers();
            } else if (this.activeSources.length === 0) {
              // 如果没有剩余的播放源,那么停止播放
              this._stop();
            }
          });
        }
      }
    
      private _play() {
        // 使用计时器粗略记录已播放时长
        // ?播放卡住了怎么办
        const updatePlayDuration = (timestamp1: number) => {
          return (timestamp2: number) => {
            this.currentDuration += timestamp2 - timestamp1;
            this.timer = requestAnimationFrame(updatePlayDuration(timestamp2));
          };
        };
        this.timer = requestAnimationFrame(updatePlayDuration(performance.now()));
        // 初始化播放时间为当前上下文时间
        this.nextScheduledTime = this.context.currentTime;
        this.isPlaying = true;
        this.scheduleBuffers();
      }
    
      private _stop() {
        if (this.state !== "play") {
          return;
        }
    
        // 停止所有活跃的音频源
        this.activeSources.forEach((source, index) => {
          if (index === 0) {
            // current playing source
            const offset =
              this.context.currentTime - this.sourceSchedule.get(source)![0];
            console.log("offset:", offset);
          }
          source.stop();
        });
    
        cancelAnimationFrame(this.timer!);
        this.timer = null;
    
        this.activeSources = [];
        // 不确定是否加载了全部的音频切片
        this.state = "stop";
        this.isPlaying = false;
        console.log(`played duration: ${this.currentDuration}`);
      }
    
      resume() {
        // 恢复播放应该依据已播放时间
        // 因为已播放时间可以通过时间轴(暂未实现)调整
        this.scheduleIndex = 0;
        let d = 0;
        for (; this.scheduleIndex < this.buffers.length; this.scheduleIndex++) {
          const buffer = this.buffers[this.scheduleIndex];
          if (d + buffer.duration * 1000 > this.currentDuration) {
            break;
          }
          d += buffer.duration * 1000;
        }
        this.state = "play";
        this.beginOffset = (this.currentDuration - d) / 1000;
        console.log("resume offset", this.beginOffset);
        this._play();
      }
    
      play() {
        if (this.state === "play") {
          return;
        }
        this.state = "play";
        this.duration = this.buffers.reduce((total, buffer) => {
          return total + buffer.duration;
        }, 0);
        if (this.duration === 0) {
          console.warn("waiting buffer");
          return;
        }
    
        this.currentDuration = 0;
        this.scheduleIndex = 0;
        console.log(this);
        this._play();
      }
    
      pause() {
        this._stop();
      }
    }
    

    index.js:

    // something like:
    const player = new StreamingAudioPlayer({ autoPlay: true });
    const ws = new Websocket("xxx");
    ws.send('{"type":"data","data":"你好"}');
    ws.send('{"type":"data","data":",世界!"}');
    ws.send('{"type":"end"}');
    ws.addEventListener("message", (e) => {
      player.append(e.data as Blob);
    });
    

    The code is for reference only. If anyone has any better suggestions, please feel free to share your thoughts.