WhatsApp Session Management with LangGraph Checkpointing
Context
Currently, the REST API (/execute) and WebSocket (/execute-ws) endpoints use LangGraph checkpointing to manage conversation history efficiently. This allows the AI to maintain context across multiple turns without manually reconstructing the entire conversation history from the database on each request.
However, the WhatsApp webhook handler (/webhooks/whatsapp) still uses the legacy approach: it manually fetches message history from the conversations and messages tables, converts them to LangChain message objects, and passes the full history to LangGraph on every invocation. This creates several issues:
- Performance overhead: Fetching and reconstructing the last 20 messages on every incoming WhatsApp message
- Inconsistency: Different channels using different state management patterns
- Missing features: No access to checkpoint-based session history API
- Limited scalability: Manual history reconstruction doesn't scale well with long conversations
Additionally, the user wants to:
- Track organization and user identification for WhatsApp conversations
- Ensure proper session management similar to the REST/WS implementation
- Maintain conversation metadata for org/user tracking
Objective
Migrate the WhatsApp webhook handler to LangGraph checkpointing for session management, aligning it with the REST/WS implementation while keeping org/user tracking in conversation metadata.
Current State Analysis
REST/WS Implementation (Good - Reference Pattern)
Files:
- /home/bs01083/_work/chatbot_poc/core/app/api/v1/agent_flows.py (REST)
- /home/bs01083/_work/chatbot_poc/core/app/api/v1/agent_flows_ws.py (WebSocket)
Key Features:
- Uses AsyncPostgresSaver checkpointer from /core/app/services/graph/checkpointer.py
- Session ID passed as thread_id in invoke config
- Only new message passed to graph; checkpointer handles history
- Session metadata stored in agent_flow_sessions table
- Full state persisted in LangGraph checkpoint tables (langgraph_checkpoints, etc.)
WhatsApp Current Implementation (Needs Migration)
File: /home/bs01083/_work/chatbot_poc/core/app/api/v1/whatsapp_channel.py
Current Flow:
- Webhook receives message from Meta
- Extracts phone number and message text
- Finds/creates conversation in conversations table
- Fetches last 20 messages from messages table
- Manually converts to LangChain messages
- Passes full history to graph.ainvoke()
- Saves response back to messages table
Organization/User Tracking:
- Organization ID stored in WhatsAppIntegration.organization_id
- Conversation metadata stores: wa_phone_number, wa_name, integration_id, organization_id
- No authenticated user_id (WhatsApp users are anonymous/external)
Gap Analysis
| Feature | REST/WS | WhatsApp (Current) | Required |
|---|---|---|---|
| LangGraph Checkpointing | ✅ Yes | ❌ No | ✅ Yes |
| Session ID Tracking | ✅ thread_id | ❌ Manual conversation lookup | ✅ Use conversation_id |
| AgentFlowSession Record | ✅ Yes | ❌ No | ✅ Yes |
| Organization Tracking | N/A | ✅ In metadata | ✅ Keep existing |
| User Identification | ✅ Keycloak user_id | ✅ Phone number in metadata | ✅ Keep existing |
| History Reconstruction | ✅ Automatic (checkpointer) | ❌ Manual (query DB) | ✅ Use checkpointer |
Proposed Solution
1. Architecture Alignment
Migrate WhatsApp to use the same checkpoint-based pattern as REST/WS:
┌─────────────────────────────────────────────────────────┐
│ WhatsApp Webhook Receives Message │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Get/Create Conversation (with org/user metadata) │
│ - Store wa_phone_number, wa_name │
│ - Store integration_id, organization_id │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Build Graph with Checkpointer │
│ - Pass checkpointer to create_flow_graph() │
│ - Use conversation_id as thread_id │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Check if First Message in Session │
│ - Query checkpoint state using conversation_id │
│ - is_first = (no existing checkpoint state) │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Build Initial State with Checkpoint Awareness │
│ - Use build_initial_state_with_checkpoint() │
│ - Only pass new message │
│ - Initialize state fields only on first message │
│ - Add channel context (org_id, phone, etc.) │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Invoke Graph with Config │
│ - config = {"configurable": {"thread_id": conv_id}} │
│ - Checkpointer automatically loads history │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Persist Session Record to agent_flow_sessions │
│ - flow_id, session_id (conversation_id) │
│ - channel_type: "whatsapp" │
│ - message_count │
└──────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Save Messages to Database (Optional - for UI/API) │
│ - User message and assistant response │
│ - Keep existing conversation/message storage │
└─────────────────────────────────────────────────────────┘
2. Key Design Decisions
A. Thread ID Strategy
- Use conversation_id as thread_id for LangGraph checkpointing
- Rationale: Conversation already uniquely identifies a WhatsApp user-bot interaction
- Each conversation = one checkpoint thread = one session
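As a minimal sketch of this mapping, the helper below turns a conversation UUID into the invoke config shown in the architecture diagram. The name build_invoke_config is hypothetical and not part of the codebase; only the {"configurable": {"thread_id": ...}} shape comes from this plan.

```python
import uuid


def build_invoke_config(conversation_id: uuid.UUID) -> dict:
    """Return a LangGraph-style invoke config keyed by the conversation.

    Hypothetical helper: the plan only specifies the config shape.
    """
    # thread_id is passed as a string so the same value can double as
    # session_id in agent_flow_sessions.
    return {"configurable": {"thread_id": str(conversation_id)}}


conversation_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
config = build_invoke_config(conversation_id)
print(config)
```

Stringifying the UUID in one place keeps the checkpoint thread and the session record keyed by the same value.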
B. Organization & User Tracking
- Keep existing metadata approach for org/user tracking
- Store in Conversation.conv_metadata JSONB:
{
  "wa_phone_number": "1234567890",
  "wa_name": "John Doe",
  "integration_id": "uuid",
  "organization_id": "uuid"
}
- Rationale: WhatsApp users are external/anonymous; no Keycloak user_id available
C. Session Persistence
- Create record in agent_flow_sessions table after each message
- Use upsert_session() to update message count
- Fields:
  - flow_id: From WhatsAppIntegration.agent_flow_id
  - session_id: Same as conversation_id (UUID as string)
  - channel_type: "whatsapp"
  - message_count: Incremented on each turn
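The upsert behaviour described above can be illustrated with a small, self-contained sketch. It uses an in-memory SQLite table purely for portability; the real table lives in Postgres, and the schema, the function name upsert_session_sketch, and its signature are assumptions rather than the project's actual upsert_session().

```python
import sqlite3


def upsert_session_sketch(conn, flow_id, session_id, channel_type, message_count):
    """Insert the session row, or update message_count if it already exists."""
    conn.execute(
        """
        INSERT INTO agent_flow_sessions (session_id, flow_id, channel_type, message_count)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(session_id) DO UPDATE SET message_count = excluded.message_count
        """,
        (session_id, flow_id, channel_type, message_count),
    )
    conn.commit()


# Assumed minimal schema, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_flow_sessions ("
    "session_id TEXT PRIMARY KEY, flow_id TEXT, channel_type TEXT, message_count INTEGER)"
)

upsert_session_sketch(conn, "flow-1", "conv-1", "whatsapp", 2)  # first turn
upsert_session_sketch(conn, "flow-1", "conv-1", "whatsapp", 4)  # second turn

rows = conn.execute(
    "SELECT session_id, channel_type, message_count FROM agent_flow_sessions"
).fetchall()
print(rows)
```

Because session_id is the conflict key, repeated turns in the same conversation update one row instead of creating a new session per message.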
D. Dual Storage Strategy
- Primary state storage: LangGraph checkpoints (for AI execution)
- Secondary storage: conversations/messages tables (for UI/API access)
- Rationale: Checkpoints optimized for AI, but UI needs structured message list
3. Implementation Steps
Step 1: Update WhatsApp Webhook Handler
File: /core/app/api/v1/whatsapp_channel.py
Changes:
- Import checkpointer utilities
- Initialize graph with checkpointer
- Check if first message (query checkpoint state)
- Use build_initial_state_with_checkpoint() instead of manual history
- Pass invoke config with thread_id
- Persist session record using persist_session()
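The first-message check in the list above could look roughly like the following. This is a sketch that assumes the compiled graph exposes an async aget_state(config) method returning a snapshot whose values mapping is empty when no checkpoint exists for the thread, as LangGraph compiled graphs do. FakeGraph is a hypothetical stand-in so the example runs without a database.

```python
import asyncio
from types import SimpleNamespace


async def is_first_message(graph, config: dict) -> bool:
    """True when the checkpointer holds no state for this thread yet."""
    snapshot = await graph.aget_state(config)
    # An empty `values` mapping means no checkpoint has been written.
    return not snapshot.values


class FakeGraph:
    """Hypothetical stand-in for a compiled graph, used only for this demo."""

    def __init__(self):
        self._threads = {}

    async def aget_state(self, config):
        thread_id = config["configurable"]["thread_id"]
        return SimpleNamespace(values=self._threads.get(thread_id, {}))


async def demo():
    graph = FakeGraph()
    config = {"configurable": {"thread_id": "conv-1"}}
    before = await is_first_message(graph, config)
    graph._threads["conv-1"] = {"messages": ["hi"]}  # simulate a saved turn
    after = await is_first_message(graph, config)
    return before, after


first, resumed = asyncio.run(demo())
print(first, resumed)
```

In the webhook handler, the result would feed the is_first_message and is_resumed arguments of build_initial_state_with_checkpoint().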
Step 2: Add Organization/User Context to State
File: /core/app/api/v1/whatsapp_channel.py
Include org/user metadata in context field of initial state:
initial_state = build_initial_state_with_checkpoint(
    message=text_body,
    trace_id=trace_id,
    is_first_message=is_first_message,
    is_resumed=not is_first_message,
)
# Add WhatsApp-specific context with org/user info
if is_first_message:
    initial_state["context"] = {
        "channel": "whatsapp",
        "organization_id": str(integration.organization_id),
        "wa_phone_number": from_number,
        "wa_name": sender_name,
        "integration_id": str(integration.id),
    }
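Setting context only on the first message relies on how checkpointed state is merged with new input. The following is a simplified model of that merge, not LangGraph's implementation: apply_input is hypothetical, and it assumes messages uses an append-style reducer while every other key is overwritten only when it appears in the input.

```python
def apply_input(checkpoint: dict, new_input: dict) -> dict:
    """Simplified model of a checkpointed state update.

    Assumption: `messages` is appended to, and any other key is replaced
    only when it is present in the new input.
    """
    merged = dict(checkpoint)
    merged["messages"] = checkpoint.get("messages", []) + new_input.get("messages", [])
    for key, value in new_input.items():
        if key != "messages":
            merged[key] = value
    return merged


first_turn = {
    "messages": ["hello"],
    "context": {"channel": "whatsapp", "organization_id": "org-1"},
}
second_turn = {"messages": ["what are your hours?"]}  # no context key

state = apply_input({}, first_turn)
state = apply_input(state, second_turn)
print(state)
```

Under these assumptions the second turn carries only the new message, yet the org/user context from the first turn is still available to the graph.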
Step 3: Remove Manual History Reconstruction
File: /core/app/api/v1/whatsapp_channel.py
Delete functions:
- get_conversation_history() (lines 98-116) - No longer needed
- Manual message conversion logic (lines 321-329) - Replace with checkpoint
Step 4: Persist Session Metadata
File: /core/app/api/v1/whatsapp_channel.py
After successful graph invocation:
from app.utils.session_utils import persist_session
# Track message count (increment by 2: user + assistant)
message_count += 2
# Persist session record
await persist_session(
    session_repo=session_repo,
    flow_id=integration.agent_flow_id,
    session_id=str(conversation_id),
    channel_type="whatsapp",
    message_count=message_count,
)
Step 5: Update Dependencies
File: /core/app/api/v1/whatsapp_channel.py
Add imports:
from typing import Annotated
from fastapi import Depends
from app.api.deps import get_agent_flow_session_repository
from app.services.graph.checkpointer import get_checkpointer
from app.utils.checkpoint_utils import build_initial_state_with_checkpoint
from app.utils.session_utils import persist_session
from app.repositories.agent_flow_sessions_repository import AgentFlowSessionRepository
Add dependency injection:
async def whatsapp_webhook(
    # ... existing params ...
    session_repo: Annotated[AgentFlowSessionRepository, Depends(get_agent_flow_session_repository)],
):
    ...  # existing handler body
4. Critical Files to Modify
| File | Changes | Lines (Approx) |
|---|---|---|
| /core/app/api/v1/whatsapp_channel.py | Primary refactor: add checkpointing, remove manual history | 200-370 |
| /core/app/api/deps.py | Add get_agent_flow_session_repository dependency if not exists | N/A (may already exist) |
5. Reusable Components (Already Exist)
| Component | Location | Usage |
|---|---|---|
| AsyncPostgresSaver | /core/app/services/graph/checkpointer.py | Initialize checkpointer |
| get_checkpointer() | Same file | Get initialized checkpointer instance |
| build_initial_state_with_checkpoint() | /core/app/utils/checkpoint_utils.py | Build state for first/resumed messages |
| persist_session() | /core/app/utils/session_utils.py | Persist session metadata |
| AgentFlowSessionRepository | /core/app/repositories/agent_flow_sessions_repository.py | Session CRUD operations |
| create_flow_graph() | /core/app/services/graph/flow_builder.py | Build graph with checkpointer |
6. Organization & User Retrieval
For WhatsApp conversations:
# Get conversation with metadata
conversation = await db.get(Conversation, conversation_id)
# Extract org/user info from metadata
organization_id = conversation.conv_metadata.get("organization_id")
wa_phone_number = conversation.conv_metadata.get("wa_phone_number")
wa_name = conversation.conv_metadata.get("wa_name")
integration_id = conversation.conv_metadata.get("integration_id")
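If conv_metadata can be missing or only partially populated (for example on older conversations), the lookups above may be worth wrapping in a small helper. The sketch below is one possible shape; WhatsAppIdentity and extract_identity are hypothetical names, and the assumption that conv_metadata may be None is not confirmed by this plan.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WhatsAppIdentity:
    """Org/user fields read from Conversation.conv_metadata."""

    organization_id: Optional[str]
    wa_phone_number: Optional[str]
    wa_name: Optional[str]
    integration_id: Optional[str]


def extract_identity(conv_metadata: Optional[dict]) -> WhatsAppIdentity:
    """Read org/user fields, tolerating a missing or partial metadata dict."""
    meta = conv_metadata or {}
    return WhatsAppIdentity(
        organization_id=meta.get("organization_id"),
        wa_phone_number=meta.get("wa_phone_number"),
        wa_name=meta.get("wa_name"),
        integration_id=meta.get("integration_id"),
    )


full = extract_identity(
    {
        "wa_phone_number": "1234567890",
        "wa_name": "John Doe",
        "integration_id": "uuid",
        "organization_id": "uuid",
    }
)
empty = extract_identity(None)
print(full.wa_phone_number, empty.organization_id)
```

Returning a typed object keeps callers from repeating the four .get() calls and makes absent fields explicit as None.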
Query by organization:
from sqlalchemy import select
# Find all WhatsApp conversations for an organization
result = await db.execute(
    select(Conversation).where(
        Conversation.channel_type == "whatsapp",
        Conversation.conv_metadata["organization_id"].astext == str(org_id),
    )
)
conversations = result.scalars().all()
7. Backwards Compatibility
Existing conversations with message history:
- First new message after migration: Checkpoint starts fresh, so the graph does not see pre-migration turns as context
- Old messages in messages table: Remain accessible via Conversation API
- No data loss: Historical messages preserved in database
- Gradual transition: New messages use checkpointing; old history still queryable
Migration strategy:
- No database migration needed
- Checkpointing activates automatically on first new message
- Conversations created before migration continue to work
8. Testing & Verification
Unit Tests
- Test checkpoint state initialization for first message
- Test checkpoint state retrieval for resumed conversation
- Test session persistence to agent_flow_sessions
- Test org/user metadata in conversation
Integration Tests
- Send message to WhatsApp webhook
- Verify checkpoint created in langgraph_checkpoints
- Send follow-up message
- Verify checkpoint updated (history preserved)
- Query session detail endpoint
- Verify org/user info in response
Manual Testing
- Send WhatsApp message to bot
- Check agent_flow_sessions table for new record
- Send another message
- Verify context preserved across turns
- Check conversations and messages tables still populated (dual storage)
Benefits
- Performance: Eliminates 20-message query on every WhatsApp message
- Consistency: All channels (REST, WS, WhatsApp) use same state management
- Scalability: Checkpointing handles long conversations efficiently
- Features: Access to session history API, state snapshots, turn tracking
- Maintainability: Single code path for history management
- Organization tracking: Preserved in conversation metadata and accessible via queries
- User identification: Phone number + integration ID continues to work
Risks & Mitigations
| Risk | Mitigation |
|---|---|
| Checkpoint initialization performance | Minimal overhead; checkpointer uses connection pooling |
| Conversation metadata JSONB queries | Already indexed; performance acceptable |
| Dual storage overhead | Only writes 2 messages per turn; acceptable cost for UI/API access |
| Session table growth | Add retention policy/archival in future if needed |
Open Questions
None - all requirements clarified through codebase exploration.
Future Enhancements
- Session retention policy: Archive old sessions after N days
- Org-level analytics: Aggregate WhatsApp conversation metrics by organization
- User identity linking: If WhatsApp users authenticate later, link phone to Keycloak user_id
- Checkpoint cleanup: Implement scheduled job to clean old checkpoints
Summary
This plan migrates WhatsApp to use LangGraph checkpointing (matching REST/WS implementation) while preserving organization and user tracking through conversation metadata. The solution reuses existing utilities, maintains backwards compatibility, and provides a clear path for implementation with minimal risk.