huxley
Cookbook

Persistent State

ctx.storage, ctx.logger, ctx.background_task — the trio every real skill uses.

Three SDK primitives show up in every skill that does real work: ctx.storage (persistence), ctx.logger (observability), ctx.background_task (long-lived async work). This recipe shows how to use them well.

ctx.storage — per-skill key-value store

Each skill has its own namespace inside the persona's SQLite database. Read, write, list, delete:

# Write
await ctx.storage.set_setting("last_book_id", "el_principito")

# Read with default
last = await ctx.storage.get_setting("last_book_id", default=None)

# List by prefix (great for restoring state on setup)
for key, value in await ctx.storage.list_settings("timer:"):
    entry = json.loads(value)
    self._restore(entry)

# Delete
await ctx.storage.delete_setting("timer:abc123")

Values are strings. To store structured data, JSON-encode it on write and decode on read:

await ctx.storage.set_setting(
    f"book:{book_id}",
    json.dumps({"position": 1840.5, "last_played": now.isoformat()}),
)

raw = await ctx.storage.get_setting(f"book:{book_id}")
state = json.loads(raw) if raw else None

Patterns

Composite keys. Use prefix:identifier so you can list_settings(prefix) to scan:

"timer:abc123"           # prefix=timer
"book:el_principito"     # prefix=book
"contact:maria"          # prefix=contact

Don't store secrets. The SQLite database is on disk, not encrypted. API keys, passwords, OAuth tokens — read those from environment variables, not storage.

Don't store binary blobs. Storage is for small key-value state. Big files (audiobooks, images, audio caches) go on the filesystem under ctx.persona_data_dir.

Storage is per-persona. AbuelOS and BasicOS have separate databases. Switching personas doesn't share state.

Restore on setup

Almost every skill that persists state restores on setup:

async def setup(self, ctx: SkillContext) -> None:
    self._ctx = ctx
    for key, value in await ctx.storage.list_settings("timer:"):
        try:
            entry = json.loads(value)
            self._restore(key, entry)
        except (json.JSONDecodeError, KeyError) as e:
            await ctx.logger.aexception("restore_failed", key=key, error=str(e))
            await ctx.storage.delete_setting(key)  # corrupt entries get nuked

The try / except matters: if you ship a v2 of your skill that changes the storage format, old entries should be cleaned up rather than crashing the skill on restart.

ctx.logger — structured logs

Don't use print. Don't use the standard logging module. Use ctx.logger. It:

  • Auto-namespaces events under your skill name (audiobooks.stream_started).
  • Auto-injects the current turn ID (turn=t-7f3a).
  • Emits structured JSONL or pretty depending on HUXLEY_LOG_JSON.
  • Integrates with the framework's observability conventions.
await ctx.logger.ainfo("event_name", key1=value1, key2=value2)
await ctx.logger.adebug("debug_event", detail="lots of detail")
await ctx.logger.awarning("something_questionable", reason="see below")
await ctx.logger.aerror("explicit_error", what_failed="...")
await ctx.logger.aexception("crashed", error=str(e))  # captures traceback

Naming events

Convention: verb_noun or noun_verb_past:

await ctx.logger.ainfo("stream_started", book_id=...)
await ctx.logger.ainfo("fetching_news", url=...)
await ctx.logger.ainfo("fetched_news", count=10, ms=234)
await ctx.logger.ainfo("scheduled", timer_id=..., fire_at=...)
await ctx.logger.ainfo("fired", timer_id=...)

The combination of skill name (auto) + event name + turn ID (auto) + your fields makes events queryable:

# Find every audiobook playback in the last hour
grep '"event":"stream_started"' logs/server.jsonl | grep audiobooks

What to log

A useful heuristic: if this skill misbehaves in production, what would I need to see?

  • Setup completion. "I loaded the library, here's how many books."
  • Every tool dispatch. What tool, what args (sanitized).
  • Every external call. Before (URL, request) and after (status, duration).
  • Every state transition. "started_workout", "stopped_workout".
  • Every error path. Always log before raising, returning errors, or falling back.
  • Every proactive trigger. "timer fired", "notification flushed".

What not to log:

  • Per-frame audio data (way too much).
  • Personal information beyond what the user already said (don't log payment details, addresses).
  • API keys or tokens (ever).

Reading logs

In pretty format (default in dev):

[2026-04-29 02:45:12.341] audiobooks.stream_started turn=t-7f3a book_id=el_principito start_position=1840.5

In jsonl format (production):

{"ts":"2026-04-29T02:45:12.341Z","level":"info","event":"stream_started","skill":"audiobooks","turn":"t-7f3a","book_id":"el_principito","start_position":1840.5}

Use jq for structured queries:

# All errors in the last hour
tail logs/server.jsonl | jq 'select(.level == "error" and (.ts | fromdate) > (now - 3600))'

# Reconstruct a single turn
tail logs/server.jsonl | jq 'select(.turn == "t-7f3a")'

# Average tool dispatch duration per skill
tail logs/server.jsonl | jq -s 'group_by(.skill) | map({skill: .[0].skill, avg_ms: (map(.duration_ms // 0) | add / length)})'

ctx.background_task — supervised async work

For anything that runs longer than a single tool dispatch — listeners, schedulers, polling loops — use background_task:

self._listener = ctx.background_task(
    name="telegram_listener",
    coro_factory=self._listen,
    restart_on_crash=True,
    max_restarts_per_hour=10,
    on_permanent_failure=self._on_listener_dead,
)

The framework supervises:

  • If the coroutine raises, the framework logs and restarts (if restart_on_crash=True).
  • After max_restarts_per_hour crashes, it stops restarting and calls on_permanent_failure.
  • On skill teardown, the framework cancels the task gracefully.

Common shapes

Long-lived listener (websocket subscriber, MQTT client):

async def _listen(self):
    async with self._client.connect() as ws:
        async for event in ws:
            await self._on_event(event)

self._handle = ctx.background_task(
    name="my_listener",
    coro_factory=self._listen,
    restart_on_crash=True,
    max_restarts_per_hour=10,
)

One-shot waiter (a single timer):

async def fire():
    await asyncio.sleep(delay)
    await self._ctx.inject_turn(message)

ctx.background_task(
    name=f"timer_{id}",
    coro_factory=fire,
    restart_on_crash=False,  # one-shot
)

Polling loop (periodic refresh):

async def _poll(self):
    while True:
        await self._refresh_cache()
        await asyncio.sleep(300)

ctx.background_task(
    name="cache_refresh",
    coro_factory=self._poll,
    restart_on_crash=True,
)

Permanent failure callback

When the supervisor gives up, on_permanent_failure fires once. The right move is usually to inject a turn so the user knows:

async def _on_listener_dead(self, failure):
    await self._ctx.logger.aerror(
        "permanent_failure",
        task=failure.name,
        restarts=failure.restart_count,
        last_error=failure.last_exception_message,
    )
    # Schedule (don't await — could deadlock the supervisor's actor)
    asyncio.create_task(self._ctx.inject_turn(
        "The Telegram connection failed permanently. "
        "Tell the user to check the server.",
        priority=InjectPriority.NORMAL,
    ))

asyncio.create_task because on_permanent_failure may fire from inside a serialized actor; awaiting inject_turn directly would deadlock.

Combined: a tiny but real skill

Here's the trio working together:

huxley_skill_doorbell/__init__.py
import asyncio
import json
from huxley_sdk import Skill, ToolDefinition, ToolResult, SkillContext, InjectPriority


class DoorbellSkill:
    name = "doorbell"
    tools = []  # pure proactive

    async def setup(self, ctx: SkillContext) -> None:
        self._ctx = ctx
        self._logger = ctx.logger

        # Restore the visitor count we keep across restarts.
        raw = await ctx.storage.get_setting("visit_count", "0")
        self._visit_count = int(raw)
        await self._logger.ainfo("setup", visit_count=self._visit_count)

        # Spawn a supervised listener for doorbell events.
        ctx.background_task(
            name="doorbell_listener",
            coro_factory=self._listen,
            restart_on_crash=True,
            max_restarts_per_hour=10,
        )

    async def _listen(self):
        async for ring in self._doorbell_events():
            await self._on_ring(ring)

    async def _on_ring(self, ring):
        self._visit_count += 1
        await self._ctx.storage.set_setting("visit_count", str(self._visit_count))
        await self._logger.ainfo("ring", visit_count=self._visit_count)

        await self._ctx.inject_turn(
            f"Tell the user: someone's at the door.",
            dedup_key=f"ring_{ring.timestamp}",
            priority=InjectPriority.PREEMPT,
        )

Three primitives. ~40 lines. Real, working, tested-in-production-shaped skill.

Next

On this page