// Client for a bidirectional (streaming) text-to-speech WebSocket API.
// Protocol reference:
// https://www.volcengine.com/docs/6561/1329505#%E7%A4%BA%E4%BE%8Bsamples
import { WebSocket } from 'ws';
import { EventEmitter } from 'eventemitter3';
import fs from 'fs/promises';
import { nanoid } from 'nanoid';

const uuidv4 = nanoid;

const PROTOCOL_VERSION = 0b0001;
const DEFAULT_HEADER_SIZE = 0b0001;

// Message Type:
const FULL_CLIENT_REQUEST = 0b0001;
const AUDIO_ONLY_RESPONSE = 0b1011;
const FULL_SERVER_RESPONSE = 0b1001;
const ERROR_INFORMATION = 0b1111;

// Message Type Specific Flags
const MsgTypeFlagNoSeq = 0b0000; // Non-terminal packet with no sequence
const MsgTypeFlagPositiveSeq = 0b1; // Non-terminal packet with sequence > 0
const MsgTypeFlagLastNoSeq = 0b10; // Last packet with no sequence
const MsgTypeFlagNegativeSeq = 0b11;
// Payload contains event number (int32)
const MsgTypeFlagWithEvent = 0b100;

// Message Serialization
const NO_SERIALIZATION = 0b0000;
const JSON_TYPE = 0b0001;

// Message Compression
const COMPRESSION_NO = 0b0000;
const COMPRESSION_GZIP = 0b0001;

const EVENT_NONE = 0;
const EVENT_Start_Connection = 1;
const EVENT_FinishConnection = 2;
const EVENT_ConnectionStarted = 50; // Connection established
const EVENT_ConnectionFailed = 51; // Connection failed (possibly rejected by authentication)
const EVENT_ConnectionFinished = 52; // Connection finished

// Upstream session events
const EVENT_StartSession = 100;
const EVENT_FinishSession = 102;

// Downstream session events
const EVENT_SessionStarted = 150;
const EVENT_SessionFinished = 152;
const EVENT_SessionFailed = 153;

// Upstream general events
const EVENT_TaskRequest = 200;

// Downstream TTS events
const EVENT_TTSSentenceStart = 350;
const EVENT_TTSSentenceEnd = 351;
const EVENT_TTSResponse = 352;

/** Shape of every value emitted on the `'writeFile'` event. */
type TTSWriteType = {
  type: 'tts-mix';
  filename: string;
  data?: Buffer;
  isBegin?: boolean;
  isEnd?: boolean;
  index?: number;
};

/** Encodes a number as a 4-byte big-endian signed integer. */
function int32BE(value: number): Buffer {
  const buffer = Buffer.alloc(4);
  buffer.writeInt32BE(value);
  return buffer;
}

/** Extracts a printable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * The fixed 4-byte frame header. Each of the first three bytes packs two
 * 4-bit fields (high nibble first); the fourth byte is reserved.
 */
class Header {
  headerSize: number;
  protocolVersion: number;
  messageType: number;
  messageTypeSpecificFlags: number;
  serialMethod: number;
  compressionType: number;
  reservedData: number;

  constructor(
    protocolVersion: number = PROTOCOL_VERSION,
    headerSize: number = DEFAULT_HEADER_SIZE,
    messageType: number = 0,
    messageTypeSpecificFlags: number = 0,
    serialMethod: number = NO_SERIALIZATION,
    compressionType: number = COMPRESSION_NO,
    reservedData: number = 0,
  ) {
    this.headerSize = headerSize;
    this.protocolVersion = protocolVersion;
    this.messageType = messageType;
    this.messageTypeSpecificFlags = messageTypeSpecificFlags;
    this.serialMethod = serialMethod;
    this.compressionType = compressionType;
    this.reservedData = reservedData;
  }

  /** Serializes the header into its 4-byte wire form. */
  asBytes(): Buffer {
    return Buffer.from([
      (this.protocolVersion << 4) | this.headerSize,
      (this.messageType << 4) | this.messageTypeSpecificFlags,
      (this.serialMethod << 4) | this.compressionType,
      this.reservedData,
    ]);
  }
}

/**
 * The optional fields that follow the header: event number, session id and
 * sequence when sending; additionally error code, connection id and response
 * metadata when a received frame has been parsed.
 */
class Optional {
  event: number;
  sessionId: string | null;
  errorCode: number;
  connectionId: string | null;
  responseMetaJson: string | null;
  sequence: number | null;

  constructor(event: number = EVENT_NONE, sessionId: string | null = null, sequence: number | null = null) {
    this.event = event;
    this.sessionId = sessionId;
    this.errorCode = 0;
    this.connectionId = null;
    this.responseMetaJson = null;
    this.sequence = sequence;
  }

  /**
   * Serializes the fields that are set, in the order event, session id
   * (length-prefixed), sequence. Unset fields are omitted entirely.
   */
  asBytes(): Buffer {
    const parts: Buffer[] = [];
    if (this.event !== EVENT_NONE) {
      parts.push(int32BE(this.event));
    }
    if (this.sessionId !== null) {
      const sessionIdBytes = Buffer.from(this.sessionId);
      parts.push(int32BE(sessionIdBytes.length), sessionIdBytes);
    }
    if (this.sequence !== null) {
      parts.push(int32BE(this.sequence));
    }
    return Buffer.concat(parts);
  }
}

/** A parsed server frame: header, optional fields and (possibly absent) payload. */
class Response {
  optional: Optional;
  header: Header;
  payload: Buffer | null;

  constructor(header: Header, optional: Optional) {
    this.optional = optional;
    this.header = header;
    this.payload = null;
  }

  /** Returns the payload decoded as text, or `''` when there is no payload. */
  toString(): string {
    return this.payload?.toString() || '';
  }
}

/**
 * Sends one frame: header, then the optional section, then the payload
 * prefixed with its 4-byte length.
 *
 * @returns a promise settled by the `ws.send` completion callback.
 */
async function sendEvent(
  ws: WebSocket,
  header: Buffer,
  optional: Buffer | null = null,
  payload: Buffer | null = null,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const frame: Buffer[] = [header];
    if (optional !== null) {
      frame.push(optional);
    }
    if (payload !== null) {
      frame.push(int32BE(payload.length), payload);
    }
    ws.send(Buffer.concat(frame), (err) => {
      if (err) reject(err);
      else resolve();
    });
  });
}

/**
 * Reads a length-prefixed string starting at `offset`.
 *
 * @returns the decoded string and the offset just past it.
 */
function readResContent(res: Buffer, offset: number): [string, number] {
  const contentSize = res.readInt32BE(offset);
  const start = offset + 4;
  const end = start + contentSize;
  return [res.subarray(start, end).toString(), end];
}

/**
 * Reads a length-prefixed binary payload starting at `offset`.
 *
 * @returns a view onto the payload bytes and the offset just past them.
 */
function readResPayload(res: Buffer, offset: number): [Buffer, number] {
  const payloadSize = res.readInt32BE(offset);
  const start = offset + 4;
  const end = start + payloadSize;
  return [res.subarray(start, end), end];
}

/**
 * Parses a raw server frame.
 *
 * @throws Error when `res` is a string (the string becomes the message) or
 *   when the frame is shorter than the 4-byte header.
 */
function parserResponse(res: Buffer | string): Response {
  if (typeof res === 'string') {
    throw new Error(res);
  }
  if (res.length < 4) {
    throw new Error(`TTS response too short: ${res.length} byte(s)`);
  }

  const response = new Response(new Header(), new Optional());
  const { header, optional } = response;

  const byte0 = res.readUInt8(0);
  const byte1 = res.readUInt8(1);
  const byte2 = res.readUInt8(2);
  header.protocolVersion = byte0 >> 4;
  header.headerSize = byte0 & 0x0f;
  header.messageType = byte1 >> 4;
  header.messageTypeSpecificFlags = byte1 & 0x0f;
  header.serialMethod = byte2 >> 4;
  header.compressionType = byte2 & 0x0f;
  header.reservedData = res.readUInt8(3);

  let offset = 4;

  if (header.messageType === FULL_SERVER_RESPONSE || header.messageType === AUDIO_ONLY_RESPONSE) {
    // Only frames flagged as carrying an event number have anything further to read.
    if (header.messageTypeSpecificFlags !== MsgTypeFlagWithEvent) {
      return response;
    }
    optional.event = res.readInt32BE(offset);
    offset += 4;

    switch (optional.event) {
      case EVENT_NONE:
        break;
      case EVENT_ConnectionStarted:
        [optional.connectionId, offset] = readResContent(res, offset);
        break;
      case EVENT_ConnectionFailed:
        [optional.responseMetaJson, offset] = readResContent(res, offset);
        break;
      case EVENT_SessionStarted:
      case EVENT_SessionFailed:
      case EVENT_SessionFinished:
        [optional.sessionId, offset] = readResContent(res, offset);
        [optional.responseMetaJson, offset] = readResContent(res, offset);
        break;
      default:
        [optional.sessionId, offset] = readResContent(res, offset);
        [response.payload, offset] = readResPayload(res, offset);
        break;
    }
  } else if (header.messageType === ERROR_INFORMATION) {
    optional.errorCode = res.readInt32BE(offset);
    offset += 4;
    [response.payload, offset] = readResPayload(res, offset);
  }

  return response;
}

/** Debug hook, intentionally a no-op; uncomment the logging to trace frames. */
function printResponse(res: Response, tag: string): void {
  // console.log(`===>${tag} header:`, res.header, res.optional.event);
  // console.log(`===>${tag} optional:`, res.optional);
}

/** Builds the JSON request payload used by session and task events. */
function getPayloadBytes(
  uid: string = '1234',
  event: number = EVENT_NONE,
  text: string = '',
  speaker: string = '',
  audioFormat: string = 'mp3',
  audioSampleRate: number = 24000,
): Buffer {
  return Buffer.from(
    JSON.stringify({
      user: { uid },
      event,
      namespace: 'BidirectionalTTS',
      req_params: {
        text,
        speaker,
        audio_params: {
          format: audioFormat,
          sample_rate: audioSampleRate,
        },
      },
    }),
  );
}

/** Sends the start-connection event with an empty JSON payload. */
async function startConnection(websocket: WebSocket): Promise<void> {
  const header = new Header(PROTOCOL_VERSION, DEFAULT_HEADER_SIZE, FULL_CLIENT_REQUEST, MsgTypeFlagWithEvent).asBytes();
  const optional = new Optional(EVENT_Start_Connection).asBytes();
  const payload = Buffer.from('{}');
  return await sendEvent(websocket, header, optional, payload);
}

/** Sends the start-session event for `sessionId`, selecting `speaker`. */
async function startSession(websocket: WebSocket, speaker: string, sessionId: string): Promise<void> {
  const header = new Header(
    PROTOCOL_VERSION,
    DEFAULT_HEADER_SIZE,
    FULL_CLIENT_REQUEST,
    MsgTypeFlagWithEvent,
    JSON_TYPE,
  ).asBytes();
  const optional = new Optional(EVENT_StartSession, sessionId).asBytes();
  const payload = getPayloadBytes('1234', EVENT_StartSession, '', speaker);
  return await sendEvent(websocket, header, optional, payload);
}

/** Sends a task-request event carrying `text` for the given session. */
async function sendText(ws: WebSocket, speaker: string, text: string, sessionId: string): Promise<void> {
  const header = new Header(
    PROTOCOL_VERSION,
    DEFAULT_HEADER_SIZE,
    FULL_CLIENT_REQUEST,
    MsgTypeFlagWithEvent,
    JSON_TYPE,
  ).asBytes();
  const optional = new Optional(EVENT_TaskRequest, sessionId).asBytes();
  const payload = getPayloadBytes('1234', EVENT_TaskRequest, text, speaker);
  return await sendEvent(ws, header, optional, payload);
}

/** Sends the finish-session event for `sessionId`. */
async function finishSession(ws: WebSocket, sessionId: string): Promise<void> {
  const header = new Header(
    PROTOCOL_VERSION,
    DEFAULT_HEADER_SIZE,
    FULL_CLIENT_REQUEST,
    MsgTypeFlagWithEvent,
    JSON_TYPE,
  ).asBytes();
  const optional = new Optional(EVENT_FinishSession, sessionId).asBytes();
  const payload = Buffer.from('{}');
  return await sendEvent(ws, header, optional, payload);
}

/** Sends the finish-connection event. */
async function finishConnection(ws: WebSocket): Promise<void> {
  const header = new Header(
    PROTOCOL_VERSION,
    DEFAULT_HEADER_SIZE,
    FULL_CLIENT_REQUEST,
    MsgTypeFlagWithEvent,
    JSON_TYPE,
  ).asBytes();
  const optional = new Optional(EVENT_FinishConnection).asBytes();
  const payload = Buffer.from('{}');
  return await sendEvent(ws, header, optional, payload);
}

/**
 * Normalizes a WebSocket message payload. Strings are passed through so that
 * `parserResponse` can reject them; fragment arrays and ArrayBuffers are
 * converted to a single Buffer.
 */
function toBuffer(data: unknown): Buffer | string {
  if (typeof data === 'string' || Buffer.isBuffer(data)) {
    return data;
  }
  if (Array.isArray(data)) {
    return Buffer.concat(data);
  }
  if (data instanceof ArrayBuffer) {
    return Buffer.from(data);
  }
  throw new TypeError('Unsupported WebSocket message payload');
}

type RunOptions = {
  id?: string;
  autoEnd?: boolean;
};

/**
 * Runs one TTS connection/session and writes the received audio to
 * `outputPath`.
 *
 * Events used on `emitter` (all optional, `emitter` may be omitted):
 * - emitted `'isConnect'` (session id) once the session has started;
 * - emitted `'writeFile'` (`TTSWriteType`) for each audio chunk, and once with
 *   `isEnd: true` when the session ends normally;
 * - listened `'text'` (`{ text, id }`): sends more text, then emits `id` with
 *   `{ code, msg }` (`200` on success, `500` if sending failed);
 * - listened `'textEnd'` (`{ id }`): finishes the session, then emits `id` the
 *   same way.
 *
 * @param appId sent as the `X-Api-App-Key` header.
 * @param token sent as the `X-Api-Access-Key` header.
 * @param speaker speaker identifier placed in the request payload.
 * @param text initial text; skipped when empty.
 * @param outputPath file the audio bytes are written to (truncated on open).
 * @param emitter optional event channel described above.
 * @param opts `autoEnd` (default `true`) finishes the session right after the
 *   initial text has been sent.
 * @returns a promise that resolves when the server reports the connection
 *   finished, and rejects on a socket error, a server-reported failure, a
 *   malformed frame, a file error, or the socket closing early.
 */
export async function runDemo(
  appId: string,
  token: string,
  speaker: string,
  text: string,
  outputPath: string,
  emitter?: EventEmitter,
  opts: RunOptions = {},
): Promise<void> {
  const autoEnd = opts.autoEnd ?? true;

  return new Promise<void>((resolve, reject) => {
    const wsHeader = {
      'X-Api-App-Key': appId,
      'X-Api-Access-Key': token,
      'X-Api-Resource-Id': 'volc.service_type.10029',
      'X-Api-Connect-Id': uuidv4(),
    };
    const url = 'wss://openspeech.bytedance.com/api/v3/tts/bidirection';
    const ws = new WebSocket(url, { headers: wsHeader });
    const filename = outputPath.split('/').pop() || '';

    let fileHandle: fs.FileHandle | null = null;
    let sessionId = '';
    let sessionStarted = false;
    let finishRequested = false;
    let settled = false;
    let isBegin = true;
    // Messages are handled one at a time so file writes keep arrival order and
    // no frame is examined while the session-start steps are still in flight.
    let queue: Promise<void> = Promise.resolve();

    const writeFileEmitter = (data: Buffer): void => {
      const value: TTSWriteType = { type: 'tts-mix', filename, data };
      if (isBegin) {
        value.isBegin = true;
        isBegin = false;
      }
      emitter?.emit?.('writeFile', value);
    };

    const finishEmitter = (): void => {
      emitter?.emit?.('writeFile', {
        type: 'tts-mix',
        isEnd: true,
        data: Buffer.from(''),
        filename,
      });
    };

    const emitResult = (id: string, code: number, msg: string): void => {
      emitter?.emit?.(id, { code, msg });
    };

    const onText = async ({ text: chunk, id }: { text: string; id: string }): Promise<void> => {
      try {
        await sendText(ws, speaker, chunk, sessionId);
        emitResult(id, 200, 'ok');
      } catch (err) {
        // Report instead of leaving the caller waiting on `id` forever.
        emitResult(id, 500, errorMessage(err));
      }
    };

    const onTextEnd = async ({ id }: { id: string }): Promise<void> => {
      try {
        await finishSession(ws, sessionId);
        emitResult(id, 200, 'ok');
      } catch (err) {
        emitResult(id, 500, errorMessage(err));
      }
    };

    const closeFile = async (): Promise<void> => {
      const handle = fileHandle;
      fileHandle = null;
      if (handle) {
        await handle.close();
      }
    };

    // Releases everything this run owns; returns false if already settled.
    const release = (): boolean => {
      if (settled) {
        return false;
      }
      settled = true;
      emitter?.off?.('text', onText);
      emitter?.off?.('textEnd', onTextEnd);
      // Best effort: the outcome has already been decided by the caller of release().
      closeFile().catch(() => undefined);
      // Closing a socket that is still connecting would raise a second error.
      if (ws.readyState === WebSocket.OPEN) {
        ws.close();
      }
      return true;
    };

    const succeed = (): void => {
      if (release()) {
        resolve();
      }
    };

    const fail = (err: unknown): void => {
      if (release()) {
        reject(err);
      }
    };

    const handleMessage = async (data: unknown): Promise<void> => {
      if (settled) {
        return;
      }
      const res = parserResponse(toBuffer(data));
      printResponse(res, 'message res:');
      const event = res.optional.event;

      if (res.header.messageType === ERROR_INFORMATION) {
        throw new Error(`TTS server error ${res.optional.errorCode}: ${res.toString()}`);
      }
      if (event === EVENT_ConnectionFailed) {
        throw new Error(`TTS connection failed: ${res.optional.responseMetaJson ?? ''}`);
      }
      if (event === EVENT_SessionFailed) {
        throw new Error(`TTS session failed: ${res.optional.responseMetaJson ?? ''}`);
      }

      if (event === EVENT_ConnectionStarted) {
        sessionId = uuidv4().replace(/-/g, '');
        await startSession(ws, speaker, sessionId);
        return;
      }
      if (event === EVENT_ConnectionFinished) {
        succeed();
        return;
      }

      if (!sessionStarted) {
        if (event !== EVENT_SessionStarted) {
          return;
        }
        sessionStarted = true;
        // Open the output before any text is sent so the first audio frame
        // always has somewhere to go.
        fileHandle = await fs.open(outputPath, 'w');
        emitter?.emit?.('isConnect', sessionId);
        if (text) {
          await sendText(ws, speaker, text, sessionId);
        }
        if (autoEnd) {
          await finishSession(ws, sessionId);
        }
        return;
      }

      const handle = fileHandle;
      if (event === EVENT_TTSResponse && res.header.messageType === AUDIO_ONLY_RESPONSE && res.payload && handle) {
        await handle.write(res.payload);
        writeFileEmitter(res.payload);
        return;
      }
      if (event === EVENT_TTSSentenceStart || event === EVENT_TTSSentenceEnd) {
        return;
      }

      // Anything else after the session started (normally event 152, session
      // finished) ends the run: close the file and ask the server to finish.
      if (finishRequested) {
        return;
      }
      finishRequested = true;
      await closeFile();
      await finishConnection(ws);
      finishEmitter();
    };

    emitter?.on('text', onText);
    emitter?.on('textEnd', onTextEnd);

    ws.on('message', (data) => {
      queue = queue.then(() => handleMessage(data)).catch(fail);
    });
    ws.on('open', () => {
      startConnection(ws).catch(fail);
    });
    ws.on('error', fail);
    ws.on('close', () => {
      // Queued behind pending messages so a connection-finished frame that
      // arrived just before the close still resolves the promise.
      queue = queue.then(() => fail(new Error('WebSocket closed before the TTS connection finished')));
    });
  });
}

/**
 * Convenience wrapper around `runDemo` that owns the event emitter and exposes
 * promise-based helpers for streaming text in and audio chunks out.
 *
 * All runs share one emitter, so run one `getVoiceDemo` at a time per instance.
 */
export class TtsMix {
  appId: string;
  token: string;
  emitter: EventEmitter;
  /** True from the moment a session has started until its run ends. */
  isStart = false;

  constructor(appId: string, token: string) {
    this.appId = appId;
    this.token = token;
    this.emitter = new EventEmitter();
    this.emitter.on('isConnect', () => {
      this.isStart = true;
    });
  }

  /**
   * Synthesizes speech and writes it to a file.
   *
   * @param speaker speaker identifier
   * @param text initial text
   * @param outputPath output file path
   * @param autoEnd when `true` (default) the session is finished right after
   *   the initial text; pass `false` to keep streaming text with `sendText`
   *   and end with `sendTextEnd`.
   * @returns a promise that settles when the underlying `runDemo` call does.
   */
  async getVoiceDemo(speaker: string, text: string, outputPath: string, autoEnd = true): Promise<void> {
    const listenId = 'text' + nanoid();
    // Reset so `isConnect()` waits for this run's session, not a previous one.
    this.isStart = false;
    try {
      await runDemo(this.appId, this.token, speaker, text, outputPath, this.emitter, { autoEnd, id: listenId });
    } finally {
      this.isStart = false;
    }
  }

  /**
   * Waits until a session has started.
   *
   * @returns `true` immediately when a session is already running; otherwise
   *   resolves with the value of the next `'isConnect'` event (the session id).
   */
  async isConnect(): Promise<unknown> {
    if (this.isStart) {
      return true;
    }
    return new Promise((resolve) => {
      this.emitter.once('isConnect', resolve);
    });
  }

  /**
   * Sends more text into the running session.
   *
   * @returns the `{ code, msg }` acknowledgement emitted by the active run.
   */
  async sendText(text: string): Promise<{ code?: number; msg?: string }> {
    const id = nanoid();
    return new Promise((resolve) => {
      this.emitter.once(id, resolve);
      this.emitter.emit('text', { text, id });
    });
  }

  /**
   * Signals that no more text will be sent, finishing the session.
   *
   * @returns the `{ code, msg }` acknowledgement emitted by the active run.
   */
  async sendTextEnd(): Promise<{ code?: number; msg?: string }> {
    const id = nanoid();
    return new Promise((resolve) => {
      this.emitter.once(id, resolve);
      this.emitter.emit('textEnd', { id });
    });
  }

  /**
   * Subscribes to every chunk as it is written to the output file.
   *
   * @param callback invoked with each `'writeFile'` event value
   * @returns a function that removes the subscription
   */
  onWriteFile(callback: (data: TTSWriteType) => void): () => void {
    this.emitter.on('writeFile', callback);
    return () => {
      this.emitter.off?.('writeFile', callback);
    };
  }

  /**
   * Like `onWriteFile`, but coalesces chunks into larger buffers.
   * Tip: chunks that are too small make audio playback stutter on the front end.
   *
   * The first chunk of a run (`isBegin`) is forwarded immediately; later
   * chunks are accumulated and flushed once more than `chunkSize` bytes are
   * pending, or when the end marker (`isEnd`) arrives.
   *
   * @param callback invoked with each flushed value; `index` is the running
   *   count of events seen by this subscription
   * @param opts `chunkSize` in bytes (default 50 KiB; `0` uses the default)
   * @returns a function that removes the subscription
   */
  onWriteFileBuffer(callback: (data: TTSWriteType) => void, opts?: { chunkSize?: number }): () => void {
    const chunkSize = opts?.chunkSize || 1024 * 50; // 50kb
    let index = 0;
    let pending: Buffer = Buffer.alloc(0);

    const callbackBuff = (data: TTSWriteType): void => {
      index++;
      if (data.isBegin) {
        callback({ ...data, index });
        return;
      }
      // `data` is optional on the event type; treat a missing chunk as empty.
      pending = Buffer.concat([pending, data.data ?? Buffer.alloc(0)]);
      if (pending.length > chunkSize || data.isEnd) {
        callback({ ...data, index, data: pending });
        pending = Buffer.alloc(0);
      }
    };

    this.emitter.on('writeFile', callbackBuff);
    return () => {
      this.emitter.off?.('writeFile', callbackBuff);
    };
  }
}