I want to use this feature to build my app, but since I'm coding with JS and there is no streaming text input in JS sdk, so I'm tring to made it on my own, I find some way to get the response from the Azure service, it looks like:
X-RequestId:0db80383ac5d418397fc63f49be940a9
Content-Type:application/json; charset=utf-8
Path:response
{"context":{"serviceTag":.....
X-RequestId:0db80383ac5d418397fc63f49be940a9
Content-Type:audio/mpeg
X-StreamId:BD89C03A20734E14BD8E12EDCEA7A129
Path:audio
[binary content]
...other chunks
Then I tried to write all Path:audio
chunks binary content body to a file, but the file can not open with audio player.
What should I do with these response chunks?
Finally I find my way to make it happen, so I'm here to put my solution, is case someone is facing the same problem as me.
Because we need to send some special headers to Azure service when create the websoket connection, so we need a proxy server (native Websocket in browser cannot send coustom headers).
server.ts
:
import http from "http";
import * as WebSocket from "ws";
import crypto from "crypto";
import fs from "fs";
import path from "path";
// Azure tts
const URL =
"wss://<your_azure_service_origin>.tts.speech.microsoft.com/cognitiveservices/websocket/v2";
const KEY = "your_azure_service_key";
const server = http.createServer((req, res) => {
res.end("Server is Running");
});
server.on("upgrade", (req, socket, head) => {
const remote = new WebSocket.WebSocket(URL, {
headers: {
"ocp-apim-subscription-key": KEY,
"x-connectionid": crypto.randomUUID().replace(/-/g, ""),
},
});
remote.on("open", () => {
console.log("remote open");
const requestId = crypto.randomUUID().replace(/-/g, "");
const now = new Date().toISOString();
// send speech.config
remote.send(
[
`X-Timestamp:${now}`,
"Path:speech.config",
"",
`${JSON.stringify({})}`,
].join("\r\n"),
);
// send synthesis.context
remote.send(
[
`X-Timestamp:${now}`,
"Path:synthesis.context",
`X-RequestId:${requestId}`,
"",
`${JSON.stringify({
synthesis: {
audio: {
// outputFormat: "audio-16khz-32kbitrate-mono-mp3",
outputFormat: "raw-16khz-16bit-mono-pcm",
metadataOptions: {
visemeEnabled: false,
bookmarkEnabled: false,
wordBoundaryEnabled: false,
punctuationBoundaryEnabled: false,
sentenceBoundaryEnabled: false,
sessionEndEnabled: true,
},
},
language: { autoDetection: false },
input: {
bidirectionalStreamingMode: true,
voiceName: "zh-CN-YunxiNeural",
language: "",
},
},
})}`,
].join("\r\n"),
);
const client = new WebSocket.WebSocketServer({ noServer: true });
client.handleUpgrade(req, socket, head, (clientWs) => {
clientWs.on("message", (data: Buffer) => {
const json = JSON.parse(data.toString("utf8")) as {
type: "data" | "end";
data?: string;
};
console.log("Client:", json);
remote.send(
[
`X-Timestamp:${new Date().toISOString()}`,
`Path:text.${json.type === "data" ? "piece" : "end"}`,
"Content-Type:text/plain",
`X-RequestId:${requestId}`,
"", // empty line
json.data || "",
].join("\r\n"),
);
});
const file = createWAVFile(`speech/${Date.now()}.wav`);
remote.on("message", (data: Buffer, isBinary) => {
// console.log("Remote, isBinary:", isBinary);
const { headers, content } = parseChunk(data);
console.log({ headers });
if (isBinary) {
if (headers.Path === "audio") {
// why we need to skip the first byte
const audioContent = content.subarray(1);
if (audioContent.length) {
file.write(audioContent);
clientWs.send(audioContent);
}
}
} else if (headers.Path === "turn.end") {
file.end();
}
});
clientWs.on("close", () => {
console.log("client close");
remote.close();
});
clientWs.on("error", (error) => {
console.log("client error", error);
});
});
remote.on("close", (code, reason) => {
console.log("remote close", reason.toString());
});
remote.on("error", (error) => {
console.log("remote error", error);
});
});
});
function parseChunk(buffer: Buffer) {
const len = buffer.length;
const headers: string[][] = [];
// skip first 2 bytes
//? what means the first 2 bytes?
let i = 2;
let temp: number[] = [];
let curr: string[] = [];
let contentPosition: number;
for (; i < len; i++) {
if (buffer[i] === 0x3a) {
// :
curr.push(Buffer.from(temp).toString());
temp = [];
} else if (buffer[i] === 0x0d && buffer[i + 1] === 0x0a) {
// \r\n
// maybe empty line
if (temp.length) {
curr.push(Buffer.from(temp).toString());
temp = [];
headers.push(curr);
curr = [];
}
i += 1; // skip \n
contentPosition = i;
if (headers.at(-1)?.[0] === "Path") {
// if we get `Path`
break;
}
} else {
temp.push(buffer[i]);
}
}
const obj: Record<string, string> = {};
for (const [key, value] of headers) {
obj[key] = value;
}
const content = buffer.subarray(contentPosition!);
return { headers: obj, content };
}
// for test
function createWAVFile(
filename: string,
sampleRate = 16000,
bitDepth = 16,
channels = 1,
) {
let dataLength = 0;
let data = Buffer.alloc(0);
return {
write(chunk: Buffer) {
dataLength += chunk.length;
data = Buffer.concat([data, chunk]);
},
end() {
const byteRate = sampleRate * (bitDepth / 8) * channels;
const blockAlign = (bitDepth / 8) * channels;
// WAV head
const buffer = Buffer.alloc(44);
buffer.write("RIFF", 0); // ChunkID
buffer.writeUInt32LE(36 + dataLength, 4); // ChunkSize
buffer.write("WAVE", 8); // Format
buffer.write("fmt ", 12); // Subchunk1ID
buffer.writeUInt32LE(16, 16); // Subchunk1Size (16 for PCM)
buffer.writeUInt16LE(1, 20); // AudioFormat (1 = PCM)
buffer.writeUInt16LE(channels, 22); // Channels
buffer.writeUInt32LE(sampleRate, 24); // SampleRate
buffer.writeUInt32LE(byteRate, 28); // ByteRate
buffer.writeUInt16LE(blockAlign, 32); // BlockAlign
buffer.writeUInt16LE(bitDepth, 34); // BitsPerSample
buffer.write("data", 36); // Subchunk2ID
buffer.writeUInt32LE(dataLength, 40); // Subchunk2Size
const stream = fs.createWriteStream(filename);
stream.write(buffer);
stream.write(data);
stream.end();
console.log(`write to file ${filename}`);
},
};
}
server.listen(8080);
player.ts
:
type StreamingAudioPlayerOptions = {
autoPlay: boolean;
};
export class StreamingAudioPlayer {
private context = new AudioContext();
private chunks: Blob[] = [];
private decodeChunkIndex = 0;
private buffers: AudioBuffer[] = [];
private duration = 0;
private decoding = false;
private scheduleIndex = 0;
private currentDuration = 0; // 粗略记录已播放时长,用于展示,不可用于播放控制
private state: "play" | "stop" = "stop";
private isPlaying = false; // 是否真的在播放
// 跟踪下一个缓冲区的预定播放时间
private nextScheduledTime = 0;
// 跟踪已创建的音频源
private activeSources: AudioBufferSourceNode[] = [];
private sourceSchedule = new WeakMap<AudioBufferSourceNode, [number]>();
private beginOffset = 0;
private timer: number | null;
constructor(private readonly options: StreamingAudioPlayerOptions) {}
private async decodeAudioChunks() {
if (this.decoding || this.chunks.length === 0) {
return;
}
this.decoding = true;
while (this.decodeChunkIndex < this.chunks.length) {
const originBuffer =
await this.chunks[this.decodeChunkIndex].arrayBuffer();
// Step 1: 转成 Int16
const int16 = new Int16Array(originBuffer);
// Step 2: 转成 Float32
const float32 = new Float32Array(int16.length);
for (let i = 0; i < int16.length; i++) {
float32[i] = int16[i] / 32768; // Normalize to [-1.0, 1.0]
}
// Step 3: 创建 AudioBuffer (单声道)
const audioBuffer = this.context.createBuffer(
1, // mono
float32.length,
16000, // sampleRate
);
audioBuffer.copyToChannel(float32, 0);
this.buffers.push(audioBuffer);
this.duration += audioBuffer.duration;
console.log(
`chunk ${this.decodeChunkIndex} decoded, total buffer duration: ${this.duration}`,
);
this.decodeChunkIndex++;
if (this.state === "play" && !this.isPlaying) {
console.log("ready to play");
this._play();
} else if (this.state === "stop" && this.options.autoPlay) {
this.play();
}
}
this.decoding = false;
}
async append(chunk: Blob) {
this.chunks.push(chunk);
if (!this.decoding) {
this.decodeAudioChunks();
}
}
private scheduleBuffers() {
while (this.scheduleIndex < this.buffers.length) {
if (this.nextScheduledTime - this.context.currentTime > 10) {
// 缓冲控制在 10s 左右
break;
}
const buffer = this.buffers[this.scheduleIndex];
const source = this.context.createBufferSource();
source.buffer = buffer;
// 记录并更新预定时间
const startTime = this.nextScheduledTime;
this.nextScheduledTime += buffer.duration;
source.connect(this.context.destination);
if (this.beginOffset !== 0) {
source.start(startTime, this.beginOffset);
this.beginOffset = 0;
} else {
source.start(startTime);
}
this.sourceSchedule.set(source, [startTime]);
console.log(`schedule chunk ${this.scheduleIndex}`);
this.activeSources.push(source);
const index = this.scheduleIndex;
this.scheduleIndex++;
// 监听播放结束来维护状态
source.addEventListener("ended", () => {
// 移除已结束的源
this.activeSources = this.activeSources.filter((s) => s !== source);
if (this.state !== "play") {
return;
}
console.log(`chunk ${index} play finish`);
if (this.scheduleIndex < this.buffers.length) {
// 继续安排未播放的切片
this.scheduleBuffers();
} else if (this.activeSources.length === 0) {
// 如果没有剩余的播放源,那么停止播放
this._stop();
}
});
}
}
private _play() {
// 使用计时器粗略记录已播放时长
// ?播放卡住了怎么办
const updatePlayDuration = (timestamp1: number) => {
return (timestamp2: number) => {
this.currentDuration += timestamp2 - timestamp1;
this.timer = requestAnimationFrame(updatePlayDuration(timestamp2));
};
};
this.timer = requestAnimationFrame(updatePlayDuration(performance.now()));
// 初始化播放时间为当前上下文时间
this.nextScheduledTime = this.context.currentTime;
this.isPlaying = true;
this.scheduleBuffers();
}
private _stop() {
if (this.state !== "play") {
return;
}
// 停止所有活跃的音频源
this.activeSources.forEach((source, index) => {
if (index === 0) {
// current playing source
const offset =
this.context.currentTime - this.sourceSchedule.get(source)![0];
console.log("offset:", offset);
}
source.stop();
});
cancelAnimationFrame(this.timer!);
this.timer = null;
this.activeSources = [];
// 不确定是否加载了全部的音频切片
this.state = "stop";
this.isPlaying = false;
console.log(`played duration: ${this.currentDuration}`);
}
resume() {
// 恢复播放应该依据已播放时间
// 因为已播放时间可以通过时间轴(暂未实现)调整
this.scheduleIndex = 0;
let d = 0;
for (; this.scheduleIndex < this.buffers.length; this.scheduleIndex++) {
const buffer = this.buffers[this.scheduleIndex];
if (d + buffer.duration * 1000 > this.currentDuration) {
break;
}
d += buffer.duration * 1000;
}
this.state = "play";
this.beginOffset = (this.currentDuration - d) / 1000;
console.log("resume offset", this.beginOffset);
this._play();
}
play() {
if (this.state === "play") {
return;
}
this.state = "play";
this.duration = this.buffers.reduce((total, buffer) => {
return total + buffer.duration;
}, 0);
if (this.duration === 0) {
console.warn("waiting buffer");
return;
}
this.currentDuration = 0;
this.scheduleIndex = 0;
console.log(this);
this._play();
}
pause() {
this._stop();
}
}
index.js
:
// something like:
const player = new StreamingAudioPlayer({ autoPlay: true });
const ws = new Websocket("xxx");
ws.send('{"type":"data","data":"你好"}');
ws.send('{"type":"data","data":",世界!"}');
ws.send('{"type":"end"}');
ws.addEventListener("message", (e) => {
player.append(e.data as Blob);
});
The code is for reference only. If anyone has any better suggestions, please feel free to share your thoughts.