Notifications
Handling bursty inbound events with debounce + coalesce, a pattern from the Telegram skill.
A naive notification skill — "ping me whenever a Telegram message arrives" — would be unbearable. Five messages in 10 seconds would be five separate proactive turns, talking over each other, interrupting whatever you're doing.
This recipe is the debounce + coalesce pattern that makes proactive skills livable. The shipped Telegram skill uses it; same shape works for any bursty event source.
The shape
```python
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta

from huxley_sdk import Skill, SkillContext, InjectPriority


class NotificationsSkill:
    name = "notifications"

    def __init__(self) -> None:
        # Per-sender pending messages, with the time the burst started.
        self._buffers: dict[str, dict] = defaultdict(
            lambda: {"messages": [], "started_at": None, "task": None}
        )
        self._debounce_seconds = 5.0  # wait this long after the last message
        self._lock = asyncio.Lock()

    @property
    def tools(self):
        return []  # no user-facing tools; purely proactive

    async def setup(self, ctx: SkillContext) -> None:
        self._ctx = ctx
        self._logger = ctx.logger
        self._listener_handle = ctx.background_task(
            name="notifications_listener",
            coro_factory=self._listen,
            restart_on_crash=True,
            max_restarts_per_hour=10,
        )

    async def _listen(self):
        """Long-lived listener. Replace with a real event source."""
        async for event in self._event_source():
            await self._on_event(event)

    async def _on_event(self, event):
        sender = event["sender"]
        text = event["text"]
        async with self._lock:
            buf = self._buffers[sender]
            buf["messages"].append(text)
            if buf["started_at"] is None:
                buf["started_at"] = datetime.now()
                await self._logger.ainfo("burst_started", sender=sender)
            else:
                await self._logger.adebug(
                    "appended_to_burst",
                    sender=sender,
                    burst_size=len(buf["messages"]),
                )
            # Cancel any existing flush task and reschedule.
            if buf["task"] and not buf["task"].done():
                buf["task"].cancel()
            buf["task"] = asyncio.create_task(
                self._flush_after_debounce(sender)
            )

    async def _flush_after_debounce(self, sender: str):
        try:
            await asyncio.sleep(self._debounce_seconds)
        except asyncio.CancelledError:
            return  # a new message arrived; we get rescheduled
        async with self._lock:
            buf = self._buffers[sender]
            messages = buf["messages"]
            started_at = buf["started_at"]
            buf["messages"] = []
            buf["started_at"] = None
            buf["task"] = None
        if not messages:
            return
        await self._logger.ainfo(
            "flushing", sender=sender, count=len(messages),
        )
        # Coalesce: one inject for the whole burst.
        if len(messages) == 1:
            prompt = f"Tell the user: {sender} sent: {messages[0]}"
        else:
            joined = " | ".join(messages)
            prompt = (
                f"Tell the user: {sender} sent {len(messages)} messages. "
                f"Read them out: {joined}"
            )
        await self._ctx.inject_turn(
            prompt,
            # Key on the burst start time, so a re-fired flush of the
            # same burst dedups while queued.
            dedup_key=f"notif_{sender}_{started_at.isoformat()}",
            priority=InjectPriority.BLOCK_BEHIND_COMMS,
        )

    async def teardown(self) -> None:
        # Cancel pending flush tasks gracefully.
        for buf in self._buffers.values():
            if buf["task"]:
                buf["task"].cancel()
```
What's happening
Each event gets buffered per-sender
If María sends 3 messages and Juan sends 1, that's two buffers, each tracking its own pending messages.
A flush task is scheduled, with debounce
Each buffer has at most one flush task. When a new message arrives, the old task is cancelled and a new one is scheduled with the same debounce_seconds delay. The clock resets every time a new message arrives.
After silence, flush
If no new message arrives within debounce_seconds, the task fires. It coalesces all pending messages into a single inject_turn.
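The reschedule-on-arrival behavior is easy to demonstrate in isolation. A minimal sketch with plain asyncio (the `Debouncer` class and its names are illustrative, not part of the SDK):

```python
import asyncio


class Debouncer:
    """Coalesce a burst: the flush timer restarts on every hit()."""

    def __init__(self, delay: float):
        self._delay = delay
        self._pending: list = []
        self._flushed: list = []
        self._task: asyncio.Task | None = None

    def hit(self, item) -> None:
        self._pending.append(item)
        # Cancel the old timer and start a fresh one: the clock resets.
        if self._task and not self._task.done():
            self._task.cancel()
        self._task = asyncio.create_task(self._flush_later())

    async def _flush_later(self) -> None:
        try:
            await asyncio.sleep(self._delay)
        except asyncio.CancelledError:
            return  # superseded by a newer timer
        items, self._pending = self._pending, []
        self._flushed.append(items)


async def demo() -> list:
    d = Debouncer(delay=0.05)
    for msg in ("a", "b", "c"):
        d.hit(msg)               # each hit within the window resets the clock
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.1)     # stay silent long enough for the timer to fire
    return d._flushed


flushed = asyncio.run(demo())
assert flushed == [["a", "b", "c"]]  # one flush for the whole burst
```

Three hits land inside the window, so only the last scheduled timer survives and a single flush carries all three items.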
Coalesce intelligently
For 1 message: "María sent: <text>".
For 3 messages: "María sent 3 messages. Read them out: a | b | c".
The model uses the persona's voice and language to narrate. The skill just hands it the data and an instruction.
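The coalescing step is a pure function, which makes it easy to test on its own. A sketch (the exact wording is up to you):

```python
def coalesce_prompt(sender: str, messages: list[str]) -> str:
    """Build one narration instruction for a burst of messages."""
    if len(messages) == 1:
        return f"Tell the user: {sender} sent: {messages[0]}"
    joined = " | ".join(messages)
    return (
        f"Tell the user: {sender} sent {len(messages)} messages. "
        f"Read them out: {joined}"
    )


print(coalesce_prompt("María", ["hola"]))
# Tell the user: María sent: hola
print(coalesce_prompt("María", ["a", "b", "c"]))
# Tell the user: María sent 3 messages. Read them out: a | b | c
```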
Why both debounce and dedup_key
dedup_key only deduplicates queued injects. If the first inject already fired and a duplicate arrives a millisecond later, the duplicate fires too.
For bursty senders, that's not enough. We need application-level coalescing — combine N events into one narrated message. The buffer + debounce pattern does this; dedup_key is icing.
Backfill on connect
When the skill starts (or reconnects after a crash), there might be unread messages from the offline period. Don't flood the user with all of them — backfill carefully:
```python
async def setup(self, ctx: SkillContext) -> None:
    self._ctx = ctx
    self._logger = ctx.logger
    # Fetch unread from the last 6 hours.
    unread = await self._fetch_unread(since=datetime.now() - timedelta(hours=6))
    # Group by sender.
    by_sender = defaultdict(list)
    for msg in unread:
        by_sender[msg.sender].append(msg.text)
    # One inject per sender, summarizing.
    for sender, msgs in by_sender.items():
        if len(msgs) == 1:
            prompt = f"Mientras estabas fuera, {sender} envió: {msgs[0]}"
        else:
            prompt = (
                f"Mientras estabas fuera, {sender} envió {len(msgs)} mensajes. "
                f"Léelos: {' | '.join(msgs)}"
            )
        await self._ctx.inject_turn(
            prompt,
            dedup_key=f"backfill_{sender}_{datetime.now().date()}",
            priority=InjectPriority.NORMAL,
        )
```
The NORMAL priority means the backfill drains when the user is not in the middle of something: less intrusive than BLOCK_BEHIND_COMMS.
When to inject vs queue
Three priority tiers, four scenarios:
| Scenario | Priority |
|---|---|
| New message from anyone — minor | NORMAL |
| New message from a starred contact | BLOCK_BEHIND_COMMS |
| Emergency contact ("ICE") | PREEMPT |
| Backfill on reconnect | NORMAL |
Think about what the user would actually want to be interrupted for. Most messages: nothing. Important contacts: yes, but not during phone calls. ICE: yes, even during phone calls.
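That decision can live in one small pure function. A sketch using a stand-in InjectPriority enum (the real one comes from huxley_sdk) and a hypothetical priority_for helper with set-based contact lists:

```python
from enum import Enum, auto


class InjectPriority(Enum):
    NORMAL = auto()
    BLOCK_BEHIND_COMMS = auto()
    PREEMPT = auto()


def priority_for(sender: str, starred: set[str], ice: set[str]) -> InjectPriority:
    """Map a sender to an inject priority per the table above."""
    if sender in ice:
        return InjectPriority.PREEMPT             # interrupt anything, even calls
    if sender in starred:
        return InjectPriority.BLOCK_BEHIND_COMMS  # interrupt, but not calls
    return InjectPriority.NORMAL                  # drain when the user is idle


starred, ice = {"María"}, {"Mamá"}
assert priority_for("Mamá", starred, ice) is InjectPriority.PREEMPT
assert priority_for("María", starred, ice) is InjectPriority.BLOCK_BEHIND_COMMS
assert priority_for("Juan", starred, ice) is InjectPriority.NORMAL
```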
Inbound calls
Telegram supports voice calls. The pattern there is different: inject_turn_and_wait to announce, then start_input_claim to bridge audio:
```python
async def _on_inbound_call(self, caller_name: str, peer_id: str):
    # Step 1: Announce. Block until the announcement finishes.
    await self._ctx.inject_turn_and_wait(
        f"Llamada entrante de {caller_name}, contestando.",
        priority=InjectPriority.BLOCK_BEHIND_COMMS,
    )
    # Step 2: Bridge audio.
    claim = InputClaim(
        on_mic_frame=self._forward_to_peer(peer_id),
        speaker_source=self._stream_from_peer(peer_id),
        on_claim_end=self._on_call_end,
        title=caller_name,
    )
    try:
        handle = await self._ctx.start_input_claim(claim)
    except ClaimBusyError:
        # Another call is active.
        await self._ctx.logger.awarning("call_blocked", reason="comms_busy")
        return
```
Key: announce first (and wait), then bridge. If you bridge first, the user hears the caller's audio before the announcement.