
Audio Streaming

Long-form playback via factory + ffmpeg. Pattern from audiobooks and radio.

This is the framework's most powerful primitive. It's also where most skill-author confusion lives — closures, factories, interrupts, bookmarks, ffmpeg subprocess management.

This recipe lifts the pattern from the audiobooks skill and explains every choice.

The shape

huxley_skill_audiobooks/__init__.py (excerpt)
import asyncio
import json
from datetime import timedelta
from huxley_sdk import (
    Skill, ToolDefinition, ToolResult, SkillContext, AudioStream, ContentType,
)


class AudiobooksSkill(Skill):
    name = "audiobooks"

    @property
    def tools(self) -> list[ToolDefinition]:
        return [
            ToolDefinition(
                name="search_audiobooks",
                description="Search the user's library by title or author.",
                parameters={
                    "type": "object",
                    "properties": {"query": {"type": "string"}},
                    "required": ["query"],
                },
            ),
            ToolDefinition(
                name="play_audiobook",
                description="Start playing an audiobook by ID. Use AFTER search_audiobooks.",
                parameters={
                    "type": "object",
                    "properties": {
                        "book_id": {"type": "string"},
                    },
                    "required": ["book_id"],
                },
            ),
            ToolDefinition(
                name="audiobook_control",
                description="Pause, resume, stop, skip, change speed.",
                parameters={
                    "type": "object",
                    "properties": {
                        "action": {
                            "type": "string",
                            "enum": ["pause", "resume", "stop", "forward", "backward"],
                        },
                    },
                    "required": ["action"],
                },
            ),
        ]

    async def setup(self, ctx: SkillContext) -> None:
        self._ctx = ctx
        self._library = await self._scan_library(ctx.persona_data_dir / ctx.config["library"])
        self._sounds = load_pcm_palette(
            ctx.persona_data_dir / ctx.config.get("sounds_path", "sounds"),
            roles=["book_start", "book_end"],
        )
        self._catalog = ctx.catalog()
        for book in self._library.values():
            await self._catalog.upsert(
                id=book.id,
                fields={"title": book.title, "author": book.author},
                payload={"path": str(book.path)},
            )

    async def handle(self, tool_name: str, args: dict) -> ToolResult:
        if tool_name == "search_audiobooks":
            hits = await self._catalog.search(args["query"], limit=5)
            return ToolResult(output=json.dumps({
                "matches": [{"id": h.id, "title": h.fields["title"]} for h in hits],
            }))

        if tool_name == "play_audiobook":
            book = self._library[args["book_id"]]
            saved_position = await self._get_saved_position(book.id)
            factory = self._build_factory(book.id, book.path, saved_position)
            return ToolResult(
                output=json.dumps({"playing": book.title}),
                side_effect=AudioStream(
                    factory=factory,
                    label=f"📖 {book.title}",
                    on_complete_prompt=self._on_complete_prompt(),
                    completion_silence_ms=500,
                    content_type=ContentType.NONMIXABLE,
                    patience=timedelta(seconds=5),
                    on_patience_expired=self._on_patience_expired,
                ),
            )

        # ... audiobook_control

The factory

def _build_factory(self, book_id: str, path: Path, start_position: float):
    """Build an audio factory for a book, starting at start_position seconds.

    Closure captures start_position. If user asks to skip, a new factory
    is built with new start. Old factory cancelled, new factory runs.
    """
    BYTES_PER_SECOND = 24000 * 2  # 24kHz, 16-bit, mono

    async def stream():
        bytes_read = 0
        completed = False
        try:
            # Earcon at start.
            if "book_start" in self._sounds:
                yield self._sounds["book_start"]

            # Real content via ffmpeg.
            async for chunk in self._ffmpeg_stream(path, start_position):
                bytes_read += len(chunk)
                yield chunk
            completed = True

            # Earcon at natural completion.
            if "book_end" in self._sounds:
                yield self._sounds["book_end"]
        finally:
            elapsed_seconds = bytes_read / BYTES_PER_SECOND
            final_position = 0.0 if completed else start_position + elapsed_seconds
            await self._save_position(book_id, final_position)
            await self._ctx.logger.ainfo(
                "stream_ended",
                book_id=book_id,
                completed=completed,
                final_position=final_position,
            )

    return stream

Closure-captured position

Notice start_position is captured by the inner async def stream. The skill's instance state never holds "current position" — the position lives only in the closure.

Why? Because the user might say "skip ahead 30 seconds." The model calls audiobook_control(action="forward"). Your handler computes the new position, builds a new factory pointing at it, returns it as a side effect. The framework cancels the running stream and runs the new one. No race, no stale state on self.

If you stored "current position" eagerly on self, the old stream's cancellation would race the new factory's start, and the saved bookmark could reflect either stream's state.
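The independence of the two factories can be shown with a stripped-down synchronous sketch (the generator body and the module-level build_factory name are illustrative stand-ins, not the skill's real code):

```python
def build_factory(start_position: float):
    """start_position is frozen into the closure at build time;
    no shared attribute tracks the 'current position'."""
    def stream():
        # Stand-in for the ffmpeg chunk loop.
        yield f"decode from {start_position}s"
    return stream

playing = build_factory(120.0)
# User: "skip ahead 30 seconds" -> the handler builds a fresh factory.
replacement = build_factory(150.0)
# The old closure is untouched; each stream reads only its own start.
first = next(playing())        # "decode from 120.0s"
second = next(replacement())   # "decode from 150.0s"
```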

try / finally for the bookmark

The finally runs whether:

  • The stream completed naturally (completed=True, save position 0 = "at the beginning").
  • The stream was cancelled mid-playback (completed=False, save where we are).
  • An exception was raised (completed=False, same).

This makes the bookmark always correct. There's no "I forgot to save it" failure mode.
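All three exit paths can be exercised with a synchronous stand-in (make_stream and the dict-based bookmark below are illustrative, not the skill's API). Closing a generator raises GeneratorExit at the paused yield, which is the same path cancellation takes through the real factory's finally:

```python
BYTES_PER_SECOND = 24000 * 2  # matches the factory's constant

def make_stream(start: float, chunks, saved: dict):
    def stream():
        bytes_read = 0
        completed = False
        try:
            for chunk in chunks:
                bytes_read += len(chunk)
                yield chunk
            completed = True
        finally:
            # Runs on natural completion, cancellation, and exceptions alike.
            saved["pos"] = 0.0 if completed else start + bytes_read / BYTES_PER_SECOND
    return stream

one_second = b"\x00" * BYTES_PER_SECOND

# Natural completion: bookmark resets to the beginning.
done = {}
list(make_stream(10.0, [one_second], done)())
# done["pos"] == 0.0

# Cancellation mid-playback: .close() raises GeneratorExit at the yield.
cancelled = {}
gen = make_stream(10.0, [one_second, one_second], cancelled)()
next(gen)    # one second played
gen.close()
# cancelled["pos"] == 11.0
```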

ffmpeg as the audio source

The audiobook files are .m4b, .mp3, .ogg — anything ffmpeg can read. We need PCM16 24kHz mono.

async def _ffmpeg_stream(self, path: Path, start_position: float):
    cmd = [
        "ffmpeg",
        "-ss", str(start_position),
        "-i", str(path),
        "-f", "s16le",
        "-ar", "24000",
        "-ac", "1",
        "-loglevel", "error",
        "pipe:1",
    ]
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        while True:
            chunk = await proc.stdout.read(4800)  # 100ms at 24kHz mono
            if not chunk:
                break
            yield chunk
    finally:
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # ffmpeg exited between the check and the kill
        await proc.wait()  # always reap the process, even after natural EOF

Three things:

  1. -ss start_position seeks to the right position before decoding. ffmpeg is fast at this for most formats.
  2. PCM16, 24kHz, mono — the framework's required format. Specify -f s16le, -ar 24000, -ac 1.
  3. Kill on cancellation. The outer try / finally in the factory propagates CancelledError through the generator; the inner try / finally here kills ffmpeg before the cancellation completes. Without this, ffmpeg subprocesses leak.
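The numbers in the command and in the read size line up as follows (pure arithmetic from the flags above):

```python
SAMPLE_RATE = 24_000      # -ar 24000
BYTES_PER_SAMPLE = 2      # s16le = signed 16-bit little-endian
CHANNELS = 1              # -ac 1

BYTES_PER_SECOND = SAMPLE_RATE * BYTES_PER_SAMPLE * CHANNELS  # 48000
CHUNK_BYTES = BYTES_PER_SECOND // 10   # 100 ms -> 4800, the read() size above
```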

Patience: graceful preemption

AudioStream(
    factory=factory,
    patience=timedelta(seconds=5),
    on_patience_expired=self._on_patience_expired,
    ...
)

async def _on_patience_expired(self):
    """Fired when a higher-priority claim is about to evict us. Narrate
    "Lo dejé donde íbamos" ("I left it where we were") before yielding."""
    asyncio.create_task(self._ctx.inject_turn(
        "Lo dejé donde íbamos. Continúo cuando termine.",
        priority=InjectPriority.BLOCK_BEHIND_COMMS,
    ))

When a phone call comes in (COMMS channel, priority 150), our CONTENT-channel book (priority 300) gets a 5-second patience window. The framework waits up to 5 seconds for us to wrap up gracefully. Our on_patience_expired callback narrates a transition; then the book yields.

The asyncio.create_task is critical — on_patience_expired fires inside the FocusManager's serialized actor. await ctx.inject_turn from there would deadlock. Schedule, don't await.
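The deadlock can be reproduced with a toy serialized actor (the Actor class below is an illustrative stand-in for the FocusManager, not huxley_sdk API):

```python
import asyncio

class Actor:
    """Toy serialized actor: one worker runs queued jobs strictly in order."""
    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()

    async def run_forever(self):
        while True:
            job, done = await self.queue.get()
            await job()
            done.set_result(None)

    async def submit(self, job):
        done = asyncio.get_running_loop().create_future()
        await self.queue.put((job, done))
        await done  # resolves only after the worker has run the job

async def main():
    actor = Actor()
    worker = asyncio.create_task(actor.run_forever())
    narrations = []

    async def narrate():
        narrations.append("Lo dejé donde íbamos.")

    async def on_patience_expired():
        # We are *inside* the worker right now. `await actor.submit(narrate)`
        # here would wait forever: the worker can't pick up the next job
        # until this one returns. Schedule it instead:
        asyncio.create_task(actor.submit(narrate))

    await actor.submit(on_patience_expired)
    await asyncio.sleep(0.05)  # give the scheduled task time to drain
    worker.cancel()
    return narrations

print(asyncio.run(main()))
```

Swapping the create_task for a bare await inside on_patience_expired hangs the worker forever, which is exactly the failure mode the callback must avoid.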

What you'd skip

For a simpler streamer (background music, podcast):

  • No on_patience_expired callback (just let it pause without narration).
  • content_type=ContentType.MIXABLE for music (it ducks instead of pausing).
  • No bookmark (radio doesn't resume — it's live).
  • No earcons (or simpler ones).

The radio skill is a good example of a stripped-down version. Read server/skills/radio/.

Common pitfalls
