Integration & Data Flow
Apollo RAG uses a three-tier architecture with two communication patterns: Tauri IPC for native OS integration and direct HTTP for high-throughput data transfer.
Integration Overview
Communication Architecture
Apollo splits traffic across two channels, each matched to its workload:
- Tauri IPC (JSON-RPC): Native features (health checks, model switching, notifications)
- Direct HTTP: Data endpoints (queries, document upload, streaming responses)
+----------------------------------------------------------------+
| React Frontend |
| WebView2 (Windows) / WebKit (macOS/Linux) |
+----------------------------------------------------------------+
| |
| Tauri IPC | Direct HTTP
| (Native Features) | (Data Transfer)
v v
+----------------------------------------------------------------+
| Tauri Bridge (Rust) |
| - IPC command handlers |
| - Backend health monitoring |
| - HTTP proxy to FastAPI |
+----------------------------------------------------------------+
|
| HTTP/REST
| Localhost:8000
v
+----------------------------------------------------------------+
| FastAPI Backend (Python) |
| - RAG orchestration engine |
| - llama.cpp GPU inference (CUDA) |
| - Vector database (Qdrant) |
| - Multi-tier caching (Redis) |
+----------------------------------------------------------------+

Why Dual Communication? Tauri IPC is optimized for small, frequent messages (health checks, commands), while direct HTTP is better suited to large payloads (document uploads, streaming responses). The hybrid approach combines IPC's low latency with HTTP's throughput.
Upload Pipeline (15 Stages)
The document ingestion pipeline processes files through multiple security, processing, and indexing stages.
Stage Breakdown
- File Selection (Frontend)
└─ Location: src/components/Documents/DocumentUpload.tsx
└─ Timing: less than 1ms (user action)
- Client-Side Validation (Frontend)
├─ Size check: MAX 50MB
├─ Type check: .pdf, .txt, .docx, .doc, .md
└─ Timing: 1-2ms
- Multipart Upload (HTTP)
└─ POST /api/documents/upload
└─ Protocol: Direct Fetch (bypasses Tauri for large payloads)
└─ Timing: 100ms - 5s (network + file size dependent)
- Filename Sanitization (Backend)
└─ werkzeug.secure_filename() prevents path traversal (see the sketch after this list)
└─ Timing: less than 1ms
- Extension Validation (Backend)
└─ Whitelist check: [.pdf, .txt, .docx, .doc, .md]
└─ Timing: less than 1ms
- SHA256 Hash Computation (Backend)
└─ Duplicate detection
└─ Timing: 50-200ms (file size dependent)
- Path Validation (Backend)
└─ resolved_path.is_relative_to(documents_dir)
└─ Prevents directory traversal
└─ Timing: less than 1ms
- File Storage (Backend)
└─ Write to backend/documents/
└─ Timing: 10-500ms (disk I/O)
- User Triggers Reindexing (Frontend)
└─ Manual button click
└─ POST /api/documents/reindex
- Mutex Lock Acquisition (Backend)
└─ Prevents concurrent reindexing
└─ Returns HTTP 409 if already running
└─ Timing: less than 1ms
- Document Scanning (Backend)
└─ Scans documents/ directory
└─ Extracts text from PDFs, DOCX
└─ Location: backend/_src/document_processor.py
└─ Timing: 500ms - 5s per document
- Semantic Chunking (Backend)
└─ Chunk size: 1024 tokens
└─ Overlap: 128 tokens
└─ Timing: 100-500ms per document
- Embedding Generation (Backend)
└─ Model: BAAI/bge-large-en-v1.5
└─ Device: CPU (PyTorch CUDA builds do not yet support the RTX 5080)
└─ Dimensions: 1024
└─ Timing: 50ms per chunk
- Vector Indexing (Qdrant)
└─ Dense vectors + Sparse BM25
└─ Hybrid search enabled
└─ Atomic swap: Build in temp, replace existing
└─ Timing: 2-10s (depending on document count)
- Mutex Release & Response (Backend)
└─ Returns: {success: true, total_files: 5, total_chunks: 247, processing_time: 18.3s}

Reindexing Lock: Only one reindexing operation can run at a time. Concurrent requests receive HTTP 409 (Conflict). This prevents vector database corruption and race conditions.
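The sanitization, whitelist, hash, and path-validation stages condense to a few lines of backend code. A minimal sketch, not Apollo's actual implementation: werkzeug.secure_filename() and is_relative_to() come from the stage list above, while the function name and error handling are illustrative.

```python
import hashlib
from pathlib import Path
from werkzeug.utils import secure_filename

ALLOWED_EXTENSIONS = {".pdf", ".txt", ".docx", ".doc", ".md"}

def validate_and_store(raw_name: str, data: bytes, documents_dir: Path) -> tuple[Path, str]:
    # Filename sanitization: strips separators ("../../etc/passwd" -> "etc_passwd")
    name = secure_filename(raw_name)

    # Extension whitelist; a failure surfaces as HTTP 400
    if Path(name).suffix.lower() not in ALLOWED_EXTENSIONS:
        raise ValueError("unsupported file type")

    # SHA256 for duplicate detection (comparison against stored hashes omitted)
    digest = hashlib.sha256(data).hexdigest()

    # Path validation: is_relative_to requires Python 3.9+
    target = (documents_dir / name).resolve()
    if not target.is_relative_to(documents_dir.resolve()):
        raise ValueError("path traversal detected")

    # File storage stage: write to backend/documents/
    target.write_bytes(data)
    return target, digest
```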
Pipeline Flowchart
+----------------+
| User Selects |
| File |
+-------+--------+
|
v
+-------------------+
| Validation |---- Size > 50MB -------> HTTP 413
| (Size, Type) |---- Invalid Type ------> HTTP 400
+-------+-----------+
| Valid
v
+-------------------+
| HTTP Upload |
| (Direct Fetch) |
+-------+-----------+
|
v
+-------------------+
| Security |---- Path Traversal ----> HTTP 400
| Sanitization |---- Duplicate ---------> HTTP 200 (duplicate=true)
+-------+-----------+
|
v
+-------------------+
| Save to Disk |
+-------+-----------+
|
v
+-------------------+
| User Triggers |
| Reindexing |
+-------+-----------+
|
v
+-------------------+
| Acquire Lock |---- Already Running ---> HTTP 409
+-------+-----------+
|
v
+-------------------+
| Extract + Chunk |
| + Embed |
+-------+-----------+
|
v
+-------------------+
| Index (Qdrant) |
| Atomic Swap |
+-------+-----------+
|
v
+-------------------+
| Release Lock + |
| Return Success |
+-------------------+
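The lock-acquisition step in the flowchart maps onto a non-blocking asyncio.Lock check. A sketch under assumed names (run_reindex_pipeline is a hypothetical stand-in for the scan, chunk, embed, and index stages):

```python
import asyncio
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
reindex_lock = asyncio.Lock()

async def run_reindex_pipeline() -> dict:
    # Stand-in for document scanning, chunking, embedding, and indexing
    await asyncio.sleep(0)
    return {"success": True}

@app.post("/api/documents/reindex")
async def reindex():
    # Fail fast with 409 rather than queueing a second reindex behind the first
    if reindex_lock.locked():
        return JSONResponse(status_code=409, content={"error": "Reindexing already in progress"})
    async with reindex_lock:
        return await run_reindex_pipeline()
```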
Query Pipeline (26 Stages)
The query processing pipeline orchestrates retrieval, caching, generation, and streaming.
Stage-by-Stage Breakdown
STAGE 1-3: Security & Input Processing (5-10ms)
├─ 1. Rate Limiting: 30 requests/60s per IP
├─ 2. Input Sanitization: Remove null bytes, control chars
└─ 3. Prompt Injection Detection: 20+ suspicious patterns
STAGE 4-5: Cache Lookup (0.86ms - 50ms)
├─ 4. L1 Exact Match: Redis hash lookup
└─ 5. L1 Normalized Match: Case/whitespace insensitive
STAGE 6: L1 Semantic Match (2-5ms)
└─ Cosine similarity > 0.95 threshold
STAGE 7: Conversation Memory Retrieval (1-2ms)
└─ Last N turns from Redis (if use_context=true)
STAGE 8: Query Classification (200ms)
└─ LLM-based: simple / moderate / complex
└─ Location: backend/_src/adaptive_retrieval.py
STAGE 9: Strategy Selection (less than 1ms)
├─ Simple query → Dense vector only (top_k=3)
├─ Moderate query → Hybrid: Dense + BM25 with RRF (top_k=20)
└─ Complex query → Advanced: Multi-query + HyDE + reranking (top_k=15)
STAGE 10-11: Embedding Generation (L2 Cache)
├─ 10. Check Redis embedding cache (0.86ms)
└─ 11. Generate if miss (50ms on CPU)
STAGE 12: Query Embedding (50ms)
└─ Model: BAAI/bge-large-en-v1.5
└─ Dimensions: 1024
└─ Device: CPU
STAGE 13-14: Vector Search (Qdrant)
├─ 13. Dense Search: HNSW index (3-5ms @ 1M docs)
└─ 14. Sparse Search: BM25-style keyword (if hybrid/advanced)
STAGE 15: Reciprocal Rank Fusion (RRF)
└─ Fuses dense + sparse results (k=60; see the sketch after this list)
└─ Timing: 2-5ms
STAGE 16: BGE Reranking (60ms)
└─ Model: BAAI/bge-reranker-large
└─ Device: CPU fallback (GPU preferred)
└─ Processes: 32 documents
STAGE 17: Conversation Context Assembly (1-2ms)
└─ Combines retrieved docs + conversation history
STAGE 18: Prompt Construction (2-5ms)
└─ System prompt + context + user query
└─ Location: backend/_src/adaptive_retrieval.py
STAGE 19-21: LLM Generation (8-15 seconds)
├─ 19. KV Cache Warmup: First token latency
├─ 20. Token Generation: 80-100 tok/s (RTX 5080, Q5_K_M)
└─ 21. Streaming Response: SSE events every 10ms
STAGE 22: Frontend Token Buffering (continuous)
└─ Buffer tokens in ref, throttled flush at 60fps
└─ Reduces re-renders from 100+/s to 60/s
└─ Location: src/hooks/useChat.ts:tokenBufferRef
STAGE 23: Confidence Scoring (500ms, parallel)
├─ Retrieval quality: 30%
├─ Answer relevance: 40%
└─ Source consistency: 30%
STAGE 24: Response Formatting (1-2ms)
└─ JSON with answer, sources, metadata, timing
STAGE 25: Background Tasks (async, non-blocking)
├─ Store in conversation memory (Redis, TTL=1h)
├─ Cache result (L1 Redis)
└─ Trigger prefetching (L5 experimental)
STAGE 26: Response Return
└─ QueryResponse with complete timing breakdown

Cache Hit Fast Path: When L1 cache hits (60-80% of queries), stages 8-21 are skipped entirely. Response time: 50-100ms vs 8-15 seconds for cache miss.
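Stage 15's fusion step is compact enough to show whole. A generic Reciprocal Rank Fusion sketch (not Apollo's exact code) using the k=60 constant from the breakdown:

```python
def rrf_fuse(dense_ids: list[str], sparse_ids: list[str], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in (dense_ids, sparse_ids):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# "b" appears in both rankings, so it outranks every single-list hit
print(rrf_fuse(["a", "b", "c"], ["b", "d"]))  # ['b', 'a', 'd', 'c']
```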
Query Timing Comparison
| Scenario | Total Time | Breakdown |
|---|---|---|
| Cache Hit | 50-100ms | Redis lookup (0.86ms) + Deserialization (49ms) |
| Simple Mode (Miss) | 8-15s | Embedding (50ms) + Search (100ms) + Generation (8-12s) |
| Adaptive Mode (Miss) | 10-25s | Classification (200ms) + Hybrid Search (300ms) + Rerank (2s) + Generation (10-15s) |
Cross-Layer Communication
Sequence Diagram: Query with Streaming
+---------+ +---------+ +----------+ +----------+
| Browser | | Tauri | | FastAPI | | Qdrant |
+----+----+ +----+----+ +----+-----+ +----+-----+
| | | |
| useChat.sendMessage() | |
+------------>| | |
| | POST /api/query/stream |
| +------------->| |
| | | Check Rate Limit
| | | Sanitize Input
| | | Cache Lookup (L1-L2)
| | | |
| | | search() |
| | +------------->|
| | |<-------------+
| | | top_k results|
| | | |
| | | generate_stream()
| | | (llama.cpp) |
| | | |
| |<- SSE: token + |
|<------------+ | |
| Buffer Token| | |
| |<- SSE: token + |
|<------------+ | |
| Throttle @60fps | |
| |<- SSE: sources |
|<------------+ | |
| Update UI | | |
| |<- SSE: done + |
|<------------+ | |
| Final Update| | |
| | | Cache Result |
| | | (Background) |

Event Streaming (SSE) Protocol
// SSE Event Types
type SSEEvent =
| { type: 'token', content: string } // Generated text token
| { type: 'sources', content: Source[] } // Retrieved documents
| { type: 'metadata', content: Metadata } // Timing, cache hit, strategy
| { type: 'done' } // Generation complete
| { type: 'error', content: string } // Error message
// Example SSE Stream
data: {"type":"token","content":"Based"}
data: {"type":"token","content":" on"}
data: {"type":"token","content":" the"}
...
data: {"type":"sources","content":[{"file_name":"doc.pdf","chunk_id":"abc123","relevance":0.92}]}
data: {"type":"metadata","content":{"processing_time_ms":8234,"cache_hit":false,"strategy":"hybrid"}}
data: {"type":"done"}Token Buffering: Frontend accumulates tokens in a ref and flushes at 60fps max. This prevents React re-renders from blocking the UI thread during high-frequency SSE events (100+ events/second).
Error Handling & Recovery
Multi-Layer Error Propagation
+--------------------------------------------------------------+
| Layer 1: Backend (Python) |
| - Exception caught by FastAPI error handler |
| - HTTP 500 + JSON: {error: "message", details: {...}} |
| - Logged to backend/logs/ with stack trace |
+--------------------------------------------------------------+
|
v
+--------------------------------------------------------------+
| Layer 2: Tauri Bridge (Rust) |
| - Result::Err propagates to frontend as rejected Promise |
| - Command errors logged via tauri-plugin-log |
+--------------------------------------------------------------+
|
v
+--------------------------------------------------------------+
| Layer 3: Frontend (React) |
| - ApiError exception caught by error boundary |
| - User-friendly message displayed in UI |
| - Error details logged to the developer console |
+--------------------------------------------------------------+

Retry Strategies
// src/services/api.ts
const RETRY_CONFIG = {
  maxRetries: 3,
  delays: [1000, 2000, 4000], // Exponential backoff
  retryableStatuses: [500, 502, 503, 504],
  retryableErrors: ['NetworkError', 'TimeoutError']
};

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function fetchWithRetry(url: string, options: RequestInit): Promise<Response> {
  for (let attempt = 0; attempt < RETRY_CONFIG.maxRetries; attempt++) {
    try {
      const response = await fetch(url, options);
      if (response.ok || !RETRY_CONFIG.retryableStatuses.includes(response.status)) {
        return response; // Success or non-retryable error
      }
      if (attempt < RETRY_CONFIG.maxRetries - 1) {
        await sleep(RETRY_CONFIG.delays[attempt]); // Retryable HTTP error: back off
        continue;
      }
      return response; // Final attempt failed
    } catch (error) {
      // Network-level failure: rethrow on the last attempt
      if (attempt === RETRY_CONFIG.maxRetries - 1) throw error;
      await sleep(RETRY_CONFIG.delays[attempt]);
    }
  }
  throw new Error('unreachable'); // loop always returns or throws; satisfies the compiler
}

Graceful Degradation
# backend/app/core/rag_engine.py
async def query(self, question: str) -> QueryResponse:
"""Multi-stage graceful degradation"""
try:
# Try advanced retrieval with reranking
return await self._query_with_reranking(question)
except Exception as e:
logger.warning(f"Reranking failed: {e}. Falling back to hybrid.")
try:
# Fallback: Hybrid retrieval without reranking
return await self._query_hybrid(question)
except Exception as e:
logger.error(f"Hybrid failed: {e}. Falling back to simple.")
# Final fallback: Simple dense retrieval
return await self._query_simple(question)

Critical Path Protection: Model hotswap operations block query processing (503 response) to prevent incomplete responses. Health checks monitor component readiness before accepting queries.
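The hotswap guard reduces to a flag checked before any query work starts. A sketch; model_switching and the guard function are assumed names, not Apollo's actual API:

```python
import asyncio
from fastapi import HTTPException

model_switching = asyncio.Event()  # set by the model manager for the duration of a hotswap

def guard_critical_path() -> None:
    # Called at the top of the query route: better a clean 503 than a half-generated answer
    if model_switching.is_set():
        raise HTTPException(status_code=503, detail="Model switch in progress; retry shortly")
```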
State Synchronization
Frontend ↔ Backend Sync
// src/store/useStore.ts
const useStore = create<AppState>((set, get) => ({
// Ephemeral state (resets on app restart)
messages: [],
isOnline: true,
currentModel: null,
// Synced from backend
settings: null,
availableModels: [],
// Sync methods
async syncSettings() {
const settings = await api.getSettings();
set({ settings });
},
async syncModels() {
const models = await invoke<ModelInfo[]>('get_models_list');
set({ availableModels: models });
}
}));

Backend State Persistence
| State Type | Storage | TTL | Persistence |
|---|---|---|---|
| Conversation Memory | Redis | 1 hour | Volatile |
| Query Cache (L1) | Redis | 7 days | RDB snapshots |
| Embedding Cache (L2) | Redis | 7 days | RDB snapshots |
| Vector Index | Qdrant | Permanent | Disk WAL |
| Runtime Settings | In-memory | Session | None |
| Model State | In-memory | Session | None |
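The Conversation Memory row corresponds to a capped Redis list. A redis-py sketch using the conv:{session_id} key scheme from the ATLAS section below; remember() is an illustrative helper:

```python
import json
import time
import redis

r = redis.Redis()
MAX_EXCHANGES = 10   # ring-buffer size from the cache architecture below
TTL_SECONDS = 3600   # matches the 1-hour TTL in the table above

def remember(session_id: str, role: str, content: str) -> None:
    key = f"conv:{session_id}"
    r.lpush(key, json.dumps({"role": role, "content": content, "timestamp": time.time()}))
    r.ltrim(key, 0, MAX_EXCHANGES * 2 - 1)  # keep 10 exchanges = 20 messages
    r.expire(key, TTL_SECONDS)              # volatile by design; refreshed on each turn
```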
Configuration Propagation
- User Updates Settings (Frontend)
└─ SettingsPanel.tsx: PUT /api/settings
- FastAPI Validates & Stores (Backend)
└─ Pydantic validation: k > 0, weight ∈ [0,1]
└─ RAGEngine.runtime_settings = new_settings
- Frontend Polls for Confirmation
└─ GET /api/settings
└─ Updates Zustand store on success

No Persistence: Runtime settings are stored in memory only and reset on backend restart. This is intentional to prevent configuration drift. Future: Consider config.yml persistence with hot-reload.
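Step 2's constraints translate directly to Pydantic field validators. A sketch in which the field names are assumptions; only the k > 0 and weight ∈ [0,1] rules come from the pipeline above:

```python
from pydantic import BaseModel, Field, ValidationError

class RuntimeSettings(BaseModel):
    top_k: int = Field(default=5, gt=0)                        # k > 0
    hybrid_weight: float = Field(default=0.5, ge=0.0, le=1.0)  # weight in [0, 1]

try:
    RuntimeSettings(top_k=0)  # FastAPI surfaces this as HTTP 422 before the engine sees it
except ValidationError as exc:
    print(exc)
```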
Caching Integration (ATLAS)
Multi-Level Cache Architecture
+----------------------------------------------------------------+
| L1: Query Cache (Redis Hash) |
| - Key: query:{hash(normalized_question)} |
| - Value: {answer, sources, metadata} |
| - Lookup Strategy: |
| 1. Exact match (0.86ms) |
| 2. Normalized match (case/whitespace) |
| 3. Semantic match (cosine > 0.95) |
| - TTL: 7 days |
| - Hit Rate: 60-80% in production |
+----------------------------------------------------------------+
| Cache Miss
v
+----------------------------------------------------------------+
| L2: Embedding Cache (Redis String) |
| - Key: emb:v1:{sha256(text)} |
| - Value: msgpack(np.float32[1024]) |
| - Latency Reduction: 98% (50ms -> <1ms) |
| - TTL: 7 days |
+----------------------------------------------------------------+
| Cache Miss
v
+----------------------------------------------------------------+
| L3: Conversation Memory (Redis List) |
| - Key: conv:{session_id} |
| - Value: [{role, content, timestamp}...] |
| - Max Size: 10 exchanges (ring buffer) |
| - TTL: 1 hour |
+----------------------------------------------------------------+
|
v
+----------------------------------------------------------------+
| L4: Model Cache (Docker Layer + HuggingFace) |
| - Pre-cached in Docker image build |
| - Location: /root/.cache/huggingface |
| - Startup Speedup: 15-20s saved |
+----------------------------------------------------------------+
|
v
+----------------------------------------------------------------+
| L5: Query Prefetcher (Experimental) |
| - Pattern detection -> predict next query |
| - Max concurrent: 3 prefetch tasks |
| - Background asyncio.create_task() |
+----------------------------------------------------------------+
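The three L1 lookup tiers short-circuit from cheapest to costliest. A sketch in which embed() and iter_cached() are injected stand-ins for Apollo's embedder and semantic index, and the key scheme approximates the one above:

```python
import hashlib
import numpy as np

def normalize(q: str) -> str:
    return " ".join(q.lower().split())  # case/whitespace-insensitive form

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def l1_lookup(redis_client, question: str, embed, iter_cached):
    # Tier 1: exact match (~0.86ms)
    key = "query:" + hashlib.sha256(question.encode()).hexdigest()
    if (hit := redis_client.get(key)) is not None:
        return hit
    # Tier 2: normalized match
    norm_key = "query:" + hashlib.sha256(normalize(question).encode()).hexdigest()
    if (hit := redis_client.get(norm_key)) is not None:
        return hit
    # Tier 3: semantic match above the 0.95 cosine threshold
    qv = embed(question)
    for vec, cached_key in iter_cached():
        if cosine(qv, vec) > 0.95:
            return redis_client.get(cached_key)
    return None  # miss: fall through to L2 and the full pipeline
```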
Cache Invalidation Rules
# Triggers for cache clearing
CACHE_CLEAR_EVENTS = {
'reindexing': 'Clear all (L1-L5) - documents changed',
'model_switch': 'Clear L1 only - answers incompatible',
'settings_update': 'No clear - settings applied on next query',
'conversation_clear': 'Clear L3 only - user requested',
}
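Applied as a dispatch table, the rules look like this. A sketch with stand-in clear functions; the real implementations would issue Redis deletes, reload indexes, or cancel prefetch tasks:

```python
# Stand-ins for the per-level clear operations
def clear_l1(): print("L1 query cache cleared")
def clear_l2(): print("L2 embedding cache cleared")
def clear_l3(): print("L3 conversation memory cleared")
def clear_l4(): print("L4 model cache cleared")
def clear_l5(): print("L5 prefetch tasks cancelled")

CLEAR_ACTIONS = {
    "reindexing": [clear_l1, clear_l2, clear_l3, clear_l4, clear_l5],  # documents changed
    "model_switch": [clear_l1],        # cached answers incompatible with the new model
    "conversation_clear": [clear_l3],  # user requested
    "settings_update": [],             # applied on the next query; nothing to clear
}

def on_cache_event(event: str) -> None:
    for clear in CLEAR_ACTIONS.get(event, []):
        clear()
```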
Performance Optimization
Pipeline Shortcuts
Full Pipeline (Cache Miss):
Security (10ms)
→ Cache Lookup (5ms)
→ Classification (200ms)
→ Embedding (50ms)
→ Search (100ms)
→ Reranking (2000ms)
→ Generation (10000ms)
→ Caching (5ms)
= 12,370ms
Cache Hit Pipeline:
Security (10ms)
→ Cache Lookup (0.86ms)
→ Deserialization (49ms)
= 59.86ms
Speedup: 206x (12.37s vs 0.06s)

Optimization Impact
| Technique | Stage | Speedup | Tradeoff |
|---|---|---|---|
| KV Cache Preservation | Generation | 40-60% | Minor context bleed |
| Token Buffering | Frontend | 60fps cap | None |
| Parallel Init | Startup | 3.4x | None |
| Embedding Cache | Embedding | 98% | Redis dependency |
| BGE Reranker | Reranking | 85% | GPU preferred |
| Model Pre-cache | Startup | 15-20s | +9GB image size |
Monitoring & Observability
Health Check System
// src-tauri/src/commands.rs
#[tauri::command]
async fn check_atlas_health() -> Result<HealthStatus, String> {
    // map_err converts reqwest errors into the String error type this command returns
    let response = reqwest::get("http://localhost:8000/api/health")
        .await
        .map_err(|e| e.to_string())?
        .json::<HealthStatus>()
        .await
        .map_err(|e| e.to_string())?;
    Ok(response)
}

# backend/app/api/health.py
@app.get("/api/health")
async def health_check() -> HealthResponse:
components = {
"vectorstore": "ready" if qdrant_client.is_ready() else "error",
"llm": "ready" if rag_engine.llm is not None else "error",
"bm25_retriever": "ready" if rag_engine.bm25 is not None else "error",
"cache": "ready" if redis_client.ping() else "error"
}
status = "healthy" if all(v == "ready" for v in components.values()) else "degraded"
return HealthResponse(status=status, components=components)

Performance Metrics
// src/store/performanceStore.ts
interface QueryPerformance {
timestamp: number;
time: number;
cacheHit: boolean;
strategy: 'simple' | 'hybrid' | 'advanced';
queryType: string;
}
const computeStats = (queries: QueryPerformance[]) => ({
  // Guards keep the stats finite when the history is empty
  avgTime: queries.reduce((sum, q) => sum + q.time, 0) / Math.max(queries.length, 1),
  fastestTime: queries.length ? Math.min(...queries.map(q => q.time)) : 0,
  slowestTime: queries.length ? Math.max(...queries.map(q => q.time)) : 0,
  cacheHitRate: queries.filter(q => q.cacheHit).length / Math.max(queries.length, 1)
});

Docker Monitoring Stack
- Prometheus: Scrapes metrics from FastAPI, Qdrant, Redis (15s interval)
- Grafana: Visualizes dashboards (Atlas Overview, Performance, Resources)
- cAdvisor: Container resource metrics (CPU, RAM, GPU)
- node-exporter: Host system metrics
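On the FastAPI side of that scrape, prometheus_client is the usual choice. A sketch; the metric name and label are assumptions, not Apollo's actual instrumentation:

```python
from prometheus_client import Histogram, make_asgi_app

# Hypothetical metric: end-to-end query latency, split by cache outcome
QUERY_SECONDS = Histogram("apollo_query_seconds", "End-to-end query latency", ["cache_hit"])

def record_query(seconds: float, cache_hit: bool) -> None:
    QUERY_SECONDS.labels(cache_hit=str(cache_hit).lower()).observe(seconds)

# In FastAPI: app.mount("/metrics", make_asgi_app()) exposes the endpoint Prometheus scrapes
```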
Next Steps
Continue Learning: Now that you understand how Apollo’s layers integrate, dive into Core Concepts to learn about the RAG engine, retrieval strategies, and caching mechanisms.
Related Documentation
- Architecture Overview - High-level system design
- Backend Architecture - FastAPI RAG engine details
- Frontend Architecture - React component hierarchy
- Core Concepts - RAG fundamentals and algorithms