diff --git a/bun.lockb b/bun.lockb
index b48b725..6034660 100755
Binary files a/bun.lockb and b/bun.lockb differ
diff --git a/package.json b/package.json
index 3d6d52d..f93fc8e 100644
--- a/package.json
+++ b/package.json
@@ -37,6 +37,7 @@
     "server-only": "^0.0.1"
   },
   "devDependencies": {
+    "@msgpack/msgpack": "^3.1.3",
     "@types/bun": "^1.3.13",
     "@types/node": "^22.18.13",
     "@types/react": "19.2.2",
diff --git a/src/pipeline/models.ts b/src/pipeline/models.ts
index 54ed81d..3bc1605 100644
--- a/src/pipeline/models.ts
+++ b/src/pipeline/models.ts
@@ -90,6 +90,19 @@ const NEW_MODELS: PipelineModel[] = [
     name: 'TTS-1.5 Mini',
     registered: false,
   },
+  // Fish Audio s2.1-pro, request mode `latency: "balanced"`. The transport
+  // (transports/fish.ts) speaks the realtime msgpack WS; pipelineModels uses
+  // vendorModelId for the `model` header on every request. arenaApiId is
+  // FROZEN ONCE clips are hashed.
+  {
+    id: 'fish-s21-pro',
+    slug: 'fish-s2-1-pro',
+    providerId: 'fish',
+    arenaApiId: 's2-1-pro',
+    vendorModelId: 's2.1-pro',
+    name: 's2.1-pro',
+    registered: false,
+  },
 ];
 
 const registryModels = (): PipelineModel[] =>
diff --git a/src/pipeline/transports/fish.ts b/src/pipeline/transports/fish.ts
new file mode 100644
index 0000000..ac9a57b
--- /dev/null
+++ b/src/pipeline/transports/fish.ts
@@ -0,0 +1,306 @@
+/**
+ * Fish Audio transport (s2.1-pro).
+ *
+ * Synthesis + TTFB ride the same realtime WebSocket:
+ * `wss://api.fish.audio/v1/tts/live`, msgpack-framed (`@msgpack/msgpack`).
+ * Protocol per the Fish docs and the in-house Fish CLI reference
+ * (code/fish/cli/index.js): client sends a `start` frame with the request,
+ * a `text` frame with the line, then `flush` + `stop`; server replies with
+ * `audio` frames carrying mp3 bytes and a terminal `finish` frame. The
+ * Index entry uses `latency: "balanced"` because a local bench (June 2026,
+ * n=20 per mode) measured balanced ~18 ms faster at the median than
+ * `latency: "low"`, despite the name.
+ *
+ * Clone creation (`createClone`) hits the documented multipart
+ * `POST https://api.fish.audio/model` (instant cloning, `train_mode: "fast"`,
+ * private model), per
+ * https://docs.fish.audio/developer-guide/sdk-guide/cookbook/instant-voice-cloning
+ * and verified against code/fish/livekit-demo/fish/src/voice_clone.py. The
+ * returned model id is what the synth `reference_id` points at.
+ */
+import { readFileSync } from 'node:fs';
+import { basename } from 'node:path';
+
+import { decode, encode } from '@msgpack/msgpack';
+
+import { requireEnv } from '../env';
+import { BENCH_TEXT, postFormForJson } from './http';
+import {
+  RateLimitedError,
+  TransportError,
+  type ProviderTransport,
+  type TtfbPlan,
+  type TtfbTrialResult,
+} from './types';
+
+const WS_URL = 'wss://api.fish.audio/v1/tts/live';
+const CLONE_URL = 'https://api.fish.audio/model';
+/** Streaming-optimised mode; see the doc comment above for the local bench. */
+const LATENCY: 'balanced' | 'low' | 'normal' = 'balanced';
+const TRIAL_TIMEOUT_MS = 30_000;
+const SYNTH_TIMEOUT_MS = 60_000;
+
+const apiKey = (): string => requireEnv('FISH_API_KEY');
+
+type ServerEvent =
+  | { event: 'audio'; audio: Uint8Array }
+  | { event: 'finish'; reason?: string }
+  | { event: 'log'; level?: string; message?: string }
+  | { event: string; [key: string]: unknown };
+
+/** Decodes one msgpack frame; null when undecodable or not an object. */
+const decodeFrame = (data: ArrayBuffer): ServerEvent | null => {
+  try {
+    const value = decode(new Uint8Array(data));
+    if (typeof value !== 'object' || value === null) return null;
+    return value as ServerEvent;
+  } catch {
+    return null;
+  }
+};
+
+/**
+ * Opens the realtime socket, sending the API key and `model` as headers.
+ * The options object does not match the `WebSocket` constructor typings in
+ * scope, hence the cast (NOTE(review): presumably Bun's `headers` extension).
+ */
+const openWs = (vendorModelId: string): WebSocket => {
+  const ws = new WebSocket(
+    WS_URL,
+    {
+      headers: { Authorization: `Bearer ${apiKey()}`, model: vendorModelId },
+    } as unknown as string[] | undefined,
+  );
+  ws.binaryType = 'arraybuffer';
+  return ws;
+};
+
+/**
+ * Synthesizes `text` with voice `providerVoiceId` over a fresh socket and
+ * resolves with the concatenated mp3 bytes once the `finish` frame arrives.
+ * Rejects on timeout, socket error, a failed send, an empty or errored
+ * stream, or a close that arrives before `finish`.
+ */
+const synthOverWs = (
+  vendorModelId: string,
+  providerVoiceId: string,
+  text: string,
+): Promise<Uint8Array> =>
+  new Promise((resolve, reject) => {
+    const ws = openWs(vendorModelId);
+    const chunks: Uint8Array[] = [];
+    let settled = false;
+    const finish = (cb: () => void) => {
+      if (settled) return;
+      settled = true;
+      try {
+        ws.close();
+      } catch {
+        // already closed
+      }
+      cb();
+    };
+    const timer = setTimeout(() => {
+      finish(() =>
+        reject(new TransportError('fish ws synth: no finish within timeout')),
+      );
+    }, SYNTH_TIMEOUT_MS);
+
+    ws.onopen = () => {
+      try {
+        ws.send(
+          encode({
+            event: 'start',
+            request: {
+              text: '',
+              format: 'mp3',
+              latency: LATENCY,
+              reference_id: providerVoiceId,
+            },
+          }),
+        );
+        ws.send(encode({ event: 'text', text: `${text} ` }));
+        ws.send(encode({ event: 'flush' }));
+        ws.send(encode({ event: 'stop' }));
+      } catch (err) {
+        clearTimeout(timer);
+        finish(() => reject(err as Error));
+      }
+    };
+
+    ws.onmessage = (event: MessageEvent) => {
+      if (!(event.data instanceof ArrayBuffer)) return;
+      const msg = decodeFrame(event.data);
+      if (!msg) return;
+      if (msg.event === 'audio') {
+        const audio = (msg as { audio?: unknown }).audio;
+        if (audio instanceof Uint8Array) chunks.push(audio);
+        return;
+      }
+      if (msg.event === 'finish') {
+        clearTimeout(timer);
+        // Fish documents `reason: "stop" | "error"` here (NOTE(review): confirm
+        // against live frames); never return a clip from an errored stream.
+        if ((msg as { reason?: unknown }).reason === 'error') {
+          finish(() =>
+            reject(new TransportError('fish ws synth: finish reason=error')),
+          );
+          return;
+        }
+        if (chunks.length === 0) {
+          finish(() => reject(new TransportError('fish ws synth: no audio')));
+          return;
+        }
+        const total = chunks.reduce((n, c) => n + c.length, 0);
+        const out = new Uint8Array(total);
+        let off = 0;
+        for (const c of chunks) {
+          out.set(c, off);
+          off += c.length;
+        }
+        finish(() => resolve(out));
+      }
+    };
+
+    ws.onerror = () => {
+      clearTimeout(timer);
+      finish(() => reject(new TransportError('fish ws synth: socket error')));
+    };
+    ws.onclose = (event: CloseEvent) => {
+      clearTimeout(timer);
+      // Reaching here means no `finish` frame settled the promise first, so any
+      // buffered chunks are a truncated clip: always reject. (Rejecting only
+      // when no audio had arrived left the promise pending forever.)
+      finish(() =>
+        reject(
+          new TransportError(
+            `fish ws synth: closed before finish (code ${event.code} ${event.reason})`.trim(),
+          ),
+        ),
+      );
+    };
+  });
+
+/**
+ * TTFB trial: fresh WS per trial, connectMs covers the handshake before t0,
+ * t0 right after the `text` frame is buffered (matches the in-house Fish
+ * CLI's textSentAt), t1 at the first inbound `audio` frame carrying bytes.
+ * Bench omits `reference_id` so it runs without a clone dependency; once
+ * the arena Clara clone exists locally, pin it here for parity with the
+ * other providers' WS benches.
+ */
+const fishTtfbTrial = (vendorModelId: string): Promise<TtfbTrialResult> =>
+  new Promise((resolve, reject) => {
+    const tConnect = performance.now();
+    const ws = openWs(vendorModelId);
+    let t0 = 0;
+    let connectMs = 0;
+    let settled = false;
+    const finish = (cb: () => void) => {
+      if (settled) return;
+      settled = true;
+      try {
+        ws.close();
+      } catch {
+        // already closed
+      }
+      cb();
+    };
+    const timer = setTimeout(() => {
+      finish(() => reject(new TransportError('fish ws bench: no audio within timeout')));
+    }, TRIAL_TIMEOUT_MS);
+
+    ws.onopen = () => {
+      connectMs = performance.now() - tConnect;
+      try {
+        ws.send(
+          encode({
+            event: 'start',
+            request: { text: '', format: 'mp3', latency: LATENCY },
+          }),
+        );
+        ws.send(encode({ event: 'text', text: `${BENCH_TEXT} ` }));
+        t0 = performance.now();
+        ws.send(encode({ event: 'flush' }));
+        ws.send(encode({ event: 'stop' }));
+      } catch (err) {
+        clearTimeout(timer);
+        finish(() => reject(err as Error));
+      }
+    };
+
+    ws.onmessage = (event: MessageEvent) => {
+      if (!(event.data instanceof ArrayBuffer)) return;
+      const msg = decodeFrame(event.data);
+      if (!msg || t0 === 0) return;
+      if (msg.event === 'audio') {
+        const audio = (msg as { audio?: unknown }).audio;
+        if (audio instanceof Uint8Array && audio.length > 0) {
+          const ttfbMs = performance.now() - t0;
+          clearTimeout(timer);
+          finish(() => resolve({ ttfbMs, connectMs }));
+        }
+      }
+    };
+
+    ws.onerror = () => {
+      clearTimeout(timer);
+      finish(() => reject(new TransportError('fish ws bench: socket error')));
+    };
+    ws.onclose = (event: CloseEvent) => {
+      clearTimeout(timer);
+      finish(() =>
+        reject(
+          new TransportError(
+            `fish ws bench: closed before audio (code ${event.code} ${event.reason})`.trim(),
+          ),
+        ),
+      );
+    };
+  });
+
+export const fish: ProviderTransport = {
+  providerId: 'fish',
+  apiKeyEnv: 'FISH_API_KEY',
+
+  synthesize: async ({ vendorModelId, providerVoiceId, text }) => ({
+    bytes: await synthOverWs(vendorModelId, providerVoiceId, text),
+    format: 'mp3' as const,
+  }),
+
+  createClone: async ({ displayName, sampleFiles }) => {
+    const form = new FormData();
+    form.append('type', 'tts');
+    form.append('title', displayName);
+    form.append('train_mode', 'fast');
+    form.append('visibility', 'private');
+    form.append('enhance_audio_quality', 'true');
+    for (const file of sampleFiles) {
+      form.append(
+        'voices',
+        new Blob([readFileSync(file)], { type: 'audio/wav' }),
+        basename(file),
+      );
+    }
+    const result = await postFormForJson<{ _id: string }>(
+      CLONE_URL,
+      { Authorization: `Bearer ${apiKey()}` },
+      form,
+      'fish create-model',
+    );
+    return result._id;
+  },
+
+  ttfbPlanFor: (vendorModelId): TtfbPlan => ({
+    transport: 'websocket',
+    notes: `wss://api.fish.audio/v1/tts/live (msgpack frames: start/text/flush/stop; audio/finish back), model=${vendorModelId}, latency=${LATENCY}, mp3 chunks, no reference_id (stock voice for the bench)`,
+    trial: async (): Promise<TtfbTrialResult> => {
+      try {
+        return await fishTtfbTrial(vendorModelId);
+      } catch (error) {
+        if (error instanceof RateLimitedError) throw error;
+        return fishTtfbTrial(vendorModelId);
+      }
+    },
+  }),
+};
diff --git a/src/pipeline/transports/index.ts b/src/pipeline/transports/index.ts
index 27547b5..8815b0b 100644
--- a/src/pipeline/transports/index.ts
+++ b/src/pipeline/transports/index.ts
@@ -8,6 +8,7 @@
  */
 import { cartesia } from './cartesia';
 import { elevenlabs } from './elevenlabs';
+import { fish } from './fish';
 import { hume } from './hume';
 import { inworld } from './inworld';
 import { minimax } from './minimax';
@@ -29,6 +30,7 @@ export const TRANSPORTS: Record = {
   hume,
   sesame,
   xai,
+  fish,
 };
 
 /** Why the remaining arena providers cannot be measured/generated live. */