Proactive Skills
Skills that speak first — timers, alerts, notifications. The three priority tiers and how to use them.
So far every skill has been reactive: the user speaks, the model decides to call your tool, you return a result. Some skills need to be proactive — they speak first, when something happens in the world: a timer fires, a Telegram message arrives, the laundry is done, a reminder is due.
This is what ctx.inject_turn is for.
What inject_turn does
Calling await ctx.inject_turn(prompt, ...) injects a new turn into the framework's coordinator, with the persona's model as the speaker. From the model's perspective, it's like the user just said something — except the "user" is your skill, and instead of microphone audio, you supply a text prompt.
The model then composes a response (in the persona's voice, narrated through the same audio path) and speaks it.
await ctx.inject_turn(
"Tell the user: time to take their medication.",
priority=InjectPriority.BLOCK_BEHIND_COMMS,
)The user hears the persona's voice say something like "Don Carlos, te toca la pastilla." The model handled the warm narration; the skill just supplied the intent.
A real proactive skill: timers
The shipped timers skill is the canonical example. Its full shape:
A tool to schedule
The user says "set a timer in 30 minutes for the laundry." The model calls set_timer(seconds=1800, message="The laundry is done").
The handler stores the timer + spawns a wait task
if tool_name == "set_timer":
timer_id = uuid.uuid4().hex
fire_at = datetime.now() + timedelta(seconds=args["seconds"])
entry = {
"id": timer_id,
"fire_at": fire_at.isoformat(),
"message": args["message"],
}
await self._ctx.storage.set_setting(f"timer:{timer_id}", json.dumps(entry))
self._spawn_fire_task(entry)
return ToolResult(output=json.dumps({"scheduled": timer_id}))The fire task waits, then injects
def _spawn_fire_task(self, entry):
async def fire():
delay = (datetime.fromisoformat(entry["fire_at"]) - datetime.now()).total_seconds()
await asyncio.sleep(max(0, delay))
# The "Tell the user:" prefix matters — without it, terse personas
# may treat the inject as a silent notification. See "Inject prompts
# are LLM instructions" below.
await self._ctx.inject_turn(
f"Tell the user: {entry['message']}",
dedup_key=f"timer_{entry['id']}",
priority=InjectPriority.BLOCK_BEHIND_COMMS,
)
await self._ctx.storage.delete_setting(f"timer:{entry['id']}")
self._ctx.background_task(
name=f"timer_{entry['id']}",
coro_factory=fire,
restart_on_crash=False, # one-shot
)On setup, restore from storage
async def setup(self, ctx: SkillContext) -> None:
self._ctx = ctx
for key, value in await ctx.storage.list_settings("timer:"):
entry = json.loads(value)
# Skip stale timers that should have fired more than an hour ago.
if (datetime.now() - datetime.fromisoformat(entry["fire_at"])).total_seconds() > 3600:
await ctx.storage.delete_setting(key)
continue
self._spawn_fire_task(entry)That's the entire skill, conceptually. ~50 lines of real code.
The pattern shows up everywhere proactive skills are built:
- Schedule via tool call. User says "do X at time Y," skill stores the intent.
- Persist eagerly. Timers, reminders, notifications must survive server restart. Use
ctx.storage. - Background task waits.
ctx.background_taskspawns a managed asyncio task that the framework supervises. - Inject when due.
ctx.inject_turnwith the right priority. - Clean up after firing. Delete the storage entry after the inject, so it doesn't fire twice.
- Restore on setup. Read storage, reschedule any pending tasks.
The three priorities
inject_turn takes a priority argument that controls how the new turn races against everything else happening:
class InjectPriority(Enum):
NORMAL = "normal"
BLOCK_BEHIND_COMMS = "block_behind_comms"
PREEMPT = "preempt"NORMAL
The injection drains only when nothing else is happening. If the user is in the middle of a turn (mid-PTT), or the model is speaking, or a phone call is active, or a book is playing — the injection waits.
When does it fire? When the framework reaches an idle moment. Often this is "shortly after the next turn ends."
Use NORMAL for non-urgent announcements:
- "You have 3 unread messages."
- "Don't forget to pick up groceries."
BLOCK_BEHIND_COMMS
Preempts the CONTENT channel (audiobooks, music, podcasts), but yields to the COMMS channel (active phone calls). If the user is on a call, the injection waits for the call to end. Otherwise, it interrupts whatever's playing, speaks, and (if it preempted a CONTENT stream) the framework resumes the stream after.
This is the right default for most timers and reminders. Don't interrupt phone calls — that's rude. But do interrupt the audiobook — that's the point of a timer.
PREEMPT
Preempts everything, including active phone calls. The phone call's on_claim_end fires with ClaimEndReason.PREEMPTED, the model speaks the urgent message, and the call is dead.
Use PREEMPT only for genuinely urgent events:
- Fire alarm.
- Security breach.
- Medical alert.
If you're not sure, you don't want PREEMPT. Use BLOCK_BEHIND_COMMS.
dedup_key — preventing duplicates
If your skill might fire the same message multiple times in quick succession, pass dedup_key:
await ctx.inject_turn(
f"Message from {sender}: {body}",
dedup_key=f"telegram_{message_id}",
priority=InjectPriority.BLOCK_BEHIND_COMMS,
)The framework dedups injections by key — if a turn with that key is already pending, the new one is dropped silently. This handles the common case of "the API gave me the same notification twice."
dedup_key only dedups queued injections, not fired ones. If your first inject already fired and a second one arrives with the same key, the second still fires. For fully-debounced behavior (the same kind of notification within N seconds gets coalesced), you need a skill-side debounce buffer. See Cookbook: Notifications for the pattern.
inject_turn_and_wait
Sometimes you need the injection to finish before doing something else. Example: announce an incoming call before bridging audio. If you bridge first and then announce, the user hears the caller's audio before they hear the announcement — confusing.
await ctx.inject_turn_and_wait(
f"Llamada entrante de {caller_name}, contestando.",
priority=InjectPriority.BLOCK_BEHIND_COMMS,
)
# Now bridge audio.
claim = InputClaim(on_mic_frame=..., speaker_source=..., on_claim_end=...)
await ctx.start_input_claim(claim)inject_turn_and_wait blocks until the model finishes speaking (response_done). Then your code continues.
If the coordinator is already busy (a turn in progress) when inject_turn_and_wait is called, it falls back to a plain enqueue and returns immediately. The wait semantics only hold when the call could fire its turn directly. Defensive code: don't assume the model is done speaking just because inject_turn_and_wait returned.
Background tasks
ctx.background_task is how skills run long-lived async work — schedulers, websocket clients, polling loops. The framework supervises:
def _start_listener(self):
async def listener():
async with telegram_client.connect() as ws:
async for msg in ws:
await self._handle_inbound(msg)
self._handle = self._ctx.background_task(
name="telegram_listener",
coro_factory=listener,
restart_on_crash=True,
max_restarts_per_hour=10,
on_permanent_failure=self._on_telegram_dead,
)
async def _on_telegram_dead(self, failure):
await self._ctx.inject_turn(
"Telegram disconnected and won't reconnect. Tell the user to check the server.",
priority=InjectPriority.NORMAL,
)Key options:
restart_on_crash— if your task throws, framework restarts it. UseTruefor long-lived listeners;Falsefor one-shot waiters (like a single timer firing).max_restarts_per_hour— limits restart loops. After this many crashes in an hour, the task is marked permanently failed.on_permanent_failure— called once when the task gives up. Often: inject a turn so the user knows.
A common pitfall: callbacks inside the framework's actors
Some callbacks (like on_claim_end and on_patience_expired) fire from inside the framework's serialized actors. You cannot await ctx.inject_turn directly from these callbacks — it would deadlock the actor.
Instead, schedule the inject:
async def on_call_ended(self, reason: ClaimEndReason):
if reason == ClaimEndReason.NATURAL:
# WRONG: would deadlock
# await self._ctx.inject_turn(...)
# RIGHT: schedule on the event loop
asyncio.create_task(self._ctx.inject_turn(
"La llamada terminó. Resume lo que estabas haciendo.",
priority=InjectPriority.BLOCK_BEHIND_COMMS,
))The lesson: when in doubt, asyncio.create_task it. The framework's logs will tell you if you got it wrong (you'll see a hang followed by a timeout).
Inject prompts are LLM instructions, not facts
The prompt you pass to inject_turn is an instruction to the model. If you want the model to speak the content, say so:
# Wrong: the model may treat this as a silent notification.
await ctx.inject_turn("The laundry is done.")
# Right: explicit instruction to narrate.
await ctx.inject_turn("Tell the user: the laundry is done.")This is especially important when the persona's prompt has any "be quiet unless asked" language. The inject must override that intent.
Test before you ship
Test that your inject fires:
@pytest.mark.asyncio
async def test_timer_fires_inject():
skill = TimersSkill()
ctx = AsyncMock()
await skill.setup(ctx)
# Schedule a near-immediate timer.
await skill.handle("set_timer", {"seconds": 0.1, "message": "test"})
await asyncio.sleep(0.3)
# Assert inject_turn was called with the right instruction prefix.
ctx.inject_turn.assert_called_once_with(
"Tell the user: test",
dedup_key=ANY,
priority=InjectPriority.BLOCK_BEHIND_COMMS,
)Real-time testing of injects against the running framework is harder — you'd need to mock the OpenAI Realtime session. The shipped framework has integration tests that exercise this; for skill tests, the unit-level "did inject_turn get called correctly" check is usually enough.