
Integration & Data Flow

Apollo RAG employs a sophisticated three-tier architecture with dual communication patterns optimized for both native OS integration and high-throughput data transfer.

Integration Overview

Communication Architecture

Apollo uses two communication channels, each optimized for a different class of traffic:

  • Tauri IPC (JSON-RPC): Native features (health checks, model switching, notifications)
  • Direct HTTP: Data endpoints (queries, document upload, streaming responses)

+----------------------------------------------------------------+
|                      React Frontend                            |
|  WebView2 (Windows) / WebKit (macOS/Linux)                     |
+----------------------------------------------------------------+
            |                    |
            | Tauri IPC          | Direct HTTP
            | (Native Features)  | (Data Transfer)
            v                    v
+----------------------------------------------------------------+
|                   Tauri Bridge (Rust)                          |
|  - IPC command handlers                                        |
|  - Backend health monitoring                                   |
|  - HTTP proxy to FastAPI                                       |
+----------------------------------------------------------------+
                     |
                     | HTTP/REST
                     | Localhost:8000
                     v
+----------------------------------------------------------------+
|              FastAPI Backend (Python)                          |
|  - RAG orchestration engine                                    |
|  - llama.cpp GPU inference (CUDA)                              |
|  - Vector database (Qdrant)                                    |
|  - Multi-tier caching (Redis)                                  |
+----------------------------------------------------------------+

Why Dual Communication? Tauri IPC is optimized for small, frequent messages (health checks, commands), while direct HTTP is better for large payloads (document uploads, streaming responses). This hybrid approach achieves the best of both worlds.


Upload Pipeline (15 Stages)

The document ingestion pipeline processes files through multiple security, processing, and indexing stages.

Stage Breakdown

1. File Selection (Frontend)
   └─ Location: src/components/Documents/DocumentUpload.tsx
   └─ Timing: less than 1ms (user action)

2. Client-Side Validation (Frontend)
   ├─ Size check: MAX 50MB
   ├─ Type check: .pdf, .txt, .docx, .doc, .md
   └─ Timing: 1-2ms

3. Multipart Upload (HTTP)
   └─ POST /api/documents/upload
   └─ Protocol: Direct Fetch (bypasses Tauri for large payloads)
   └─ Timing: 100ms - 5s (network + file size dependent)

4. Filename Sanitization (Backend)
   └─ werkzeug.secure_filename() prevents path traversal
   └─ Timing: less than 1ms
   └─ (Stages 4-8 are sketched in code after this list)

5. Extension Validation (Backend)
   └─ Whitelist check: [.pdf, .txt, .docx, .doc, .md]
   └─ Timing: less than 1ms

6. SHA256 Hash Computation (Backend)
   └─ Duplicate detection
   └─ Timing: 50-200ms (file size dependent)

7. Path Validation (Backend)
   └─ resolved_path.is_relative_to(documents_dir)
   └─ Prevents directory traversal
   └─ Timing: less than 1ms

8. File Storage (Backend)
   └─ Write to backend/documents/
   └─ Timing: 10-500ms (disk I/O)

9. User Triggers Reindexing (Frontend)
   └─ Manual button click
   └─ POST /api/documents/reindex

10. Mutex Lock Acquisition (Backend)
    └─ Prevents concurrent reindexing
    └─ Returns HTTP 409 if already running
    └─ Timing: less than 1ms

11. Document Scanning (Backend)
    └─ Scans documents/ directory
    └─ Extracts text from PDFs, DOCX
    └─ Location: backend/_src/document_processor.py
    └─ Timing: 500ms - 5s per document

12. Semantic Chunking (Backend)
    └─ Chunk size: 1024 tokens
    └─ Overlap: 128 tokens
    └─ Timing: 100-500ms per document

13. Embedding Generation (Backend)
    └─ Model: BAAI/bge-large-en-v1.5
    └─ Device: CPU (RTX 5080 PyTorch incompatible)
    └─ Dimensions: 1024
    └─ Timing: 50ms per chunk

14. Vector Indexing (Qdrant)
    └─ Dense vectors + Sparse BM25
    └─ Hybrid search enabled
    └─ Atomic swap: Build in temp, replace existing
    └─ Timing: 2-10s (depending on document count)

15. Mutex Release & Response (Backend)
    └─ Returns: {success: true, total_files: 5, total_chunks: 247, processing_time: 18.3s}
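
Stages 4 through 8 map to a handful of standard calls. A minimal sketch, where DOCUMENTS_DIR, ALLOWED_EXTENSIONS, and validate_and_store are illustrative names rather than the actual backend module layout:

# Illustrative sketch of stages 4-8; names are assumptions, not the real module layout.
import hashlib
from pathlib import Path

from werkzeug.utils import secure_filename

DOCUMENTS_DIR = Path("backend/documents").resolve()
ALLOWED_EXTENSIONS = {".pdf", ".txt", ".docx", ".doc", ".md"}

def validate_and_store(filename: str, data: bytes) -> tuple[Path, str]:
    # Stage 4: strip path components and dangerous characters from the name
    safe_name = secure_filename(filename)

    # Stage 5: whitelist the extension
    if Path(safe_name).suffix.lower() not in ALLOWED_EXTENSIONS:
        raise ValueError("unsupported file type")           # surfaced as HTTP 400

    # Stage 6: content hash for duplicate detection
    digest = hashlib.sha256(data).hexdigest()

    # Stage 7: resolve the target path and confirm it stays inside documents/
    target = (DOCUMENTS_DIR / safe_name).resolve()
    if not target.is_relative_to(DOCUMENTS_DIR):
        raise ValueError("path traversal detected")         # surfaced as HTTP 400

    # Stage 8: write to disk; the caller compares digest against known hashes
    target.write_bytes(data)
    return target, digest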

Reindexing Lock: Only one reindexing operation can run at a time. Concurrent requests receive HTTP 409 (Conflict). This prevents vector database corruption and race conditions.
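
A minimal sketch of those lock semantics, assuming an asyncio.Lock and a run_reindex_pipeline helper; the actual implementation may use a different primitive:

# Illustrative reindexing lock; run_reindex_pipeline is an assumed helper.
import asyncio
from fastapi import APIRouter, HTTPException

router = APIRouter()
reindex_lock = asyncio.Lock()

async def run_reindex_pipeline() -> dict:
    ...   # scan -> chunk -> embed -> index -> atomic swap (omitted here)

@router.post("/api/documents/reindex")
async def reindex_documents():
    if reindex_lock.locked():
        # A reindex is already running: refuse rather than queue behind it.
        raise HTTPException(status_code=409, detail="Reindexing already in progress")

    async with reindex_lock:
        result = await run_reindex_pipeline()
        return {"success": True, **result}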

Pipeline Flowchart

+----------------+
| User Selects   |
|     File       |
+-------+--------+
        |
        v
+-------------------+
|   Validation      |---- Size > 50MB -------> HTTP 413
|  (Size, Type)     |---- Invalid Type ------> HTTP 400
+-------+-----------+
        | Valid
        v
+-------------------+
|  HTTP Upload      |
|  (Direct Fetch)   |
+-------+-----------+
        |
        v
+-------------------+
|    Security       |---- Path Traversal ----> HTTP 400
|   Sanitization    |---- Duplicate ---------> HTTP 200 (duplicate=true)
+-------+-----------+
        |
        v
+-------------------+
|  Save to Disk     |
+-------+-----------+
        |
        v
+-------------------+
| User Triggers     |
|   Reindexing      |
+-------+-----------+
        |
        v
+-------------------+
|  Acquire Lock     |---- Already Running ---> HTTP 409
+-------+-----------+
        |
        v
+-------------------+
| Extract + Chunk   |
|     + Embed       |
+-------+-----------+
        |
        v
+-------------------+
|  Index (Qdrant)   |
|  Atomic Swap      |
+-------+-----------+
        |
        v
+-------------------+
| Release Lock +    |
| Return Success    |
+-------------------+

Query Pipeline (26 Stages)

The query processing pipeline orchestrates retrieval, caching, generation, and streaming.

Stage-by-Stage Breakdown

STAGE 1-3: Security & Input Processing (5-10ms)
├─ 1. Rate Limiting: 30 requests/60s per IP
├─ 2. Input Sanitization: Remove null bytes, control chars
└─ 3. Prompt Injection Detection: 20+ suspicious patterns
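
The screening in stages 2-3 amounts to a character scrub plus a pattern list. The patterns below are illustrative examples only, not Apollo's actual list of 20+ entries:

# Illustrative sanitization and injection screening; patterns are examples only.
import re

SUSPICIOUS_PATTERNS = [
    r"ignore (all )?previous instructions",
    r"you are now",
    r"system prompt",
]

def sanitize(query: str) -> str:
    # Stage 2: strip null bytes and other control characters (keep tab/newline)
    query = query.replace("\x00", "")
    query = re.sub(r"[\x01-\x08\x0b\x0c\x0e-\x1f]", "", query)
    return query.strip()

def looks_like_injection(query: str) -> bool:
    # Stage 3: flag queries matching known injection phrasings
    lowered = query.lower()
    return any(re.search(pattern, lowered) for pattern in SUSPICIOUS_PATTERNS)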

STAGE 4-5: Cache Lookup (0.86ms - 50ms)
├─ 4. L1 Exact Match: Redis hash lookup
└─ 5. L1 Normalized Match: Case/whitespace insensitive

STAGE 6: L1 Semantic Match (2-5ms)
└─ Cosine similarity > 0.95 threshold
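
A sketch of the stage 6 threshold check, assuming each cached entry stores the embedding of the query that produced it:

# Illustrative semantic cache match; assumes cached entries keep their query embedding.
import numpy as np

SEMANTIC_THRESHOLD = 0.95

def semantic_lookup(query_emb: np.ndarray,
                    cached: list[tuple[np.ndarray, dict]]) -> dict | None:
    for cached_emb, cached_response in cached:
        cos = float(np.dot(query_emb, cached_emb) /
                    (np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)))
        if cos > SEMANTIC_THRESHOLD:
            return cached_response   # treat as an L1 hit
    return None                      # fall through to the full pipeline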

STAGE 7: Conversation Memory Retrieval (1-2ms)
└─ Last N turns from Redis (if use_context=true)

STAGE 8: Query Classification (200ms)
└─ LLM-based: simple / moderate / complex
└─ Location: backend/_src/adaptive_retrieval.py

STAGE 9: Strategy Selection (less than 1ms)
├─ Simple: Dense vector only (top_k=3)
├─ Hybrid: Dense + BM25 with RRF (top_k=20)
└─ Advanced: Multi-query + HyDE + reranking (top_k=15)
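
One way to express the mapping from classification to retrieval parameters, assuming the moderate and complex classes map to the hybrid and advanced strategies respectively; only the modes and top_k values come from the list above, the per-strategy flags are assumptions:

# Illustrative strategy table; per-strategy flags are assumptions.
STRATEGIES = {
    "simple":   {"mode": "dense",  "top_k": 3,  "multi_query": False, "hyde": False},
    "moderate": {"mode": "hybrid", "top_k": 20, "multi_query": False, "hyde": False},
    "complex":  {"mode": "hybrid", "top_k": 15, "multi_query": True,  "hyde": True},
}

def select_strategy(query_class: str) -> dict:
    return STRATEGIES.get(query_class, STRATEGIES["moderate"])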

STAGE 10-11: Embedding Generation (L2 Cache)
├─ 10. Check Redis embedding cache (0.86ms)
└─ 11. Generate if miss (50ms on CPU)

STAGE 12: Query Embedding (50ms)
└─ Model: BAAI/bge-large-en-v1.5
└─ Dimensions: 1024
└─ Device: CPU

STAGE 13-14: Vector Search (Qdrant)
├─ 13. Dense Search: HNSW index (3-5ms @ 1M docs)
└─ 14. Sparse Search: BM25-style keyword (if hybrid/advanced)

STAGE 15: Reciprocal Rank Fusion (RRF)
└─ Fuses dense + sparse results (k=60)
└─ Timing: 2-5ms
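
RRF needs only the two rank lists: each document scores the sum of 1 / (k + rank) over every list it appears in, with k = 60. A minimal sketch (rrf_fuse is an illustrative name):

# Reciprocal Rank Fusion (stage 15): score(d) = sum over lists of 1 / (k + rank_d).
def rrf_fuse(dense_ids: list[str], sparse_ids: list[str], k: int = 60) -> list[str]:
    scores: dict[str, float] = {}
    for ranked in (dense_ids, sparse_ids):
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)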

STAGE 16: BGE Reranking (60ms)
└─ Model: BAAI/bge-reranker-large
└─ Device: CPU fallback (GPU preferred)
└─ Processes: 32 documents

STAGE 17: Conversation Context Assembly (1-2ms)
└─ Combines retrieved docs + conversation history

STAGE 18: Prompt Construction (2-5ms)
└─ System prompt + context + user query
└─ Location: backend/_src/adaptive_retrieval.py

STAGE 19-21: LLM Generation (8-15 seconds)
├─ 19. KV Cache Warmup: First token latency
├─ 20. Token Generation: 80-100 tok/s (RTX 5080, Q5_K_M)
└─ 21. Streaming Response: SSE events every 10ms

STAGE 22: Frontend Token Buffering (continuous)
└─ Buffer tokens in ref, throttled flush at 60fps
└─ Reduces re-renders from 100+/s to 60/s
└─ Location: src/hooks/useChat.ts:tokenBufferRef

STAGE 23: Confidence Scoring (500ms, parallel)
├─ Retrieval quality: 30%
├─ Answer relevance: 40%
└─ Source consistency: 30%
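
The weighting reduces to a single weighted sum; the signal names below are taken from the breakdown above:

# Confidence score (stage 23) using the documented 30/40/30 weighting.
def confidence_score(retrieval_quality: float,
                     answer_relevance: float,
                     source_consistency: float) -> float:
    return (0.30 * retrieval_quality
            + 0.40 * answer_relevance
            + 0.30 * source_consistency)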

STAGE 24: Response Formatting (1-2ms)
└─ JSON with answer, sources, metadata, timing

STAGE 25: Background Tasks (async, non-blocking)
├─ Store in conversation memory (Redis, TTL=1h)
├─ Cache result (L1 Redis)
└─ Trigger prefetching (L5 experimental)

STAGE 26: Response Return
└─ QueryResponse with complete timing breakdown

Cache Hit Fast Path: When L1 cache hits (60-80% of queries), stages 8-21 are skipped entirely. Response time: 50-100ms vs 8-15 seconds for cache miss.

Query Timing Comparison

Scenario             | Total Time | Breakdown
---------------------|------------|-------------------------------------------------------------------------------
Cache Hit            | 50-100ms   | Redis lookup (0.86ms) + Deserialization (49ms)
Simple Mode (Miss)   | 8-15s      | Embedding (50ms) + Search (100ms) + Generation (8-12s)
Adaptive Mode (Miss) | 10-25s     | Classification (200ms) + Hybrid Search (300ms) + Rerank (2s) + Generation (10-15s)

Cross-Layer Communication

Sequence Diagram: Query with Streaming

+---------+   +---------+   +----------+   +----------+
| Browser |   |  Tauri  |   | FastAPI  |   | Qdrant   |
+----+----+   +----+----+   +----+-----+   +----+-----+
     |             |              |              |
     | useChat.sendMessage()      |              |
     +------------>|              |              |
     |             | POST /api/query/stream      |
     |             +------------->|              |
     |             |              | Check Rate Limit
     |             |              | Sanitize Input
     |             |              | Cache Lookup (L1-L2)
     |             |              |              |
     |             |              | search()     |
     |             |              +------------->|
     |             |              |<-------------+
     |             |              | top_k results|
     |             |              |              |
     |             |              | generate_stream()
     |             |              | (llama.cpp)  |
     |             |              |              |
     |             |<- SSE: token +              |
     |<------------+              |              |
     | Buffer Token|              |              |
     |             |<- SSE: token +              |
     |<------------+              |              |
     | Throttle @60fps            |              |
     |             |<- SSE: sources              |
     |<------------+              |              |
     | Update UI   |              |              |
     |             |<- SSE: done  +              |
     |<------------+              |              |
     | Final Update|              |              |
     |             |              | Cache Result |
     |             |              | (Background) |

Event Streaming (SSE) Protocol

// SSE Event Types
type SSEEvent =
  | { type: 'token', content: string }         // Generated text token
  | { type: 'sources', content: Source[] }     // Retrieved documents
  | { type: 'metadata', content: Metadata }    // Timing, cache hit, strategy
  | { type: 'done' }                           // Generation complete
  | { type: 'error', content: string }         // Error message
 
// Example SSE Stream
data: {"type":"token","content":"Based"}
data: {"type":"token","content":" on"}
data: {"type":"token","content":" the"}
...
data: {"type":"sources","content":[{"file_name":"doc.pdf","chunk_id":"abc123","relevance":0.92}]}
data: {"type":"metadata","content":{"processing_time_ms":8234,"cache_hit":false,"strategy":"hybrid"}}
data: {"type":"done"}

Token Buffering: Frontend accumulates tokens in a ref and flushes at 60fps max. This prevents React re-renders from blocking the UI thread during high-frequency SSE events (100+ events/second).


Error Handling & Recovery

Multi-Layer Error Propagation

+--------------------------------------------------------------+
| Layer 1: Backend (Python)                                   |
| - Exception caught by FastAPI error handler                 |
| - HTTP 500 + JSON: {error: "message", details: {...}}       |
| - Logged to backend/logs/ with stack trace                  |
+--------------------------------------------------------------+
                     |
                     v
+--------------------------------------------------------------+
| Layer 2: Tauri Bridge (Rust)                                |
| - Result::Err propagates to frontend as rejected Promise    |
| - Command errors logged via tauri-plugin-log                |
+--------------------------------------------------------------+
                     |
                     v
+--------------------------------------------------------------+
| Layer 3: Frontend (React)                                   |
| - ApiError exception caught by error boundary               |
| - User-friendly message displayed in UI                     |
| - Error details logged to field operator console            |
+--------------------------------------------------------------+

Retry Strategies

// src/services/api.ts
const RETRY_CONFIG = {
  maxRetries: 3,
  delays: [1000, 2000, 4000], // Exponential backoff
  retryableStatuses: [500, 502, 503, 504],
  retryableErrors: ['NetworkError', 'TimeoutError']
};

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function fetchWithRetry(url: string, options: RequestInit): Promise<Response> {
  for (let attempt = 0; attempt < RETRY_CONFIG.maxRetries; attempt++) {
    try {
      const response = await fetch(url, options);

      if (response.ok || !RETRY_CONFIG.retryableStatuses.includes(response.status)) {
        return response; // Success or non-retryable error
      }

      // Retryable HTTP error
      if (attempt < RETRY_CONFIG.maxRetries - 1) {
        await sleep(RETRY_CONFIG.delays[attempt]);
        continue;
      }

      return response; // Final attempt failed

    } catch (error) {
      if (attempt === RETRY_CONFIG.maxRetries - 1) throw error;
      await sleep(RETRY_CONFIG.delays[attempt]);
    }
  }
  // Unreachable, but satisfies TypeScript's return-path analysis
  throw new Error('fetchWithRetry: retries exhausted');
}

Graceful Degradation

# backend/app/core/rag_engine.py
async def query(self, question: str) -> QueryResponse:
    """Multi-stage graceful degradation"""
 
    try:
        # Try advanced retrieval with reranking
        return await self._query_with_reranking(question)
    except Exception as e:
        logger.warning(f"Reranking failed: {e}. Falling back to hybrid.")
 
        try:
            # Fallback: Hybrid retrieval without reranking
            return await self._query_hybrid(question)
        except Exception as e:
            logger.error(f"Hybrid failed: {e}. Falling back to simple.")
 
            # Final fallback: Simple dense retrieval
            return await self._query_simple(question)

Critical Path Protection: Model hotswap operations block query processing (503 response) to prevent incomplete responses. Health checks monitor component readiness before accepting queries.
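
A sketch of that guard, assuming a SWAP_IN_PROGRESS flag toggled by the hotswap handler; the real state tracking lives elsewhere in the backend:

# Illustrative critical-path guard; SWAP_IN_PROGRESS stands in for the real
# model-manager state and is toggled by the hotswap handler in practice.
from fastapi import FastAPI, HTTPException

app = FastAPI()
SWAP_IN_PROGRESS = False

@app.post("/api/query")
async def query_endpoint(payload: dict):
    if SWAP_IN_PROGRESS:
        # Refuse new work rather than answer with a half-loaded model.
        raise HTTPException(status_code=503, detail="Model switch in progress, retry shortly")
    return {"status": "accepted"}   # the real handler runs the 26-stage pipeline here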


State Synchronization

Frontend ↔ Backend Sync

// src/store/useStore.ts
const useStore = create<AppState>((set, get) => ({
  // Ephemeral state (resets on app restart)
  messages: [],
  isOnline: true,
  currentModel: null,
 
  // Synced from backend
  settings: null,
  availableModels: [],
 
  // Sync methods
  async syncSettings() {
    const settings = await api.getSettings();
    set({ settings });
  },
 
  async syncModels() {
    const models = await invoke<ModelInfo[]>('get_models_list');
    set({ availableModels: models });
  }
}));

Backend State Persistence

State Type           | Storage   | TTL       | Persistence
---------------------|-----------|-----------|----------------
Conversation Memory  | Redis     | 1 hour    | Volatile
Query Cache (L1)     | Redis     | 7 days    | RDB snapshots
Embedding Cache (L2) | Redis     | 7 days    | RDB snapshots
Vector Index         | Qdrant    | Permanent | Disk WAL
Runtime Settings     | In-memory | Session   | None
Model State          | In-memory | Session   | None

Configuration Propagation

1. User Updates Settings (Frontend)
   └─ SettingsPanel.tsx: PUT /api/settings

2. FastAPI Validates & Stores (Backend)
   └─ Pydantic validation: k > 0, weight ∈ [0,1] (see the sketch after this list)
   └─ RAGEngine.runtime_settings = new_settings

3. Frontend Polls for Confirmation
   └─ GET /api/settings
   └─ Updates Zustand store on success
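
Step 2's validation maps naturally onto a Pydantic model. The field names below are assumptions; only the constraints (k > 0, weight in [0, 1]) come from the description above:

# Illustrative settings schema; field names are assumed, constraints match step 2.
from pydantic import BaseModel, Field

class RuntimeSettings(BaseModel):
    top_k: int = Field(gt=0)                       # k > 0
    hybrid_weight: float = Field(ge=0.0, le=1.0)   # weight in [0, 1]
    use_reranking: bool = True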

No Persistence: Runtime settings are stored in memory only and reset on backend restart. This is intentional to prevent configuration drift. Future: Consider config.yml persistence with hot-reload.


Caching Integration (ATLAS)

Multi-Level Cache Architecture

+----------------------------------------------------------------+
| L1: Query Cache (Redis Hash)                                  |
| - Key: query:{hash(normalized_question)}                      |
| - Value: {answer, sources, metadata}                          |
| - Lookup Strategy:                                            |
|   1. Exact match (0.86ms)                                     |
|   2. Normalized match (case/whitespace)                       |
|   3. Semantic match (cosine > 0.95)                           |
| - TTL: 7 days                                                 |
| - Hit Rate: 60-80% in production                              |
+----------------------------------------------------------------+
         | Cache Miss
         v
+----------------------------------------------------------------+
| L2: Embedding Cache (Redis String)                            |
| - Key: emb:v1:{sha256(text)}                                  |
| - Value: msgpack(np.float32[1024])                            |
| - Latency Reduction: 98% (50ms -> <1ms)                       |
| - TTL: 7 days                                                 |
+----------------------------------------------------------------+
         | Cache Miss
         v
+----------------------------------------------------------------+
| L3: Conversation Memory (Redis List)                          |
| - Key: conv:{session_id}                                      |
| - Value: [{role, content, timestamp}...]                      |
| - Max Size: 10 exchanges (ring buffer)                        |
| - TTL: 1 hour                                                 |
+----------------------------------------------------------------+
         |
         v
+----------------------------------------------------------------+
| L4: Model Cache (Docker Layer + HuggingFace)                  |
| - Pre-cached in Docker image build                            |
| - Location: /root/.cache/huggingface                          |
| - Startup Speedup: 15-20s saved                               |
+----------------------------------------------------------------+
         |
         v
+----------------------------------------------------------------+
| L5: Query Prefetcher (Experimental)                           |
| - Pattern detection -> predict next query                     |
| - Max concurrent: 3 prefetch tasks                            |
| - Background asyncio.create_task()                            |
+----------------------------------------------------------------+
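
The L1 and L2 key formats above translate directly into two small helpers. SHA256 is assumed for the L1 query hash (the diagram only says hash); the L2 key documents sha256 explicitly:

# Illustrative key builders matching the key patterns in the diagram above.
import hashlib

def l1_query_key(question: str) -> str:
    normalized = " ".join(question.lower().split())    # case/whitespace normalization
    return f"query:{hashlib.sha256(normalized.encode()).hexdigest()}"

def l2_embedding_key(text: str) -> str:
    return f"emb:v1:{hashlib.sha256(text.encode()).hexdigest()}"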

Cache Invalidation Rules

# Triggers for cache clearing
CACHE_CLEAR_EVENTS = {
    'reindexing': 'Clear all (L1-L5) - documents changed',
    'model_switch': 'Clear L1 only - answers incompatible',
    'settings_update': 'No clear - settings applied on next query',
    'conversation_clear': 'Clear L3 only - user requested',
}

Performance Optimization

Pipeline Shortcuts

Full Pipeline (Cache Miss):
  Security (10ms)
  → Cache Lookup (5ms)
  → Classification (200ms)
  → Embedding (50ms)
  → Search (100ms)
  → Reranking (2000ms)
  → Generation (10000ms)
  → Caching (5ms)
  = 12,370ms

Cache Hit Pipeline:
  Security (10ms)
  → Cache Lookup (0.86ms)
  → Deserialization (49ms)
  = 59.86ms

Speedup: 206x (12.37s vs 0.06s)

Optimization Impact

Technique             | Stage      | Speedup   | Tradeoff
----------------------|------------|-----------|--------------------
KV Cache Preservation | Generation | 40-60%    | Minor context bleed
Token Buffering       | Frontend   | 60fps cap | None
Parallel Init         | Startup    | 3.4x      | None
Embedding Cache       | Embedding  | 98%       | Redis dependency
BGE Reranker          | Reranking  | 85%       | GPU preferred
Model Pre-cache       | Startup    | 15-20s    | +9GB image size

Monitoring & Observability

Health Check System

// src-tauri/src/commands.rs
#[tauri::command]
async fn check_atlas_health() -> Result<HealthStatus, String> {
    let response = reqwest::get("http://localhost:8000/api/health")
        .await
        .map_err(|e| e.to_string())?
        .json::<HealthStatus>()
        .await
        .map_err(|e| e.to_string())?;

    Ok(response)
}

# backend/app/api/health.py
@app.get("/api/health")
async def health_check() -> HealthResponse:
    components = {
        "vectorstore": "ready" if qdrant_client.is_ready() else "error",
        "llm": "ready" if rag_engine.llm is not None else "error",
        "bm25_retriever": "ready" if rag_engine.bm25 is not None else "error",
        "cache": "ready" if redis_client.ping() else "error"
    }
 
    status = "healthy" if all(v == "ready" for v in components.values()) else "degraded"
 
    return HealthResponse(status=status, components=components)

Performance Metrics

// src/store/performanceStore.ts
interface QueryPerformance {
  timestamp: number;
  time: number;
  cacheHit: boolean;
  strategy: 'simple' | 'hybrid' | 'advanced';
  queryType: string;
}
 
const computeStats = (queries: QueryPerformance[]) => ({
  avgTime: queries.reduce((sum, q) => sum + q.time, 0) / queries.length,
  fastestTime: Math.min(...queries.map(q => q.time)),
  slowestTime: Math.max(...queries.map(q => q.time)),
  cacheHitRate: queries.filter(q => q.cacheHit).length / queries.length
});

Docker Monitoring Stack

  • Prometheus: Scrapes metrics from FastAPI, Qdrant, Redis (15s interval)
  • Grafana: Visualizes dashboards (Atlas Overview, Performance, Resources)
  • cAdvisor: Container resource metrics (CPU, RAM, GPU)
  • node-exporter: Host system metrics

Next Steps

Continue Learning: Now that you understand how Apollo’s layers integrate, dive into Core Concepts to learn about the RAG engine, retrieval strategies, and caching mechanisms.