
Notifications

Bursty inbound events with debounce + coalesce. The pattern comes from the Telegram skill.

A naive notification skill — "ping me whenever a Telegram message arrives" — would be unbearable. Five messages in 10 seconds would be five separate proactive turns, talking over each other, interrupting whatever you're doing.

This recipe is the debounce + coalesce pattern that makes proactive skills livable. The shipped Telegram skill uses it; the same shape works for any bursty event source.

The shape

huxley_skill_notifications/__init__.py
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta
from huxley_sdk import Skill, SkillContext, InjectPriority


class NotificationsSkill(Skill):
    name = "notifications"

    def __init__(self) -> None:
        # Per-sender pending messages, with the time the burst started.
        self._buffers: dict[str, dict] = defaultdict(
            lambda: {"messages": [], "started_at": None, "task": None}
        )
        self._debounce_seconds = 5.0  # wait this long after last message
        self._lock = asyncio.Lock()

    @property
    def tools(self):
        return []  # no user-facing tools; pure proactive

    async def setup(self, ctx: SkillContext) -> None:
        self._ctx = ctx
        self._logger = ctx.logger
        self._listener_handle = ctx.background_task(
            name="notifications_listener",
            coro_factory=self._listen,
            restart_on_crash=True,
            max_restarts_per_hour=10,
        )

    async def _listen(self):
        """Long-lived listener. Replace with real event source."""
        async for event in self._event_source():
            await self._on_event(event)

    async def _on_event(self, event):
        sender = event["sender"]
        text = event["text"]

        async with self._lock:
            buf = self._buffers[sender]
            buf["messages"].append(text)
            if buf["started_at"] is None:
                buf["started_at"] = datetime.now()
                await self._logger.ainfo(
                    "burst_started",
                    sender=sender,
                )
            else:
                await self._logger.adebug(
                    "appended_to_burst",
                    sender=sender,
                    burst_size=len(buf["messages"]),
                )

            # Cancel any existing flush task and reschedule.
            if buf["task"] and not buf["task"].done():
                buf["task"].cancel()
            buf["task"] = asyncio.create_task(
                self._flush_after_debounce(sender)
            )

    async def _flush_after_debounce(self, sender: str):
        try:
            await asyncio.sleep(self._debounce_seconds)
        except asyncio.CancelledError:
            return  # a new message arrived; we get rescheduled

        async with self._lock:
            buf = self._buffers[sender]
            messages = buf["messages"]
            buf["messages"] = []
            buf["started_at"] = None
            buf["task"] = None

        if not messages:
            return

        await self._logger.ainfo(
            "flushing", sender=sender, count=len(messages),
        )

        # Coalesce: one inject for the whole burst.
        if len(messages) == 1:
            prompt = f"Tell the user: {sender} sent: {messages[0]}"
        else:
            joined = " | ".join(messages)
            prompt = (
                f"Tell the user: {sender} sent {len(messages)} messages. "
                f"Read them out: {joined}"
            )

        await self._ctx.inject_turn(
            prompt,
            dedup_key=f"notif_{sender}_{datetime.now().isoformat()}",
            priority=InjectPriority.BLOCK_BEHIND_COMMS,
        )

    async def teardown(self) -> None:
        # Cancel pending flush tasks; anything still buffered is dropped
        # on shutdown rather than injected mid-teardown.
        for buf in self._buffers.values():
            if buf["task"]:
                buf["task"].cancel()

What's happening

Each event gets buffered per-sender

If María sends 3 messages and Juan sends 1, that's 2 buffers. Each tracks pending messages.
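
Mid-burst, the internal state looks roughly like this (message texts invented for illustration; started_at and task hold live values):

# Sketch of _buffers mid-burst:
#
# {
#     "María": {"messages": ["hi", "you there?", "call me"],
#               "started_at": <datetime of her first message>,
#               "task": <pending flush Task>},
#     "Juan":  {"messages": ["ok"],
#               "started_at": <datetime of his message>,
#               "task": <pending flush Task>},
# }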

A flush task is scheduled, with debounce

Each buffer has at most one flush task. When a new message arrives, the old task is cancelled and a new one is scheduled with the same debounce_seconds delay. The clock resets every time a new message arrives.
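
Stripped of the skill plumbing, the cancel-and-reschedule trick is just this (a runnable, SDK-free sketch):

import asyncio


class Debouncer:
    """Calls `flush` once, `delay` seconds after the last hit()."""

    def __init__(self, delay: float, flush):
        self._delay = delay
        self._flush = flush
        self._task = None

    def hit(self) -> None:
        # Every hit cancels the pending timer and starts a fresh one,
        # so the clock resets on each new event.
        if self._task and not self._task.done():
            self._task.cancel()
        self._task = asyncio.create_task(self._wait_then_flush())

    async def _wait_then_flush(self) -> None:
        try:
            await asyncio.sleep(self._delay)
        except asyncio.CancelledError:
            return  # superseded by a newer hit
        await self._flush()


async def main():
    async def flush():
        print("flushed once")

    d = Debouncer(0.1, flush)
    for _ in range(5):        # five rapid hits, 20 ms apart...
        d.hit()
        await asyncio.sleep(0.02)
    await asyncio.sleep(0.2)  # ...still exactly one flush


asyncio.run(main())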

After silence, flush

If no new message arrives within debounce_seconds, the task fires. It coalesces all pending messages into a single inject_turn.

Coalesce intelligently

For 1 message: "María sent: <text>".

For 3 messages: "María sent 3 messages. Read them out: a | b | c".

The model uses the persona's voice and language to narrate. The skill just hands it the data and an instruction.

Why both debounce and dedup_key

dedup_key only deduplicates queued injects. If the first inject already fired and a duplicate arrives a millisecond later, the duplicate fires too.

For bursty senders, that's not enough. We need application-level coalescing — combine N events into one narrated message. The buffer + debounce pattern does this; dedup_key is icing.
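
Note that the timestamped key in the code above (datetime.now().isoformat()) is unique per flush, so it never actually deduplicates. If you want the key to also catch duplicate flushes for the same sender, one option is bucketing it by wall-clock time instead (a sketch; the 30-second bucket width is arbitrary):

import time


def burst_dedup_key(sender: str, bucket_seconds: int = 30) -> str:
    # Flushes for the same sender within one bucket share a key, so a
    # duplicate that is still queued gets dropped by the inject layer.
    bucket = int(time.time() // bucket_seconds)
    return f"notif_{sender}_{bucket}"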

Backfill on connect

When the skill starts (or reconnects after a crash), there might be unread messages from the offline period. Don't flood the user with all of them — backfill carefully:

async def setup(self, ctx: SkillContext) -> None:
    self._ctx = ctx
    self._logger = ctx.logger

    # Fetch unread from the last 6 hours.
    unread = await self._fetch_unread(since=datetime.now() - timedelta(hours=6))

    # Group by sender.
    by_sender = defaultdict(list)
    for msg in unread:
        by_sender[msg.sender].append(msg.text)

    # One inject per sender, summarizing.
    for sender, msgs in by_sender.items():
        if len(msgs) == 1:
            prompt = f"While you were away, {sender} sent: {msgs[0]}"
        else:
            prompt = (
                f"While you were away, {sender} sent {len(msgs)} messages. "
                f"Read them out: {' | '.join(msgs)}"
            )
        await self._ctx.inject_turn(
            prompt,
            dedup_key=f"backfill_{sender}_{datetime.now().date()}",
            priority=InjectPriority.NORMAL,
        )

The NORMAL priority means the backfill drains when the user is not in the middle of something. Less intrusive than BLOCK_BEHIND_COMMS.
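
"Don't flood the user" applies per sender too: one contact may have sent dozens of messages overnight. A hypothetical helper that caps the read-out to the most recent few (the cap of 5 is arbitrary):

MAX_READOUT = 5  # arbitrary cap on messages read out verbatim


def backfill_prompt(sender: str, msgs: list[str]) -> str:
    if len(msgs) == 1:
        return f"While you were away, {sender} sent: {msgs[0]}"
    recent = msgs[-MAX_READOUT:]
    skipped = len(msgs) - len(recent)
    prompt = (
        f"While you were away, {sender} sent {len(msgs)} messages. "
        f"Read them out: {' | '.join(recent)}"
    )
    if skipped:
        prompt += f" Summarize the {skipped} earlier ones briefly."
    return prompt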

When to inject vs queue

Three priority tiers, four scenarios:

Scenario                               Priority
New message from anyone — minor        NORMAL
New message from a starred contact     BLOCK_BEHIND_COMMS
Emergency contact ("ICE")              PREEMPT
Backfill on reconnect                  NORMAL

Think about what the user would want to be interrupted for. Most messages: nothing. Important contacts: yes, but not during phone calls. ICE: yes, even during phone calls.
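
In code, that decision reduces to a small lookup. A sketch, assuming the contact sets come from the skill's own configuration (STARRED_CONTACTS, ICE_CONTACTS, and priority_for are hypothetical names; it also assumes the PREEMPT tier from the table is exposed on InjectPriority like the other two):

from huxley_sdk import InjectPriority

# Hypothetical configuration; a real skill would load these from settings.
STARRED_CONTACTS = {"María"}
ICE_CONTACTS = {"Mom"}


def priority_for(sender: str) -> InjectPriority:
    if sender in ICE_CONTACTS:
        return InjectPriority.PREEMPT             # interrupt even phone calls
    if sender in STARRED_CONTACTS:
        return InjectPriority.BLOCK_BEHIND_COMMS  # interrupt, but never comms
    return InjectPriority.NORMAL                  # drain when the user is idle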

Inbound calls

Telegram supports voice calls. The pattern there is different: inject_turn_and_wait to announce, then start_input_claim to bridge audio:

# Assumes InputClaim and ClaimBusyError are importable from huxley_sdk,
# alongside InjectPriority.
async def _on_inbound_call(self, caller_name: str, peer_id: str):
    # Step 1: Announce. Block until the announcement finishes.
    await self._ctx.inject_turn_and_wait(
        f"Incoming call from {caller_name}, answering.",
        priority=InjectPriority.BLOCK_BEHIND_COMMS,
    )

    # Step 2: Bridge audio.
    claim = InputClaim(
        on_mic_frame=self._forward_to_peer(peer_id),
        speaker_source=self._stream_from_peer(peer_id),
        on_claim_end=self._on_call_end,
        title=caller_name,
    )
    try:
        handle = await self._ctx.start_input_claim(claim)
    except ClaimBusyError:
        # Another call is active.
        await self._ctx.logger.awarning("call_blocked", reason="comms_busy")
        return

Key: announce first (and wait), then bridge. If you bridge first, the user hears the caller's audio before the announcement.

