Listen + Pusher Pipeline — Sequence Diagrams
Last updated: 2026-05-13 (PR #7142: add Modulate Velma-2 STT, provider-agnostic architecture)
These diagrams document the real behavior observed during E2E testing with live services (backend, pusher, STT providers, embedding API). Update them when the pipeline changes.
1. Connection + Streaming + Transcription
The STT provider is selected at session start via the STT_SERVICE_MODELS env var
(e.g., modulate-velma-2,dg-nova-3). The first provider supporting the
requested language wins. All providers implement the STTSocket ABC and are
wrapped by the universal GatedSTTSocket VAD gate.
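The "first supporting provider wins" rule can be sketched as below. This is a minimal illustration, not the pipeline's actual code: `ProviderSpec`, `PROVIDERS`, `select_provider`, and the language sets are all hypothetical, and only the comma-separated format of STT_SERVICE_MODELS comes from the text above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProviderSpec:
    """Hypothetical descriptor: a model name plus the languages it supports."""
    model: str
    languages: frozenset


# Illustrative capability table. The real language coverage of each
# provider is an assumption made up for this example.
PROVIDERS = {
    "modulate-velma-2": ProviderSpec("modulate-velma-2", frozenset({"en"})),
    "dg-nova-3": ProviderSpec("dg-nova-3", frozenset({"en", "es", "fr"})),
}


def select_provider(env_value: str, language: str) -> Optional[ProviderSpec]:
    """Return the first listed provider that supports `language`, else None."""
    for name in (part.strip() for part in env_value.split(",")):
        spec = PROVIDERS.get(name)
        if spec is not None and language in spec.languages:
            return spec
    return None
```

With the value `modulate-velma-2,dg-nova-3`, a request for a language only the second entry lists falls through to `dg-nova-3`; order in the env var is the only priority signal in this sketch.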
Segments buffered in realtime_segment_buffers are sorted by start time
before processing — this corrects non-deterministic WebSocket arrival order
from providers like Modulate whose internal parallelism can deliver shorter
utterances before longer ones.
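The reordering step described above amounts to a stable sort on the segment start time. A minimal sketch, assuming each buffered segment is a dict with a numeric `start` field (the field name matches the transcript wire format; `order_segments` is a hypothetical helper):

```python
from operator import itemgetter


def order_segments(buffered):
    """Return buffered segments ordered by start time.

    sorted() is stable, so segments with equal start times keep their
    arrival order. The input list is left unmodified.
    """
    return sorted(buffered, key=itemgetter("start"))


# A long utterance that started first may arrive after a short later one.
arrived = [
    {"id": "s2", "text": "yes", "start": 4.1, "end": 4.5},
    {"id": "s1", "text": "a much longer utterance", "start": 0.3, "end": 3.9},
]
ordered = order_segments(arrived)
```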
2. Conversation Lifecycle (Silence Timeout Path)
This is the normal path when the client stays connected but stops speaking.
Key design rule (#6061): Listen NEVER processes conversations locally. All conversation processing routes through pusher via request_conversation_processing().
When pusher is unavailable, conversations stay buffered in pending_conversation_requests
and are flushed when pusher reconnects.
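The buffer-then-flush behavior can be sketched as follows. This is an illustration under assumptions: `PendingConversations`, its methods, and the `send` callable are hypothetical stand-ins, and the sketch ignores the retry and capacity limits listed in the timing constants.

```python
class PendingConversations:
    """Buffers conversation IDs while pusher is down; never processes locally."""

    def __init__(self, send):
        # `send` is a hypothetical callable that hands one conversation to
        # pusher and returns True if pusher accepted it.
        self._send = send
        self._pending = []
        self.connected = False

    def request_processing(self, conversation_id):
        """Send now if pusher is up; otherwise keep the request buffered."""
        if self.connected and self._send(conversation_id):
            return True
        if conversation_id not in self._pending:
            self._pending.append(conversation_id)
        return False

    def on_reconnect(self):
        """Replay buffered requests in arrival order; keep any that fail."""
        self.connected = True
        remaining = [cid for cid in self._pending if not self._send(cid)]
        flushed = len(self._pending) - len(remaining)
        self._pending = remaining
        return flushed

    @property
    def pending(self):
        return list(self._pending)
```

The point of the design rule shows up in `request_processing`: there is no fallback branch that processes the conversation in place, only "send" or "buffer".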
3. Disconnect Path
What happens when the WS connection closes.
3.1 Pusher Reconnect & Pending Flush
When pusher reconnects after a disconnection, all buffered conversations are replayed.
4. Speaker ID Lifecycle (2-Session Flow)
Speaker identification requires two sessions: one to store the embedding, one to match against it. The user’s own voice is also identified via their speech profile embedding (loaded at session start alongside person embeddings).
5. Private Cloud Sync (Audio Upload)
This flow applies when private_cloud_sync_enabled is set for the user.
6. Event Wire Protocol
Server → Client (JSON over WS text frames)
| Type | Format | Example |
|---|---|---|
| Transcripts | JSON array | [{id, text, speaker, speaker_id, is_user, start, end}, ...] |
| Events | JSON object | {type: "...", ...} |
| Keepalive | Plain text | "ping" (not JSON — filter before parsing) |
Event Types
| Event | Fields | When |
|---|---|---|
| service_status | {type, status: "ready"} | After WS connect, services initialized |
| memory_processing_started | {type} | Conversation sent to pusher for LLM |
| memory_created | {type, memory: {id, structured: {title, overview, ...}}} | LLM processing complete |
| speaker_label_suggestion | {type, person_id, person_name, distance, segments} | Speaker matched via embedding |
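A client has to tell the three server-to-client shapes apart before acting on them. The sketch below shows one way to do that, assuming the keepalive arrives as the bare text `ping`; `classify_message` is a hypothetical helper, not part of the documented client.

```python
import json


def classify_message(raw: str):
    """Classify one WS text frame as ("ping" | "transcripts" | "event", payload)."""
    # The keepalive is plain text, so filter it out before json.loads().
    if raw.strip() == "ping":
        return ("ping", None)
    payload = json.loads(raw)
    if isinstance(payload, list):
        return ("transcripts", payload)  # JSON array of segments
    if isinstance(payload, dict) and "type" in payload:
        return ("event", payload)  # JSON object with a "type" field
    raise ValueError("unrecognized server message shape")


kind, event = classify_message('{"type": "service_status", "status": "ready"}')
```

Passing `ping` straight to `json.loads()` raises `json.JSONDecodeError`, which is why the keepalive check comes first.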
Client → Server
| Type | Format | Notes |
|---|---|---|
| Audio | Binary frames | PCM16LE bytes |
| Silence keepalive | b'\x00' * 320 | Resets last_activity_time but NOT finished_at |
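For reference, the silence keepalive payload is just 320 zero bytes of PCM16LE. The sketch below builds it and computes how much audio it represents; the 16 kHz mono format used in the example is an assumption, since the sample rate is not stated here, and `frame_duration_ms` is a hypothetical helper.

```python
SILENCE_FRAME = b"\x00" * 320  # payload from the table above

BYTES_PER_SAMPLE = 2  # PCM16LE: 16-bit little-endian samples


def frame_duration_ms(frame: bytes, sample_rate_hz: int, channels: int = 1) -> float:
    """Duration of a raw PCM16LE frame in milliseconds."""
    samples_per_channel = len(frame) / (BYTES_PER_SAMPLE * channels)
    return 1000.0 * samples_per_channel / sample_rate_hz


# Assuming 16 kHz mono (not confirmed by this document), the frame
# holds 160 samples.
duration = frame_duration_ms(SILENCE_FRAME, sample_rate_hz=16000)
```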
7. Timing Constants
| Constant | Value | Location | Purpose |
|---|---|---|---|
| conversation_timeout | 120s (min) | transcribe.py | Silence before lifecycle triggers |
| last_activity_time timeout | 90s | transcribe.py | WS inactivity disconnect |
| SPEAKER_SAMPLE_MIN_AGE | 120s | pusher.py | Wait before extracting embedding (skipped on shutdown) |
| SPEAKER_SAMPLE_PROCESS_INTERVAL | 15s | pusher.py | Queue poll interval |
| WS_RECEIVE_TIMEOUT | 300s | pusher.py, transcribe.py | No-data timeout on WebSocket receive; detects dead connections |
| BG_DRAIN_TIMEOUT | 30s | pusher.py, transcribe.py | Grace period for background tasks to drain after disconnect before force-cancel |
| lifecycle_manager poll | 5s | transcribe.py:1683 | Check finished_at interval |
| Pusher audio batch | 60s | pusher | GCS upload batch size |
| Speaker match threshold | 0.45 | transcribe.py | Cosine distance cutoff |
| PENDING_REQUEST_TIMEOUT | 120s | transcribe.py | Timeout before retrying a pending request |
| MAX_RETRIES_PER_REQUEST | 3 | transcribe.py | Max retries before keeping buffered |
| PUSHER_MAX_RECONNECT_ATTEMPTS | 6 | transcribe.py | Reconnect attempts before DEGRADED |
| MAX_PENDING_REQUESTS | 100 | transcribe.py | Max buffered conversations per session |
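To make the speaker match threshold in the table concrete, here is a minimal sketch of a cosine distance cutoff. `cosine_distance` and `best_match` are hypothetical helpers, and whether the real comparison is strict (`<`) or inclusive (`<=`) is an assumption; only the 0.45 value and the use of cosine distance come from the table.

```python
import math

SPEAKER_MATCH_THRESHOLD = 0.45  # cosine distance cutoff from the table


def cosine_distance(a, b):
    """1 - cosine similarity; 0.0 means the embeddings point the same way."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("zero-length embedding")
    return 1.0 - dot / (norm_a * norm_b)


def best_match(query, people, threshold=SPEAKER_MATCH_THRESHOLD):
    """Return (person_id, distance) for the closest stored embedding,
    or None if even the closest one is not under the threshold."""
    best = None
    for person_id, embedding in people.items():
        distance = cosine_distance(query, embedding)
        if best is None or distance < best[1]:
            best = (person_id, distance)
    if best is not None and best[1] < threshold:  # strict cutoff is an assumption
        return best
    return None
```

A lower threshold trades missed matches for fewer false speaker labels; the `distance` field on speaker_label_suggestion events would carry the value computed here.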
8. WebSocket Task Supervision
Both transcribe.py and pusher.py use an asyncio.wait(FIRST_COMPLETED) supervisor loop instead of asyncio.gather() to manage background tasks. This prevents ghost connections where a hung background task blocks cleanup forever.
Supervisor exits on:
- Client disconnect (receive task completes)
- Background task crash (exception)
- Lifetime task normal completion (e.g. heartbeat inactivity timeout)
- Finite task normal completion (e.g. process_pending_conversations, speaker_identification_task)
After the supervisor exits, remaining background tasks get BG_DRAIN_TIMEOUT (30s) to drain before force-cancel. The connection gauge (inc/dec) is always paired in try/finally.
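The supervision pattern can be sketched as follows. This is a simplified illustration, not the code in transcribe.py or pusher.py: `supervise`, `demo`, and the dict-based gauge are hypothetical, and the real loop likely distinguishes task kinds in ways this sketch does not.

```python
import asyncio

BG_DRAIN_TIMEOUT = 30  # seconds, from the timing constants table


async def supervise(receive_coro, background_coros, gauge,
                    drain_timeout=BG_DRAIN_TIMEOUT):
    """Run all tasks, exit when the first completes, then drain the rest."""
    gauge["active"] += 1  # hypothetical connection gauge: inc ...
    try:
        tasks = [asyncio.create_task(receive_coro)]
        tasks += [asyncio.create_task(c) for c in background_coros]
        # Unlike gather(), this returns as soon as ANY task finishes or crashes.
        done, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)
        if pending:
            # Grace period for the remaining tasks, then force-cancel.
            await asyncio.wait(pending, timeout=drain_timeout)
            for task in pending:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
        # result() re-raises if the finished task crashed.
        return next(iter(done)).result()
    finally:
        gauge["active"] -= 1  # ... and dec, always paired


async def demo():
    gauge = {"active": 0}

    async def receive():  # stands in for the WS receive loop
        await asyncio.sleep(0.01)
        return "client disconnected"

    async def hung():  # a background task that never finishes
        await asyncio.sleep(3600)

    result = await supervise(receive(), [hung()], gauge, drain_timeout=0.05)
    return result, gauge["active"]
```

Running `asyncio.run(demo())` returns once the receive task ends and the hung task has been cancelled after the short drain window. With `asyncio.gather()` the same call would wait on `hung()` for the full hour.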