WhatsApp Session Management with LangGraph Checkpointing

Context

Currently, the REST API (/execute) and WebSocket (/execute-ws) endpoints use LangGraph checkpointing to manage conversation history efficiently. This allows the AI to maintain context across multiple turns without manually reconstructing the entire conversation history from the database on each request.

However, the WhatsApp webhook handler (/webhooks/whatsapp) still uses the legacy approach: it manually fetches message history from the conversations and messages tables, converts them to LangChain message objects, and passes the full history to LangGraph on every invocation. This creates several issues:

  1. Performance overhead: Fetching and reconstructing up to 20 messages on every WhatsApp message
  2. Inconsistency: Different channels using different state management patterns
  3. Missing features: No access to checkpoint-based session history API
  4. Limited scalability: Manual history reconstruction doesn't scale well with long conversations

Additionally, the user wants to:

  • Track organization and user identification for WhatsApp conversations
  • Ensure proper session management similar to the REST/WS implementation
  • Maintain conversation metadata for org/user tracking

Objective

Migrate the WhatsApp webhook handler to LangGraph checkpointing for session management, aligning it with the REST/WS implementation while keeping org/user tracking in conversation metadata.

Current State Analysis

REST/WS Implementation (Good - Reference Pattern)

Files:

  • /home/bs01083/_work/chatbot_poc/core/app/api/v1/agent_flows.py (REST)
  • /home/bs01083/_work/chatbot_poc/core/app/api/v1/agent_flows_ws.py (WebSocket)

Key Features:

  1. Uses AsyncPostgresSaver checkpointer from /core/app/services/graph/checkpointer.py
  2. Session ID passed as thread_id in invoke config
  3. Only new message passed to graph; checkpointer handles history
  4. Session metadata stored in agent_flow_sessions table
  5. Full state persisted in LangGraph checkpoint tables (langgraph_checkpoints, etc.)

WhatsApp Current Implementation (Needs Migration)

File: /home/bs01083/_work/chatbot_poc/core/app/api/v1/whatsapp_channel.py

Current Flow:

  1. Webhook receives message from Meta
  2. Extracts phone number and message text
  3. Finds/creates conversation in conversations table
  4. Fetches last 20 messages from messages table
  5. Manually converts to LangChain messages
  6. Passes full history to graph.ainvoke()
  7. Saves response back to messages table
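For reference, steps 4 to 6 of the legacy flow amount to roughly the following. This is a minimal sketch, not the handler's actual code: `StoredMessage`, `fetch_last_messages`, and the `(role, content)` tuples are hypothetical stand-ins for the real ORM rows, the database query, and the LangChain message objects.

```python
from dataclasses import dataclass

HISTORY_LIMIT = 20  # the legacy handler fetches the last 20 messages


@dataclass
class StoredMessage:
    """Hypothetical stand-in for a row in the messages table."""
    role: str       # "user" or "assistant"
    content: str


def fetch_last_messages(rows: list[StoredMessage], limit: int = HISTORY_LIMIT) -> list[StoredMessage]:
    """Stand-in for the DB query: the newest `limit` rows, oldest first."""
    return rows[-limit:]


def to_chat_history(rows: list[StoredMessage]) -> list[tuple[str, str]]:
    """Convert stored rows to (role, content) pairs, mirroring the manual conversion."""
    return [(row.role, row.content) for row in rows]


def build_legacy_input(rows: list[StoredMessage], new_text: str) -> dict:
    """Rebuild the history and append the new message; this runs on every webhook call."""
    history = to_chat_history(fetch_last_messages(rows))
    return {"messages": history + [("user", new_text)]}
```

The point of the sketch is that the query and conversion are repeated for each inbound message, regardless of how little has changed since the previous turn.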

Organization/User Tracking:

  • Organization ID stored in WhatsAppIntegration.organization_id
  • Conversation metadata stores: wa_phone_number, wa_name, integration_id, organization_id
  • No authenticated user_id (WhatsApp users are anonymous/external)

Gap Analysis

| Feature | REST/WS | WhatsApp (Current) | Required |
|---|---|---|---|
| LangGraph Checkpointing | ✅ Yes | ❌ No | ✅ Yes |
| Session ID Tracking | thread_id | ❌ Manual conversation lookup | ✅ Use conversation_id |
| AgentFlowSession Record | ✅ Yes | ❌ No | ✅ Yes |
| Organization Tracking | N/A | ✅ In metadata | ✅ Keep existing |
| User Identification | ✅ Keycloak user_id | ✅ Phone number in metadata | ✅ Keep existing |
| History Reconstruction | ✅ Automatic (checkpointer) | ❌ Manual (query DB) | ✅ Use checkpointer |

Proposed Solution

1. Architecture Alignment

Migrate WhatsApp to use the same checkpoint-based pattern as REST/WS:

┌─────────────────────────────────────────────────────────┐
│  WhatsApp Webhook Receives Message                      │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Get/Create Conversation (with org/user metadata)       │
│  - Store wa_phone_number, wa_name                       │
│  - Store integration_id, organization_id                │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Build Graph with Checkpointer                          │
│  - Pass checkpointer to create_flow_graph()             │
│  - Use conversation_id as thread_id                     │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Check if First Message in Session                      │
│  - Query checkpoint state using conversation_id         │
│  - is_first = (no existing checkpoint state)            │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Build Initial State with Checkpoint Awareness          │
│  - Use build_initial_state_with_checkpoint()            │
│  - Only pass new message                                │
│  - Initialize state fields only on first message        │
│  - Add channel context (org_id, phone, etc.)            │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Invoke Graph with Config                               │
│  - config = {"configurable": {"thread_id": conv_id}}    │
│  - Checkpointer automatically loads history             │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Persist Session Record to agent_flow_sessions          │
│  - flow_id, session_id (conversation_id)                │
│  - channel_type: "whatsapp"                             │
│  - message_count                                        │
└──────────────────┬──────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────────────────┐
│  Save Messages to Database (Optional - for UI/API)      │
│  - User message and assistant response                  │
│  - Keep existing conversation/message storage           │
└─────────────────────────────────────────────────────────┘
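
The flow in the diagram can be sketched end to end with in-memory stand-ins. Everything here is illustrative: `FakeCheckpointedGraph` is a hypothetical substitute for the compiled graph plus its checkpointer, the `sessions` dict stands in for the agent_flow_sessions table, and the final "save messages" step is omitted. Only the shape of the invoke config (`{"configurable": {"thread_id": ...}}`) follows LangGraph's actual convention.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeCheckpointedGraph:
    """Hypothetical stand-in for a compiled graph plus checkpointer.

    It keeps one message list per thread_id, which is the behavior the
    plan relies on: callers send only the new message.
    """
    threads: dict[str, list[tuple[str, str]]] = field(default_factory=dict)

    def has_state(self, thread_id: str) -> bool:
        return thread_id in self.threads

    async def ainvoke(self, state: dict, config: dict) -> dict:
        thread_id = config["configurable"]["thread_id"]
        history = self.threads.setdefault(thread_id, [])
        history.extend(state["messages"])           # append the new user turn
        history.append(("assistant", f"echo: {state['messages'][-1][1]}"))
        return {"messages": list(history)}


async def handle_whatsapp_message(graph: FakeCheckpointedGraph, sessions: dict,
                                  conversation_id: str, text: str, context: dict) -> str:
    thread_id = str(conversation_id)
    is_first = not graph.has_state(thread_id)       # no checkpoint for this thread yet

    state: dict = {"messages": [("user", text)]}    # only the new message
    if is_first:
        state["context"] = context                  # org/user info, set once

    config = {"configurable": {"thread_id": thread_id}}
    result = await graph.ainvoke(state, config)

    # Session record: one user message plus one assistant reply per turn.
    record = sessions.setdefault(thread_id, {"channel_type": "whatsapp", "message_count": 0})
    record["message_count"] += 2
    return result["messages"][-1][1]
```

Calling the handler twice with the same conversation ID leaves four messages in that thread and a message count of 4, without the caller ever passing prior history.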

2. Key Design Decisions

A. Thread ID Strategy

  • Use conversation_id as thread_id for LangGraph checkpointing
  • Rationale: Conversation already uniquely identifies a WhatsApp user-bot interaction
  • Each conversation = one checkpoint thread = one session
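
As a small illustration of this mapping, the helper below (a hypothetical name, not an existing utility) turns a conversation UUID into the invoke config. The same conversation always yields the same thread, and distinct conversations never share one.

```python
import uuid


def build_invoke_config(conversation_id: uuid.UUID) -> dict:
    """Map a conversation to its checkpoint thread (one conversation = one thread)."""
    return {"configurable": {"thread_id": str(conversation_id)}}
```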

B. Organization & User Tracking

  • Keep existing metadata approach for org/user tracking
  • Store in Conversation.conv_metadata JSONB:
    {
      "wa_phone_number": "1234567890",
      "wa_name": "John Doe",
      "integration_id": "uuid",
      "organization_id": "uuid"
    }
  • Rationale: WhatsApp users are external/anonymous; no Keycloak user_id available
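
One way to keep this metadata shape consistent is a typed builder. The sketch below is an assumption about how that could look; `WhatsAppConvMetadata` and `build_conv_metadata` are hypothetical names, and UUIDs are converted to strings so the dict stays JSON-serializable for the JSONB column.

```python
import uuid
from typing import TypedDict


class WhatsAppConvMetadata(TypedDict):
    wa_phone_number: str
    wa_name: str
    integration_id: str
    organization_id: str


def build_conv_metadata(from_number: str, sender_name: str,
                        integration_id: uuid.UUID,
                        organization_id: uuid.UUID) -> WhatsAppConvMetadata:
    """Build the metadata dict; UUIDs are stored as strings."""
    return {
        "wa_phone_number": from_number,
        "wa_name": sender_name,
        "integration_id": str(integration_id),
        "organization_id": str(organization_id),
    }
```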

C. Session Persistence

  • Create the record in agent_flow_sessions on the first message and update it after each subsequent turn
  • Use upsert_session() to update message count
  • Fields:
    • flow_id: From WhatsAppIntegration.agent_flow_id
    • session_id: Same as conversation_id (UUID as string)
    • channel_type: "whatsapp"
    • message_count: Incremented by 2 on each turn (user + assistant)
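
The upsert behavior can be illustrated with an in-memory repository. The real `upsert_session()` signature is not shown in this plan, so the one below is assumed, and `InMemorySessionRepo`, `SessionRecord`, and `record_turn` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SessionRecord:
    flow_id: str
    session_id: str
    channel_type: str
    message_count: int = 0


@dataclass
class InMemorySessionRepo:
    """Hypothetical stand-in for the agent_flow_sessions repository."""
    rows: dict[str, SessionRecord] = field(default_factory=dict)

    def get(self, session_id: str) -> Optional[SessionRecord]:
        return self.rows.get(session_id)

    def upsert_session(self, flow_id: str, session_id: str,
                       channel_type: str, message_count: int) -> SessionRecord:
        """Insert the record if missing, then set the latest message count."""
        record = self.rows.get(session_id)
        if record is None:
            record = SessionRecord(flow_id, session_id, channel_type)
            self.rows[session_id] = record
        record.message_count = message_count
        return record


def record_turn(repo: InMemorySessionRepo, flow_id: str, session_id: str) -> SessionRecord:
    """Count one user message plus one assistant reply for this turn."""
    existing = repo.get(session_id)
    previous = existing.message_count if existing else 0
    return repo.upsert_session(flow_id, session_id, "whatsapp", previous + 2)
```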

D. Dual Storage Strategy

  • Primary state storage: LangGraph checkpoints (for AI execution)
  • Secondary storage: conversations/messages tables (for UI/API access)
  • Rationale: Checkpoints optimized for AI, but UI needs structured message list

3. Implementation Steps

Step 1: Update WhatsApp Webhook Handler

File: /core/app/api/v1/whatsapp_channel.py

Changes:

  1. Import checkpointer utilities
  2. Initialize graph with checkpointer
  3. Check if first message (query checkpoint state)
  4. Use build_initial_state_with_checkpoint() instead of manual history
  5. Pass invoke config with thread_id
  6. Persist session record using persist_session()

Step 2: Add Organization/User Context to State

File: /core/app/api/v1/whatsapp_channel.py

Include org/user metadata in context field of initial state:

initial_state = build_initial_state_with_checkpoint(
    message=text_body,
    trace_id=trace_id,
    is_first_message=is_first_message,
    is_resumed=not is_first_message,
)

# Add WhatsApp-specific context with org/user info
if is_first_message:
    initial_state["context"] = {
        "channel": "whatsapp",
        "organization_id": str(integration.organization_id),
        "wa_phone_number": from_number,
        "wa_name": sender_name,
        "integration_id": str(integration.id),
    }
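
The body of `build_initial_state_with_checkpoint()` is not shown in this plan. The sketch below is a hypothetical stand-in, using the same keyword arguments as the call above, to illustrate the "initialize state fields only on first message" idea. The `turn_count` field and the `(role, content)` message tuples are assumptions made for the example.

```python
def build_initial_state_sketch(message: str, trace_id: str,
                               is_first_message: bool, is_resumed: bool) -> dict:
    """Hypothetical stand-in: build the graph input for one turn."""
    # Every turn carries only the new user message and its trace id.
    state: dict = {
        "messages": [("user", message)],
        "trace_id": trace_id,
        "is_resumed": is_resumed,
    }
    if is_first_message:
        # Assumed one-time defaults. On later turns these keys are omitted,
        # so the values already stored in the checkpoint are left untouched.
        state["context"] = {}
        state["turn_count"] = 0
    return state
```

Omitting a key on resumed turns matters because a graph input updates only the state keys it contains; keys that are absent keep their checkpointed values.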

Step 3: Remove Manual History Reconstruction

File: /core/app/api/v1/whatsapp_channel.py

Delete the following:

  • get_conversation_history() (lines 98-116) - No longer needed
  • Manual message conversion logic (lines 321-329) - Replace with checkpoint

Step 4: Persist Session Metadata

File: /core/app/api/v1/whatsapp_channel.py

After successful graph invocation:

from app.utils.session_utils import persist_session

# Track message count (increment by 2: user + assistant)
message_count += 2

# Persist session record
await persist_session(
    session_repo=session_repo,
    flow_id=integration.agent_flow_id,
    session_id=str(conversation_id),
    channel_type="whatsapp",
    message_count=message_count,
)

Step 5: Update Dependencies

File: /core/app/api/v1/whatsapp_channel.py

Add imports:

from app.services.graph.checkpointer import get_checkpointer
from app.utils.checkpoint_utils import build_initial_state_with_checkpoint
from app.utils.session_utils import persist_session
from app.repositories.agent_flow_sessions_repository import AgentFlowSessionRepository

Add dependency injection:

async def whatsapp_webhook(
    # ... existing params ...
    session_repo: Annotated[AgentFlowSessionRepository, Depends(get_agent_flow_session_repository)],
):
    ...  # existing handler body

4. Critical Files to Modify

| File | Changes | Lines (Approx) |
|---|---|---|
| /core/app/api/v1/whatsapp_channel.py | Primary refactor: add checkpointing, remove manual history | 200-370 |
| /core/app/api/deps.py | Add get_agent_flow_session_repository dependency if not exists | N/A (may already exist) |

5. Reusable Components (Already Exist)

| Component | Location | Usage |
|---|---|---|
| AsyncPostgresSaver | /core/app/services/graph/checkpointer.py | Initialize checkpointer |
| get_checkpointer() | Same file | Get initialized checkpointer instance |
| build_initial_state_with_checkpoint() | /core/app/utils/checkpoint_utils.py | Build state for first/resumed messages |
| persist_session() | /core/app/utils/session_utils.py | Persist session metadata |
| AgentFlowSessionRepository | /core/app/repositories/agent_flow_sessions_repository.py | Session CRUD operations |
| create_flow_graph() | /core/app/services/graph/flow_builder.py | Build graph with checkpointer |

6. Organization & User Retrieval

For WhatsApp conversations:

# Get conversation with metadata
conversation = await db.get(Conversation, conversation_id)

# Extract org/user info from metadata
organization_id = conversation.conv_metadata.get("organization_id")
wa_phone_number = conversation.conv_metadata.get("wa_phone_number")
wa_name = conversation.conv_metadata.get("wa_name")
integration_id = conversation.conv_metadata.get("integration_id")

Query by organization:

# Find all WhatsApp conversations for an organization
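result = await db.execute(
    select(Conversation).where(
        Conversation.channel_type == "whatsapp",
        Conversation.conv_metadata["organization_id"].astext == str(org_id),
    )
)
# execute() returns a Result; unwrap it to get the Conversation objects
conversations = result.scalars().all()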

7. Backwards Compatibility

Existing conversations with message history:

  • First new message after migration: Checkpoint starts fresh, so the agent has no pre-migration turns in its context
  • Old messages in messages table: Remain accessible via Conversation API
  • No data loss: Historical messages preserved in database
  • Gradual transition: New messages use checkpointing; old history still queryable

Migration strategy:

  • No database migration needed
  • Checkpointing activates automatically on first new message
  • Conversations created before migration continue to work

8. Testing & Verification

Unit Tests

  1. Test checkpoint state initialization for first message
  2. Test checkpoint state retrieval for resumed conversation
  3. Test session persistence to agent_flow_sessions
  4. Test org/user metadata in conversation

Integration Tests

  1. Send message to WhatsApp webhook
  2. Verify checkpoint created in langgraph_checkpoints
  3. Send follow-up message
  4. Verify checkpoint updated (history preserved)
  5. Query session detail endpoint
  6. Verify org/user info in response

Manual Testing

  1. Send WhatsApp message to bot
  2. Check agent_flow_sessions table for new record
  3. Send another message
  4. Verify context preserved across turns
  5. Check conversations and messages tables still populated (dual storage)

Benefits

  1. Performance: Eliminates 20-message query on every WhatsApp message
  2. Consistency: All channels (REST, WS, WhatsApp) use same state management
  3. Scalability: Checkpointing handles long conversations efficiently
  4. Features: Access to session history API, state snapshots, turn tracking
  5. Maintainability: Single code path for history management
  6. Organization tracking: Preserved in conversation metadata and accessible via queries
  7. User identification: Phone number + integration ID continues to work

Risks & Mitigations

| Risk | Mitigation |
|---|---|
| Checkpoint initialization performance | Minimal overhead; checkpointer uses connection pooling |
| Conversation metadata JSONB queries | Already indexed; performance acceptable |
| Dual storage overhead | Only writes 2 messages per turn; acceptable cost for UI/API access |
| Session table growth | Add retention policy/archival in future if needed |

Open Questions

None - all requirements clarified through codebase exploration.

Future Enhancements

  1. Session retention policy: Archive old sessions after N days
  2. Org-level analytics: Aggregate WhatsApp conversation metrics by organization
  3. User identity linking: If WhatsApp users authenticate later, link phone to Keycloak user_id
  4. Checkpoint cleanup: Implement scheduled job to clean old checkpoints
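
As a rough illustration of the retention idea in items 1 and 4, selecting sessions past a cutoff could look like the sketch below. The function name, the `last_activity` mapping, and the retention window are all hypothetical; the plan does not define how last activity is stored.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def sessions_to_archive(last_activity: dict[str, datetime], retention_days: int,
                        now: Optional[datetime] = None) -> list[str]:
    """Return session ids whose last activity is older than the retention window."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    return sorted(sid for sid, seen in last_activity.items() if seen < cutoff)
```

A scheduled job could feed the returned IDs to whatever archival or checkpoint-deletion step is eventually chosen.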

Summary

This plan migrates WhatsApp to use LangGraph checkpointing (matching REST/WS implementation) while preserving organization and user tracking through conversation metadata. The solution reuses existing utilities, maintains backwards compatibility, and provides a clear path for implementation with minimal risk.