# Listen + Pusher Pipeline

> Sequence diagrams for the /v4/listen WebSocket and Pusher processing pipeline

> Last updated: 2026-03-28 (PR #6061 — remove local fallback)
>
> These diagrams document the real behavior observed during E2E testing with
> live services (backend, pusher, Deepgram, embedding API). Update them whenever
> the pipeline changes.

## 1. Connection + Streaming + Transcription

```mermaid theme={null}
sequenceDiagram
    participant Client
    participant Backend as Backend (/v4/listen)
    participant Deepgram
    participant Pusher
    participant Firestore

    Client->>Backend: WS connect (uid, language, sample_rate, codec)
    Backend->>Backend: Auth check (dev-token or Firebase)
    Backend->>Firestore: Create stub conversation (status=in_progress)
    Backend->>Deepgram: Open STT WebSocket
    Backend->>Pusher: Open Pusher WebSocket
    Backend-->>Client: {type: "service_status", status: "ready"}

    Note over Backend: Start background tasks:<br/>conversation_lifecycle_manager (5s poll)<br/>speaker_identification_task<br/>stream_transcript_process

    loop Audio Streaming
        Client->>Backend: PCM audio bytes
        Backend->>Backend: Update last_activity_time
        Backend->>Deepgram: Forward audio
        Deepgram-->>Backend: Transcript segments
        Backend->>Backend: Update finished_at (only on real segments)
        Backend->>Firestore: Write transcript_segments
        Backend-->>Client: JSON array of segments [{id, text, speaker, ...}]
    end
```
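The loop above updates two timestamps for different reasons: `last_activity_time` moves on every inbound audio frame, while `finished_at` moves only when Deepgram returns real segments. A minimal sketch of that bookkeeping, assuming a hypothetical `SessionActivity` class with an injectable clock (not the actual transcribe.py code):

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SessionActivity:
    """Hypothetical tracker for the two per-session timestamps."""
    clock: Callable[[], float] = time.monotonic
    last_activity_time: Optional[float] = None
    finished_at: Optional[float] = None

    def on_audio(self, chunk: bytes) -> None:
        # Any inbound frame, including a silence keepalive, counts as activity.
        self.last_activity_time = self.clock()

    def on_segments(self, segments: list) -> None:
        # Only non-empty transcription results extend the conversation.
        if segments:
            self.finished_at = self.clock()
```

Keeping the two timestamps separate is what lets a silence keepalive hold the socket open (90 s inactivity limit) without postponing the 120 s conversation timeout.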

## 2. Conversation Lifecycle (Silence Timeout Path)

This is the normal path when the client stays connected but stops speaking.

**Key design rule (#6061):** Listen NEVER processes conversations locally.
All conversation processing routes through pusher via `request_conversation_processing()`.
When pusher is unavailable, conversations stay buffered in `pending_conversation_requests`
and are flushed when pusher reconnects.
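The routing rule reduces to a single decision with three outcomes, none of which is local processing. A minimal sketch, assuming hypothetical names (`PusherState`, `route_conversation`, a `send` callable) and a plain list as the pending buffer; only opcode 104, the state names, and the buffering behavior come from this page:

```python
from enum import Enum
from typing import Callable, List

PROCESS_CONVERSATION_OPCODE = 104  # opcode used in the diagrams on this page


class PusherState(Enum):
    CONNECTED = "connected"
    RECONNECT_BACKOFF = "reconnect_backoff"
    DEGRADED = "degraded"


def route_conversation(
    conversation_id: str,
    pusher_configured: bool,
    state: PusherState,
    send: Callable[[int, str], None],
    pending: List[str],
) -> str:
    """Decide what happens to a finished conversation. Never processes locally."""
    if not pusher_configured:
        # HOSTED_PUSHER_API_URL unset: nowhere to send, conversation stays in_progress.
        return "left_in_progress"
    if state is PusherState.CONNECTED:
        send(PROCESS_CONVERSATION_OPCODE, conversation_id)
        return "sent"
    # Configured but not connected: keep the request for the reconnect flush.
    if conversation_id not in pending:
        pending.append(conversation_id)
    return "buffered"
```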

```mermaid theme={null}
sequenceDiagram
    participant Client
    participant Backend as Backend
    participant Pusher
    participant OpenAI as OpenAI (LLM)
    participant Firestore

    Note over Client: Client stops sending audio<br/>(silence keepalive OK — doesn't update finished_at)

    loop conversation_lifecycle_manager (every 5s)
        Backend->>Firestore: Check finished_at on current conversation
        Note over Backend: If now() - finished_at >= conversation_timeout (120s)
    end

    Backend->>Backend: _process_conversation(conversation_id)
    alt Pusher available (request_conversation_processing != None)
        Backend->>Firestore: Set status=processing
        Backend->>Pusher: Send process_conversation request (opcode 104)
        Backend->>Firestore: Create new stub conversation
    else Pusher not configured (HOSTED_PUSHER_API_URL unset)
        Note over Backend: Skip processing — conversation stays in_progress permanently.<br/>Pusher is required for conversation processing in production.
    end

    alt Pusher connected
        Pusher->>OpenAI: Generate title, overview, category, emoji, action_items
        OpenAI-->>Pusher: Structured data
        Pusher->>Firestore: Write structured data + set status=completed
        Pusher->>OpenAI: Extract memories
        Pusher->>Firestore: Save embedding vector
        Pusher-->>Backend: Opcode 201 (conversation processed callback)
        Backend-->>Client: {type: "memory_processing_started"}
        Backend-->>Client: {type: "memory_created", memory: {...}}
    else Pusher disconnected (RECONNECT_BACKOFF / DEGRADED)
        Note over Backend: Request buffered in pending_conversation_requests<br/>Resent automatically when pusher reconnects
    end
```
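Because the lifecycle manager polls instead of arming a timer, processing starts at least `conversation_timeout` and less than `conversation_timeout` plus one poll interval after the last real segment. A minimal sketch of the check, assuming hypothetical helpers `is_stale` and `first_trigger_time` and the 120 s / 5 s values from the timing table:

```python
CONVERSATION_TIMEOUT_S = 120.0  # minimum value from the timing table
LIFECYCLE_POLL_S = 5.0          # conversation_lifecycle_manager poll interval


def is_stale(finished_at: float, now: float,
             timeout: float = CONVERSATION_TIMEOUT_S) -> bool:
    """True once the conversation has been silent for at least `timeout` seconds."""
    return now - finished_at >= timeout


def first_trigger_time(finished_at: float, first_poll: float,
                       poll: float = LIFECYCLE_POLL_S,
                       timeout: float = CONVERSATION_TIMEOUT_S) -> float:
    """Time of the first poll (first_poll, first_poll + poll, ...) that sees it as stale."""
    t = first_poll
    while not is_stale(finished_at, t, timeout):
        t += poll
    return t
```

For example, with polls at t = 0, 5, 10, ... s and a last segment at t = 0.5 s, the poll at 120 s sees only 119.5 s of silence, so the conversation is picked up at 125 s (124.5 s after the segment).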

## 3. Disconnect Path

This diagram shows what happens when the client's WebSocket connection closes.

```mermaid theme={null}
sequenceDiagram
    participant Client
    participant Backend as Backend
    participant Pusher

    Client->>Backend: WS close (code=1000)
    Backend->>Backend: Log "Client disconnected"

    alt Multi-channel
        Backend->>Backend: _process_conversation(current_conversation_id)
        alt Pusher available
            Backend->>Pusher: Send process_conversation request (opcode 104)
        else Pusher not configured
            Note over Backend: Conversation stays in_progress permanently<br/>until HOSTED_PUSHER_API_URL is configured
        end
    else Single-channel
        Note over Backend: No explicit _process_conversation() call<br/>Relies on next session to pick up as stale
    end

    Backend->>Backend: Cancel ALL background tasks<br/>(including conversation_lifecycle_manager)
    Backend->>Backend: Close Pusher socket
    Backend->>Backend: Cleanup memory (clear caches, buffers)

    Note over Backend: No local fallback processing (#6061).<br/>Pusher is required for conversation processing.<br/>Without it, conversations stay in_progress permanently.
```
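The teardown order in this diagram (optional processing request, then task cancellation, then socket close and cache cleanup) can be sketched with `asyncio`. This assumes hypothetical names (`on_disconnect`, `process_conversation`, `close_pusher`, a `caches` dict) and is not the transcribe.py implementation:

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def on_disconnect(
    multi_channel: bool,
    current_conversation_id: str,
    process_conversation: Callable[[str], Awaitable[None]],
    background_tasks: Iterable[asyncio.Task],
    close_pusher: Callable[[], Awaitable[None]],
    caches: dict,
) -> None:
    if multi_channel:
        # Only multi-channel sessions request processing explicitly on close.
        await process_conversation(current_conversation_id)
    # Single-channel: nothing here; the next session treats it as stale.

    tasks = list(background_tasks)
    for task in tasks:
        task.cancel()  # includes conversation_lifecycle_manager
    # Wait for cancellation to finish; CancelledError is returned, not raised.
    await asyncio.gather(*tasks, return_exceptions=True)

    await close_pusher()
    caches.clear()  # drop per-session caches and buffers
```

Note that once the lifecycle manager is cancelled, nothing in this session can trigger the silence-timeout path anymore.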

## 3.1 Pusher Reconnect & Pending Flush

When pusher reconnects after a disconnection, all buffered conversations are replayed.

```mermaid theme={null}
sequenceDiagram
    participant Backend as Backend
    participant Pusher

    Note over Backend: Pusher disconnects → RECONNECT_BACKOFF

    loop _pusher_reconnect_loop (exponential backoff)
        Backend->>Pusher: Attempt reconnect
        alt Connection fails
            Note over Backend: Increment reconnect_attempts
            alt attempts >= 6 OR circuit breaker open
                Note over Backend: Transition to DEGRADED<br/>Pending conversations KEPT buffered
            end
        else Connection succeeds
            Note over Backend: Transition to CONNECTED
            Backend->>Backend: Flush pending_conversation_requests
            loop Each buffered conversation
                Backend->>Pusher: Re-send process_conversation (opcode 104)
            end
        end
    end

    Note over Backend: Retry exhaustion: after MAX_RETRIES_PER_REQUEST (3) retries,<br/>the conversation stays buffered with sent_at reset.<br/>cleanup_processing_conversations picks it up next session.
```
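A minimal sketch of the reconnect-and-flush loop, assuming a synchronous `connect` callable, an injectable `sleep`, and a doubling backoff starting at 1 s and capped at 30 s. The base delay, the cap, and returning (rather than continuing to retry) on DEGRADED are assumptions; only the 6-attempt limit, the circuit-breaker check, and the flush on success come from this page:

```python
from typing import Callable, List

PUSHER_MAX_RECONNECT_ATTEMPTS = 6
PROCESS_CONVERSATION_OPCODE = 104


def reconnect_and_flush(
    connect: Callable[[], bool],
    send: Callable[[int, str], None],
    pending: List[str],
    sleep: Callable[[float], None],
    circuit_open: Callable[[], bool] = lambda: False,
    base_delay: float = 1.0,   # assumed
    max_delay: float = 30.0,   # assumed
) -> str:
    attempts = 0
    while True:
        if connect():
            # CONNECTED: replay every buffered conversation, oldest first.
            while pending:
                send(PROCESS_CONVERSATION_OPCODE, pending[0])
                pending.pop(0)  # remove only after the send succeeded
            return "CONNECTED"
        attempts += 1
        if attempts >= PUSHER_MAX_RECONNECT_ATTEMPTS or circuit_open():
            # DEGRADED: stop for now but leave `pending` intact.
            return "DEGRADED"
        sleep(min(base_delay * 2 ** (attempts - 1), max_delay))
```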

## 4. Speaker ID Lifecycle (2-Session Flow)

Speaker identification requires two sessions: one to store the embedding, one
to match against it. The user's own voice is also identified via their speech
profile embedding (loaded at session start alongside person embeddings).

```mermaid theme={null}
sequenceDiagram
    participant Client
    participant Backend as Backend
    participant Pusher
    participant EmbeddingAPI as Embedding API (wespeaker)
    participant GCS as Google Cloud Storage
    participant Firestore

    Note over Client: SESSION 1: Assign Speaker

    Client->>Backend: Stream audio (speech segments)
    Backend-->>Client: Transcript segments with speaker labels
    Client->>Backend: POST /assign_speaker {segment_id, person_id}
    Backend->>Backend: Queue speaker sample (opcode 105)
    Backend->>Pusher: Forward speaker sample request

    Note over Pusher: SPEAKER_SAMPLE_MIN_AGE = 120s<br/>SPEAKER_SAMPLE_PROCESS_INTERVAL = 15s

    loop Speaker sample queue (every 15s)
        Pusher->>Pusher: Check if sample age >= 120s
    end

    Pusher->>EmbeddingAPI: POST /v1/speakers/extract (audio bytes)
    EmbeddingAPI-->>Pusher: 256-d embedding vector
    Pusher->>Firestore: Store embedding in person.speech_samples[]

    Note over Client: SESSION 2: Auto-Label

    Client->>Backend: WS connect
    Backend->>Firestore: Load person_embeddings_cache (once at session start)
    Backend->>GCS: Load user speech profile WAV (if exists)
    Backend->>EmbeddingAPI: Extract user embedding from profile audio
    Note over Backend: Cache user embedding as 'user' sentinel
    Client->>Backend: Stream audio
    Backend->>EmbeddingAPI: Extract embedding from current speech
    EmbeddingAPI-->>Backend: 256-d embedding
    Backend->>Backend: Cosine distance vs cached embeddings (persons + user)
    Note over Backend: Match if distance < 0.45
    alt Match is user's own voice
        Backend->>Backend: Set is_user=True on segments via speaker_to_person_map
    else Match is known person
        Backend-->>Client: {type: "speaker_label_suggestion", person_id, distance}
    end
```
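The Session 2 matching step compares the current embedding against every cached embedding (persons plus the user's own, under a `'user'` key) and accepts the closest one only if its cosine distance is below 0.45. A dependency-free sketch, assuming a hypothetical `match_speaker` helper and short Python lists standing in for the 256-d vectors:

```python
import math
from typing import Dict, List, Optional, Tuple

SPEAKER_MATCH_THRESHOLD = 0.45  # cosine distance cutoff from the timing table
USER_SENTINEL = "user"          # cache key for the user's own speech profile


def cosine_distance(a: List[float], b: List[float]) -> float:
    """1 - cosine similarity: 0 means same direction, 1 orthogonal, 2 opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0:
        return 2.0  # zero vector: treat as maximally distant (assumption)
    return 1.0 - dot / norm


def match_speaker(
    embedding: List[float],
    cache: Dict[str, List[float]],
    threshold: float = SPEAKER_MATCH_THRESHOLD,
) -> Optional[Tuple[str, float]]:
    """Return (key, distance) of the closest cached embedding under the threshold."""
    best: Optional[Tuple[str, float]] = None
    for key, cached in cache.items():
        d = cosine_distance(embedding, cached)
        if d < threshold and (best is None or d < best[1]):
            best = (key, d)
    return best
```

In this sketch a result keyed by `USER_SENTINEL` corresponds to setting `is_user=True`, and any other key to emitting a `speaker_label_suggestion`.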

## 5. Private Cloud Sync (Audio Upload)

This path runs only when `private_cloud_sync_enabled` is set for the user.

```mermaid theme={null}
sequenceDiagram
    participant Client
    participant Backend as Backend
    participant Pusher
    participant GCS as Google Cloud Storage
    participant Firestore

    Client->>Backend: Stream audio
    Backend->>Pusher: Forward audio bytes

    loop Every 60s batch
        Pusher->>Pusher: Encode audio to Opus
        Pusher->>GCS: Upload audio chunk
        Pusher->>Firestore: Append to conversation.audio_files[]
    end

    Note over Pusher: In LOCAL_DEVELOPMENT mode,<br/>GCS upload fails (no prod creds).<br/>Conversation lifecycle still works.
```
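The 60 s batching can be sketched as a buffer that flushes on elapsed time. This assumes a hypothetical `AudioBatcher` with an injectable clock and an `upload` callback standing in for the Opus encoding, GCS upload, and `audio_files[]` append; none of these names come from pusher, and flushing the remainder at session end is an assumption:

```python
import time
from typing import Callable, Optional

BATCH_SECONDS = 60.0  # pusher audio batch interval from the timing table


class AudioBatcher:
    def __init__(self, upload: Callable[[bytes], None],
                 clock: Callable[[], float] = time.monotonic,
                 batch_seconds: float = BATCH_SECONDS) -> None:
        self._upload = upload  # stand-in for encode + GCS upload + Firestore append
        self._clock = clock
        self._batch_seconds = batch_seconds
        self._buffer = bytearray()
        self._batch_started: Optional[float] = None

    def add(self, chunk: bytes) -> None:
        now = self._clock()
        if self._batch_started is None:
            self._batch_started = now
        self._buffer.extend(chunk)
        if now - self._batch_started >= self._batch_seconds:
            self.flush()

    def flush(self) -> None:
        """Upload whatever is buffered and start a fresh batch."""
        if self._buffer:
            self._upload(bytes(self._buffer))
        self._buffer.clear()
        self._batch_started = None
```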

## 6. Event Wire Protocol

### Server → Client (JSON over WS text frames)

| Type        | Format      | Example                                                       |
| ----------- | ----------- | ------------------------------------------------------------- |
| Transcripts | JSON array  | `[{id, text, speaker, speaker_id, is_user, start, end}, ...]` |
| Events      | JSON object | `{type: "...", ...}`                                          |
| Keepalive   | Plain text  | `"ping"` (not JSON — filter before parsing)                   |

### Event Types

| Event                       | Fields                                                     | When                                   |
| --------------------------- | ---------------------------------------------------------- | -------------------------------------- |
| `service_status`            | `{type, status: "ready"}`                                  | After WS connect, services initialized |
| `memory_processing_started` | `{type}`                                                   | Conversation sent to pusher for LLM    |
| `memory_created`            | `{type, memory: {id, structured: {title, overview, ...}}}` | LLM processing complete                |
| `speaker_label_suggestion`  | `{type, person_id, person_name, distance, segments}`       | Speaker matched via embedding          |
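
A client has to tell three shapes apart on the same text channel, and the plain-text keepalive must be filtered before JSON parsing. A minimal dispatcher sketch, assuming each text frame arrives as a `str`; the `classify_message` name and its return values are illustrative, not part of the protocol:

```python
import json
from typing import Any, Tuple


def classify_message(frame: str) -> Tuple[str, Any]:
    """Return (kind, payload) for one server text frame."""
    if frame == "ping":
        # Keepalive is plain text, not JSON: check it before json.loads.
        return "keepalive", None
    data = json.loads(frame)
    if isinstance(data, list):
        return "transcripts", data  # list of segment objects
    if isinstance(data, dict) and "type" in data:
        return data["type"], data  # e.g. service_status, memory_created
    return "unknown", data
```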

### Client → Server

| Type              | Format          | Notes                                             |
| ----------------- | --------------- | ------------------------------------------------- |
| Audio             | Binary frames   | PCM16LE bytes                                     |
| Silence keepalive | `b'\x00' * 320` | Resets `last_activity_time` but NOT `finished_at` |
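
PCM16LE means signed 16-bit samples in little-endian byte order, so the 320-byte silence keepalive holds 160 zero samples. At 16 kHz mono that is 10 ms of audio; the 16 kHz mono format is an assumption here, since the real rate comes from the `sample_rate` connect parameter. A standard-library sketch of building both frame types, with hypothetical helper names:

```python
import struct
from typing import Sequence

SILENCE_KEEPALIVE = b"\x00" * 320  # from the table above
BYTES_PER_SAMPLE = 2               # 16-bit PCM


def pcm16le_frame(samples: Sequence[int]) -> bytes:
    """Pack signed 16-bit samples (-32768..32767) as little-endian bytes."""
    return struct.pack(f"<{len(samples)}h", *samples)


def frame_duration_ms(frame: bytes, sample_rate: int, channels: int = 1) -> float:
    """Duration of a PCM16 frame in milliseconds."""
    samples_per_channel = len(frame) / (BYTES_PER_SAMPLE * channels)
    return 1000.0 * samples_per_channel / sample_rate
```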

## 7. Timing Constants

| Constant                          | Value      | Location           | Purpose                                   |
| --------------------------------- | ---------- | ------------------ | ----------------------------------------- |
| `conversation_timeout`            | 120s (min) | transcribe.py      | Silence before lifecycle triggers         |
| `last_activity_time` timeout      | 90s        | transcribe.py      | WS inactivity disconnect                  |
| `SPEAKER_SAMPLE_MIN_AGE`          | 120s       | pusher.py:39       | Wait before extracting embedding          |
| `SPEAKER_SAMPLE_PROCESS_INTERVAL` | 15s        | pusher.py:40       | Queue poll interval                       |
| `lifecycle_manager` poll          | 5s         | transcribe.py:1683 | Check `finished_at` interval              |
| Pusher audio batch                | 60s        | pusher             | GCS upload batch interval                 |
| Speaker match threshold           | 0.45       | transcribe.py      | Cosine distance cutoff                    |
| `PENDING_REQUEST_TIMEOUT`         | 120s       | transcribe.py      | Timeout before retrying a pending request |
| `MAX_RETRIES_PER_REQUEST`         | 3          | transcribe.py      | Max retries before keeping buffered       |
| `PUSHER_MAX_RECONNECT_ATTEMPTS`   | 6          | transcribe.py      | Reconnect attempts before DEGRADED        |
| `MAX_PENDING_REQUESTS`            | 100        | transcribe.py      | Max buffered conversations per session    |
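
The last four constants govern the per-session retry buffer. A minimal sketch of how they could interact, assuming a hypothetical `PendingRequests` class. Two behaviors are assumptions not stated on this page: a full buffer rejects new requests, and the opcode 201 callback clears the entry:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

PENDING_REQUEST_TIMEOUT = 120.0
MAX_RETRIES_PER_REQUEST = 3
MAX_PENDING_REQUESTS = 100


@dataclass
class PendingRequest:
    conversation_id: str
    sent_at: Optional[float] = None  # None: buffered, not in flight
    retries: int = 0


class PendingRequests:
    def __init__(self) -> None:
        self._items: Dict[str, PendingRequest] = {}

    def __len__(self) -> int:
        return len(self._items)

    def add(self, conversation_id: str, sent_at: Optional[float]) -> bool:
        """Track a request; refuse new ones once the buffer is full (assumed policy)."""
        if conversation_id not in self._items and len(self._items) >= MAX_PENDING_REQUESTS:
            return False
        self._items[conversation_id] = PendingRequest(conversation_id, sent_at)
        return True

    def ack(self, conversation_id: str) -> None:
        # Assumed: pusher's "processed" callback (opcode 201) clears the entry.
        self._items.pop(conversation_id, None)

    def due_for_retry(self, now: float) -> List[str]:
        """IDs to resend now; exhausted ones stay buffered with sent_at reset."""
        due = []
        for req in self._items.values():
            if req.sent_at is None or now - req.sent_at < PENDING_REQUEST_TIMEOUT:
                continue
            if req.retries >= MAX_RETRIES_PER_REQUEST:
                req.sent_at = None  # stop retrying; keep it for a later flush or session
                continue
            req.retries += 1
            req.sent_at = now
            due.append(req.conversation_id)
        return due
```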
