Audio Streaming
Long-form playback via factory + ffmpeg. Pattern from audiobooks and radio.
This is the framework's most powerful primitive. It's also where most skill-author confusion lives — closures, factories, interrupts, bookmarks, ffmpeg subprocess management.
This recipe lifts the pattern from the audiobooks skill and explains every choice.
The shape
```python
import asyncio
import json
from datetime import timedelta
from pathlib import Path

from huxley_sdk import (
    Skill, ToolDefinition, ToolResult, SkillContext, AudioStream, ContentType,
)

# load_pcm_palette is used below; its import is not shown in this recipe.


class AudiobooksSkill:
    name = "audiobooks"

    @property
    def tools(self) -> list[ToolDefinition]:
        return [
            ToolDefinition(
                name="search_audiobooks",
                description="Search the user's library by title or author.",
                parameters={
                    "type": "object",
                    "properties": {"query": {"type": "string"}},
                    "required": ["query"],
                },
            ),
            ToolDefinition(
                name="play_audiobook",
                description="Start playing an audiobook by ID. Use AFTER search_audiobooks.",
                parameters={
                    "type": "object",
                    "properties": {
                        "book_id": {"type": "string"},
                    },
                    "required": ["book_id"],
                },
            ),
            ToolDefinition(
                name="audiobook_control",
                # Keep the description in sync with the enum below.
                description="Pause, resume, stop, or skip forward/backward.",
                parameters={
                    "type": "object",
                    "properties": {
                        "action": {
                            "type": "string",
                            "enum": ["pause", "resume", "stop", "forward", "backward"],
                        },
                    },
                    "required": ["action"],
                },
            ),
        ]

    async def setup(self, ctx: SkillContext) -> None:
        self._ctx = ctx
        self._library = await self._scan_library(ctx.persona_data_dir / ctx.config["library"])
        # Earcons, keyed by role.
        self._sounds = load_pcm_palette(
            ctx.persona_data_dir / ctx.config.get("sounds_path", "sounds"),
            roles=["book_start", "book_end"],
        )
        # Index every book so search_audiobooks can match on title or author.
        self._catalog = ctx.catalog()
        for book in self._library.values():
            await self._catalog.upsert(
                id=book.id,
                fields={"title": book.title, "author": book.author},
                payload={"path": str(book.path)},
            )

    async def handle(self, tool_name: str, args: dict) -> ToolResult:
        if tool_name == "search_audiobooks":
            hits = await self._catalog.search(args["query"], limit=5)
            return ToolResult(output=json.dumps({
                "matches": [{"id": h.id, "title": h.fields["title"]} for h in hits],
            }))
        if tool_name == "play_audiobook":
            book = self._library[args["book_id"]]
            saved_position = await self._get_saved_position(book.id)
            factory = self._build_factory(book.id, book.path, saved_position)
            return ToolResult(
                output=json.dumps({"playing": book.title}),
                side_effect=AudioStream(
                    factory=factory,
                    label=f"📖 {book.title}",
                    on_complete_prompt=self._on_complete_prompt(),
                    completion_silence_ms=500,
                    content_type=ContentType.NONMIXABLE,
                    patience=timedelta(seconds=5),
                    on_patience_expired=self._on_patience_expired,
                ),
            )
        # ... audiobook_control
```
The factory
```python
    def _build_factory(self, book_id: str, path: Path, start_position: float):
        """Build an audio factory for a book, starting at start_position seconds.

        The closure captures start_position. If the user asks to skip, a new
        factory is built with the new start; the old stream is cancelled and
        the new one runs.
        """
        from contextlib import aclosing  # Python 3.10+; normally a module-level import

        BYTES_PER_SECOND = 24000 * 2  # 24 kHz, 16-bit, mono

        async def stream():
            bytes_read = 0
            completed = False
            try:
                # Earcon at start.
                if "book_start" in self._sounds:
                    yield self._sounds["book_start"]
                # Real content via ffmpeg. aclosing closes the ffmpeg generator
                # (and so kills ffmpeg) even if this stream is closed while
                # suspended at a yield.
                async with aclosing(self._ffmpeg_stream(path, start_position)) as pcm:
                    async for chunk in pcm:
                        bytes_read += len(chunk)  # earcons are not counted
                        yield chunk
                completed = True
                # Earcon at natural completion.
                if "book_end" in self._sounds:
                    yield self._sounds["book_end"]
            finally:
                elapsed_seconds = bytes_read / BYTES_PER_SECOND
                # Finished books reset to 0.0; otherwise resume where we stopped.
                final_position = 0.0 if completed else start_position + elapsed_seconds
                await self._save_position(book_id, final_position)
                await self._ctx.logger.ainfo(
                    "stream_ended",
                    book_id=book_id,
                    completed=completed,
                    final_position=final_position,
                )

        return stream
```
Closure-captured position
Notice start_position is captured by the inner async def stream. The skill's instance state never holds "current position" — the position lives only in the closure.
Why? Because the user might say "skip ahead 30 seconds." The model calls audiobook_control(action="forward"). Your handler computes the new position, builds a new factory pointing at it, and returns that factory as a side effect. The framework cancels the running stream and starts the new one. No race, no stale state on self.
If you stored "current position" eagerly on self instead, the cancelled stream's final write could land after the new factory had already started, and the bookmark would end up pointing at the old stream's position rather than the new one.
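A framework-free sketch of the idea. `build_factory` and `Player` are hypothetical stand-ins (for `_build_factory` and for whatever the framework does with an `AudioStream`), and the "book" is an endless run of silent 100 ms chunks. Each factory closes over its own `start`, so the cancelled stream's `finally` can only report its own start plus what it played; nothing it does can overwrite the new stream's start.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional

BYTES_PER_SECOND = 24000 * 2  # PCM16, 24 kHz, mono
CHUNK = 4800                  # 100 ms of audio

Factory = Callable[[], AsyncIterator[bytes]]


def build_factory(start: float, bookmarks: list[float]) -> Factory:
    """Hypothetical stand-in for _build_factory: the position lives in the closure."""
    async def stream() -> AsyncIterator[bytes]:
        bytes_read = 0
        try:
            while True:  # a fake, endless book: 100 ms chunks of silence
                await asyncio.sleep(0.01)
                bytes_read += CHUNK
                yield b"\x00" * CHUNK
        finally:
            # Each stream reports only its own start plus what it played.
            bookmarks.append(start + bytes_read / BYTES_PER_SECOND)
    return stream


class Player:
    """Hypothetical runner that plays one factory at a time."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None

    async def play(self, factory: Factory) -> None:
        await self.stop()  # cancel the running stream first
        self._task = asyncio.create_task(self._drain(factory))

    async def _drain(self, factory: Factory) -> None:
        async for _chunk in factory():
            pass  # a real player would send the chunk to the speaker

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo() -> list[float]:
    bookmarks: list[float] = []
    player = Player()
    await player.play(build_factory(0.0, bookmarks))
    await asyncio.sleep(0.05)
    # "Skip ahead 30 seconds": a new closure with a new start, no shared state.
    await player.play(build_factory(30.0, bookmarks))
    await asyncio.sleep(0.05)
    await player.stop()
    return bookmarks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first bookmark is a small value (whatever the first stream played before the skip); the second is 30 plus whatever the second stream played. The exact numbers depend on timing.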
try / finally for the bookmark
The `finally` runs whether:
- the stream completed naturally (`completed=True`; save position 0, meaning "start from the beginning");
- the stream was cancelled mid-playback (`completed=False`; save where we are); or
- an exception was raised (`completed=False`; same as cancellation).
This makes the bookmark always correct. There's no "I forgot to save it" failure mode.
ffmpeg as the audio source
The audiobook files are .m4b, .mp3, .ogg — anything ffmpeg can read. We need PCM16 24kHz mono.
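The byte counts used throughout this recipe follow from that format: 24,000 samples per second × 2 bytes per sample × 1 channel = 48,000 bytes per second, so a 4,800-byte read is 100 ms. A small sketch of the arithmetic; the helper names are illustrative, not part of huxley_sdk.

```python
SAMPLE_RATE = 24_000   # Hz (-ar 24000)
BYTES_PER_SAMPLE = 2   # signed 16-bit (-f s16le)
CHANNELS = 1           # mono (-ac 1)

BYTES_PER_SECOND = SAMPLE_RATE * BYTES_PER_SAMPLE * CHANNELS


def bytes_for_ms(ms: int) -> int:
    """Bytes covering `ms` milliseconds, rounded down to whole sample frames."""
    frames = SAMPLE_RATE * ms // 1000
    return frames * BYTES_PER_SAMPLE * CHANNELS


def seconds_for_bytes(n: int) -> float:
    """Playback time represented by n bytes, as in the factory's bookmark math."""
    return n / BYTES_PER_SECOND


print(BYTES_PER_SECOND)           # 48000
print(bytes_for_ms(100))          # 4800
print(seconds_for_bytes(96_000))  # 2.0
```

Note that `proc.stdout.read(4800)` returns at most 4,800 bytes, so individual chunks can be shorter than 100 ms. The bookmark stays accurate because the factory sums `len(chunk)` rather than counting chunks.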
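```python
    async def _ffmpeg_stream(self, path: Path, start_position: float):
        cmd = [
            "ffmpeg",
            "-ss", str(start_position),  # before -i: seek in the input
            "-i", str(path),
            "-f", "s16le",               # raw PCM, signed 16-bit little-endian
            "-ar", "24000",              # 24 kHz
            "-ac", "1",                  # mono
            "-loglevel", "error",
            "pipe:1",                    # write to stdout
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            while True:
                chunk = await proc.stdout.read(4800)  # up to 100 ms at 24 kHz mono
                if not chunk:
                    break
                yield chunk
            # EOF: surface decode failures instead of reporting a clean finish,
            # which would reset the bookmark to 0.
            stderr = await proc.stderr.read()
            if await proc.wait() != 0:
                raise RuntimeError(f"ffmpeg failed: {stderr.decode(errors='replace').strip()}")
        finally:
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # already gone
                await proc.wait()  # reap the child so it doesn't linger
```
Three things: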
- `-ss start_position` seeks to the right position before decoding. Placed before `-i`, it makes ffmpeg seek within the input instead of decoding from the beginning, which is fast for most formats.
- PCM16, 24 kHz, mono is the framework's required format. Specify `-f s16le`, `-ar 24000`, `-ac 1`.
- Kill on cancellation. When the stream is cancelled, the `try / finally` here kills ffmpeg and waits for it to exit before the cancellation completes. Without this, ffmpeg subprocesses leak.
Patience: graceful preemption
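```python
AudioStream(
    factory=factory,
    patience=timedelta(seconds=5),
    on_patience_expired=self._on_patience_expired,
    # ...other fields as in play_audiobook
)


async def _on_patience_expired(self):
    """Fired when a higher-priority claim is about to evict us. Narrate
    'Lo dejé donde íbamos' ("I left it where we were") before yielding."""
    # Schedule, don't await: see the note on the FocusManager below.
    task = asyncio.create_task(self._ctx.inject_turn(
        # "I left it where we were. I'll continue when this is over."
        "Lo dejé donde íbamos. Continúo cuando termine.",
        priority=InjectPriority.BLOCK_BEHIND_COMMS,  # InjectPriority: import not shown
    ))
    # Keep a reference until the task finishes; the event loop holds only a weak one.
    # self._background_tasks is assumed to be a set created in setup().
    self._background_tasks.add(task)
    task.add_done_callback(self._background_tasks.discard)
```
When a phone call comes in (COMMS channel, priority 150), it preempts our CONTENT-channel book (priority 300), and the book gets a 5-second patience window: the framework waits up to 5 seconds for it to wrap up gracefully. Our `on_patience_expired` callback narrates a transition, then the book yields.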
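The `asyncio.create_task` is critical. `on_patience_expired` fires inside the FocusManager's serialized actor, and the injected turn has to be processed by that same actor. Awaiting `ctx.inject_turn` from inside the callback would deadlock: the actor can't get to the turn until the callback returns, and the callback can't return until the turn is done. Schedule, don't await.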
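The deadlock is easy to reproduce with a toy actor. In this sketch, `Actor` and `inject_turn` are hypothetical stand-ins for the FocusManager and `ctx.inject_turn`, not the framework's code; the only property that matters is that jobs run one at a time and the injected turn is itself a job. The good callback schedules the turn and returns; the bad one awaits it and never finishes, which `asyncio.wait_for` turns into a timeout.

```python
import asyncio
from typing import Awaitable, Callable


class Actor:
    """Hypothetical serialized actor: runs queued jobs one at a time, in order."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.log: list[str] = []
        self._runner = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while True:
            job, done = await self._queue.get()
            try:
                await job()  # the actor is busy until this job returns
            except Exception as exc:
                if not done.done():
                    done.set_exception(exc)
            else:
                if not done.done():
                    done.set_result(None)

    async def call(self, job: Callable[[], Awaitable[None]]) -> None:
        done = asyncio.get_running_loop().create_future()
        await self._queue.put((job, done))
        await done


async def inject_turn(actor: Actor, text: str) -> None:
    """Hypothetical stand-in for ctx.inject_turn: the turn is a job on the actor."""
    async def speak() -> None:
        actor.log.append(text)
    await actor.call(speak)


async def demo() -> tuple[list[str], bool]:
    actor = Actor()
    pending: set[asyncio.Task] = set()

    async def good_callback() -> None:
        # Schedule, don't await: the actor is free as soon as we return.
        task = asyncio.create_task(inject_turn(actor, "good"))
        pending.add(task)  # hold a reference so the task isn't garbage-collected
        task.add_done_callback(pending.discard)

    async def bad_callback() -> None:
        # Awaiting from inside the actor: it can't reach our job until we return.
        await inject_turn(actor, "bad")

    await actor.call(good_callback)
    await asyncio.gather(*pending)  # let the scheduled turn finish

    try:
        await asyncio.wait_for(actor.call(bad_callback), timeout=0.2)
        deadlocked = False
    except asyncio.TimeoutError:
        deadlocked = True
    return actor.log, deadlocked


print(asyncio.run(demo()))  # (['good'], True)
```

The `pending` set follows the pattern the asyncio documentation recommends for fire-and-forget tasks: the event loop keeps only weak references to tasks, so an unreferenced task can disappear before it runs.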
What you'd skip
For a simpler streamer (background music, podcast):
- No `on_patience_expired` callback (just let it pause without narration).
- `content_type=ContentType.MIXABLE` for music (it ducks instead of pausing).
- No bookmark (radio doesn't resume; it's live).
- No earcons (or simpler ones).
The radio skill is a good example of a stripped-down version. Read server/skills/radio/.