ATLAS Multi-Level Caching

Apollo RAG employs a five-layer caching architecture (L1-L5) called the ATLAS Protocol. On a cache hit, full query responses return in under 1ms instead of the usual 8-15 seconds, and cached embeddings cut a 50-100ms computation to under 1ms, a roughly 98% reduction.

Caching Overview

Why ATLAS Matters

The ATLAS (Adaptive Tiered Latency-Aware Storage) protocol is designed to minimize redundant computation at every stage of the RAG pipeline:

  • L1 Query Cache: Exact, normalized, and semantic query matching
  • L2 Embedding Cache: Reusable vector embeddings for queries and documents
  • L3 Conversation Memory: Context-aware conversation history
  • L4 Model Cache: Pre-cached ML models in Docker layers
  • L5 Query Prefetcher: Predictive query prefetching (experimental)

Performance Impact: The multi-tier caching system achieves 60-80% cache hit rates in production, with less than 1ms response time for cache hits vs 8-15 seconds for cache misses.
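
As a sanity check on these figures, the expected average latency follows directly from the hit rate. Below is a minimal Python sketch, assuming a 74% hit rate (inside the stated 60-80% range) and the 12.5-second uncached average reported in the Before/After Metrics section further below.

# Back-of-the-envelope estimate from the figures above (the 74% hit rate is an assumption)
hit_rate = 0.74
hit_latency_s = 0.001     # <1ms on a cache hit
miss_latency_s = 12.5     # average uncached query time (see Before/After Metrics)

expected_s = hit_rate * hit_latency_s + (1 - hit_rate) * miss_latency_s
print(f"Expected average latency: {expected_s:.1f}s")  # ~3.3s, in line with the reported 3.2s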

Cache Performance Comparison

Operation            | Without Cache | With Cache | Reduction
Query Lookup         | 8-15 seconds  | <1ms       | 99.99%
Embedding Generation | 50-100ms      | <1ms       | 98%
Model Loading        | 15-20 seconds | Instant    | 100%
Conversation Context | N/A           | Instant    | N/A

L1: Query Cache

The L1 Query Cache implements three-stage matching for intelligent query result caching:

Matching Strategies

  • Exact Match (0.86ms avg)

    • Character-for-character comparison
    • Fastest, highest precision
    • Use case: Repeat queries
  • Normalized Match (1-2ms)

    • Case-insensitive
    • Whitespace-tolerant
    • Use case: “What is RAG?” vs “what is rag?”
  • Semantic Match (2-5ms)

    • Cosine similarity ≥ 0.95
    • Handles paraphrasing
    • Use case: “Explain RAG” vs “What does RAG mean?”

Implementation

# backend/_src/cache_next_gen.py
class NextGenCacheManager:
    async def get_query_result(self, query: str) -> Optional[QueryResult]:
        # Stage 1: Exact match
        result = await self.redis.hget("query_cache", query)
        if result:
            return self._deserialize(result)
 
        # Stage 2: Normalized match
        normalized = query.lower().strip()
        result = await self.redis.hget("query_cache_normalized", normalized)
        if result:
            return self._deserialize(result)
 
        # Stage 3: Semantic match (0.95 threshold)
        query_embedding = await self.embedding_cache.get(query)
        for cached_query, cached_result in await self._get_all_cached():
            similarity = cosine_similarity(query_embedding, cached_query.embedding)
            if similarity >= 0.95:
                return cached_result
 
        return None  # Cache miss
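
Only the read path is shown in this excerpt. A minimal sketch of what the corresponding write path could look like, assuming the same two Redis hashes, the QUERY_CACHE_TTL constant from the Configuration section below, and hypothetical set_query_result/_serialize counterparts to the methods shown above (not taken from the Apollo source):

# Hypothetical write path mirroring the lookup stages above
async def set_query_result(self, query: str, result: QueryResult):
    serialized = self._serialize(result)

    # Stage 1 + 2: store under both the exact and the normalized key
    await self.redis.hset("query_cache", query, serialized)
    await self.redis.hset("query_cache_normalized", query.lower().strip(), serialized)

    # Refresh the 7-day TTL on both hashes (QUERY_CACHE_TTL, see Configuration below)
    await self.redis.expire("query_cache", QUERY_CACHE_TTL)
    await self.redis.expire("query_cache_normalized", QUERY_CACHE_TTL)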

Configuration

# Default L1 settings
QUERY_CACHE_TTL = 7 * 24 * 60 * 60  # 7 days
QUERY_CACHE_MAX_SIZE = 10000
SEMANTIC_MATCH_THRESHOLD = 0.95

Cache Invalidation: The L1 cache is automatically cleared when switching LLM models, as cached answers may be incompatible with the new model’s response style.
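
The invalidation itself can be as simple as deleting the two L1 hashes. A brief sketch of such a hook (the method name and call site are assumptions, not Apollo's confirmed code):

# Hypothetical model-switch hook: drop L1 so answers generated by the old model
# are never served for the new one. L2 embeddings remain valid and are kept.
async def on_model_switch(self, new_model: str):
    await self.redis.delete("query_cache", "query_cache_normalized")
    logger.info(f"L1 query cache cleared after switching to {new_model}")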


L2: Embedding Cache

The L2 Embedding Cache eliminates redundant embedding computation, the most expensive operation after LLM generation.

Architecture

Storage: Redis string
Key Format: emb:v1:{text_hash}
Value Format: msgpack-packed NumPy float32 bytes
TTL: 7 days (604800 seconds)
Hit Rate: 60-80% in production

Performance Impact

Without L2 cache:

  • Query embedding: 50-100ms (CPU, BGE-large-en-v1.5)
  • Document embedding (250 chunks): 12-25 seconds

With L2 cache:

  • Cache hit: less than 1ms (Redis lookup + deserialization)
  • Cache miss: 50-100ms (compute + store)

Net Result: 98% latency reduction on cache hits.

Implementation

# backend/_src/embedding_cache.py
import hashlib
from typing import Optional

import msgpack
import numpy as np
from redis.asyncio import Redis


class EmbeddingCache:
    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.key_prefix = "emb:v1:"
 
    async def get(self, text: str) -> Optional[np.ndarray]:
        """Retrieve cached embedding"""
        key = self._make_key(text)
        cached = await self.redis.get(key)
 
        if cached:
            # Unpack the stored bytes and restore the float32 vector
            raw = msgpack.unpackb(cached, raw=False)
            return np.frombuffer(raw, dtype=np.float32)
        return None
 
    async def put(self, text: str, embedding: np.ndarray):
        """Store embedding with 7-day TTL"""
        key = self._make_key(text)
        serialized = msgpack.packb(embedding.astype(np.float32).tobytes())
        await self.redis.setex(key, 604800, serialized)
 
    def _make_key(self, text: str) -> str:
        """Generate cache key from text hash"""
        text_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        return f"{self.key_prefix}{text_hash}"

Use Cases

  • Query Embeddings: Frequently asked questions
  • Document Embeddings: Reindexing operations with existing chunks
  • HyDE Embeddings: Repeated hypothetical document generations

Optimization Tip: The embedding cache is especially effective during reindexing, where document chunks often have high overlap with previous versions.


L3: Conversation Memory

The L3 Conversation Memory maintains context-aware conversation history using a ring buffer architecture.

Features

Type: In-memory ring buffer
Capacity: 10 exchanges (20 messages)
Summarization Threshold: 5 exchanges
Storage Lifetime: Session-based (cleared on restart)
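
The ring buffer itself can be expressed with a bounded deque. A minimal sketch of the storage side, assuming exchanges are stored as (question, answer) tuples (the class name is illustrative and may differ from Apollo's implementation):

from collections import deque
from typing import Deque, Tuple


class ConversationRingBuffer:
    """Illustrative ring buffer: at most 10 exchanges (20 messages)."""

    def __init__(self, capacity: int = 10):
        self.history: Deque[Tuple[str, str]] = deque(maxlen=capacity)

    def add_exchange(self, question: str, answer: str) -> None:
        # Appending beyond capacity silently evicts the oldest exchange
        self.history.append((question, answer))

    def clear(self) -> None:
        # Session end or topic change: wipe all context
        self.history.clear()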

Automatic Summarization

When the conversation exceeds 5 exchanges, Apollo automatically summarizes older context to stay within token limits:

# backend/_src/conversation_memory.py
class ConversationMemory:
    async def get_relevant_context_for_query(self, query: str) -> str:
        """Retrieve context with automatic summarization"""
        if len(self.history) > 5:
            # Summarize the three oldest exchanges
            summary = await self._summarize_exchanges(self.history[:3])
            # Keep the last 2 exchanges verbatim
            recent = "\n".join(f"Q: {q}\nA: {a}" for q, a in self.history[-2:])
            context = f"Previous context: {summary}\n\nRecent:\n{recent}"
        else:
            context = "\n".join(f"Q: {q}\nA: {a}" for q, a in self.history)
 
        return context
 
    async def _summarize_exchanges(self, exchanges: List[Tuple[str, str]]) -> str:
        """LLM-based summarization of conversation history"""
        formatted = "\n".join(f"Q: {q}\nA: {a}" for q, a in exchanges)
        prompt = f"Summarize these exchanges concisely:\n{formatted}"
        summary = await self.llm.generate(prompt, max_tokens=150)
        return summary

Context Injection

# Integration with query processing
async def query(self, question: str, use_context: bool = True):
    if use_context:
        context = await self.conversation_memory.get_relevant_context_for_query(question)
        enhanced_question = f"{context}\n\nCurrent question: {question}"
    else:
        enhanced_question = question
 
    # Continue with retrieval...

Use Case: Multi-turn conversations like “Tell me about RAG” → “How does it compare to fine-tuning?” benefit significantly from L3 context injection.


L4: Model Cache

The L4 Model Cache pre-loads ML models during Docker image build to eliminate startup latency.

Cached Models

models = [
    "BAAI/bge-large-en-v1.5",           # Embedding model (1024-dim)
    "BAAI/bge-reranker-v2-m3",          # Fast reranker
    "cross-encoder/ms-marco-MiniLM-L-12-v2",  # Cross-encoder
]

Docker Layer Optimization

# Stage 3: Pre-cache HuggingFace models (backend/Dockerfile.atlas)
FROM python-deps AS model-cache
RUN python -c "from sentence_transformers import SentenceTransformer; \
    SentenceTransformer('BAAI/bge-large-en-v1.5'); \
    SentenceTransformer('BAAI/bge-reranker-v2-m3')"
 
# Models cached in /root/.cache/huggingface
ENV HF_HOME=/root/.cache/huggingface
ENV TRANSFORMERS_CACHE=/root/.cache/huggingface
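
At runtime the baked-in cache can be made the only source of model files. One way to do this (an assumption, not shown in Apollo's Dockerfile; the same effect is available via ENV HF_HUB_OFFLINE=1 in the image) is to force Hugging Face offline mode before the models are loaded:

# Illustrative startup guard: never hit the network, fail fast if a model
# is missing from the pre-baked /root/.cache/huggingface layer.
import os

os.environ["HF_HUB_OFFLINE"] = "1"   # must be set before huggingface libraries are imported

from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("BAAI/bge-large-en-v1.5")  # resolved from the local cache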

Performance Impact

Phase          | Without L4 | With L4 | Improvement
Model Download | 60-120s    | 0s      | Instant
Model Loading  | 15-20s     | 3-5s    | 75% faster
Startup Time   | 78s        | 23s     | 3.4x faster

Production Benefit: L4 caching combined with parallel initialization reduces Apollo’s startup time from 78 seconds to 20-30 seconds.
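
Apollo's startup code is not reproduced here, but the sketch below shows one way "parallel initialization" can be combined with the warm L4 cache: the three models are loaded concurrently in worker threads (the structure, the function name, and the choice of SentenceTransformer vs CrossEncoder loaders are assumptions):

# Illustrative parallel model loading; with L4 baked into the image each load
# is a local disk read rather than a download.
import asyncio
from concurrent.futures import ThreadPoolExecutor

from sentence_transformers import CrossEncoder, SentenceTransformer


async def load_models_in_parallel():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as pool:
        embedder, reranker, cross_encoder = await asyncio.gather(
            loop.run_in_executor(pool, SentenceTransformer, "BAAI/bge-large-en-v1.5"),
            loop.run_in_executor(pool, CrossEncoder, "BAAI/bge-reranker-v2-m3"),
            loop.run_in_executor(pool, CrossEncoder, "cross-encoder/ms-marco-MiniLM-L-12-v2"),
        )
    return embedder, reranker, cross_encoder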


L5: Query Prefetcher (Experimental)

The L5 Query Prefetcher predictively caches likely next queries based on pattern detection.

Strategy

Type: Predictive prefetching
Implementation: backend/_src/query_prefetcher.py
Pattern Detection Window: Last 10 queries
Max Concurrent Prefetches: 3
Status: Experimental (disabled by default)

Pattern Detection

# backend/_src/query_prefetcher.py
class QueryPrefetcher:
    def on_query_received(self, query: str):
        """Detect patterns and prefetch likely next queries"""
        self.query_history.append(query)
 
        # Detect common patterns
        patterns = self._detect_patterns(self.query_history[-10:])
 
        # Generate predictions
        next_queries = self._predict_next_queries(query, patterns)
 
        # Prefetch top 3 predictions (non-blocking)
        for predicted_query in next_queries[:3]:
            asyncio.create_task(self._prefetch_query(predicted_query))
 
    async def _prefetch_query(self, query: str):
        """Background prefetch of predicted query"""
        try:
            # Generate embeddings and cache
            embedding = await self.embeddings.embed_query(query)
            await self.embedding_cache.put(query, embedding)
 
            # Optionally prefetch retrieval results
            # (commented out to avoid excessive computation)
            # results = await self.retriever.retrieve(query)
        except Exception as e:
            logger.debug(f"Prefetch failed for '{query}': {e}")

Use Cases

  • Follow-up Questions: “What is RAG?” → “How does RAG work?”
  • Topic Exploration: “Explain caching” → “How to configure cache TTL?”
  • Comparison Queries: “Qdrant vs ChromaDB” → “Which is faster?”

Status: L5 is experimental and disabled by default. Enable with caution as it increases background computation.


Cache Hit Performance

Before/After Metrics

Without ATLAS Caching:

Average Query Time: 12.5 seconds
Cache Hit Rate: 0%
Embedding Computation: Always required (50-100ms)
Model Loading: On every startup (15-20s)

With ATLAS Caching:

Average Query Time: 3.2 seconds (74% reduction)
Cache Hit Rate: 60-80%
Cache Hit Latency: less than 1ms (99.9% reduction)
Embedding Cache Hit: less than 1ms (98% reduction)
Model Loading: Pre-cached (instant)

Real-World Impact

# Example query timeline (CACHE MISS)
Query: "What is retrieval-augmented generation?"
 
L1 Lookup:        2ms  (miss)
L2 Embedding:    52ms  (miss, compute + store)
Retrieval:      100ms  (Qdrant HNSW)
Generation:    8500ms  (llama.cpp)
Confidence:     500ms  (parallel scoring)
L1 Store:         5ms  (background)
─────────────────────
Total:        9159ms
 
# Same query again (CACHE HIT)
L1 Lookup:       0.86ms (exact match)
Deserialization: 49ms
─────────────────────
Total:           50ms  (99.5% faster)

Cache Configuration

Redis Settings

# backend/_src/config.py
REDIS_CONFIG = {
    "host": "redis",              # Docker service name
    "port": 6379,
    "db": 0,
    "decode_responses": False,    # Binary data (embeddings)
    "socket_timeout": 5,
    "socket_connect_timeout": 5,
    "max_connections": 50,
 
    # Memory management
    "maxmemory": "8gb",
    "maxmemory_policy": "allkeys-lru",  # Evict least recently used
 
    # Persistence
    "save": ["900 1", "300 10"],  # RDB snapshots
}
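
Note that maxmemory, maxmemory_policy, and save are Redis server settings (applied via redis.conf or CONFIG SET), not redis-py connection arguments. A minimal sketch of wiring the client-side keys of this dict into redis-py's asyncio client (the variable name is illustrative):

# Client construction from the client-side keys above
from redis.asyncio import Redis

redis_client = Redis(
    host=REDIS_CONFIG["host"],
    port=REDIS_CONFIG["port"],
    db=REDIS_CONFIG["db"],
    decode_responses=REDIS_CONFIG["decode_responses"],      # keep binary values for embeddings
    socket_timeout=REDIS_CONFIG["socket_timeout"],
    socket_connect_timeout=REDIS_CONFIG["socket_connect_timeout"],
    max_connections=REDIS_CONFIG["max_connections"],
)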

TTL Configuration

# Time-To-Live settings
CACHE_TTL_CONFIG = {
    "query_cache": 7 * 24 * 60 * 60,      # 7 days
    "embedding_cache": 7 * 24 * 60 * 60,  # 7 days
    "conversation_memory": None,          # Session-based
}

Size Limits

# Maximum cache sizes
CACHE_SIZE_LIMITS = {
    "query_cache": 10000,         # Max 10K cached queries
    "embedding_cache": 50000,     # Max 50K cached embeddings
    "conversation_memory": 10,    # Max 10 exchanges per session
}
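
Redis does not enforce these limits by itself, so they are only meaningful if something checks them. A small sketch of counting entries per layer (key names come from the sections above; the helper itself is an assumption):

# Illustrative size check against CACHE_SIZE_LIMITS
async def cache_sizes(redis_client) -> dict:
    query_count = await redis_client.hlen("query_cache")         # L1 entries

    embedding_count = 0
    async for _ in redis_client.scan_iter(match="emb:v1:*"):      # L2 keys via non-blocking SCAN
        embedding_count += 1

    return {"query_cache": query_count, "embedding_cache": embedding_count}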

Tuning Recommendation: Increase TTL for stable document collections. Decrease for rapidly changing data sources.


Cache Invalidation Strategies

Manual Invalidation

# Clear all caches via API
POST /api/conversation/clear
 
# Clear specific cache via Redis CLI
redis-cli DEL "query_cache"
redis-cli KEYS "emb:v1:*" | xargs redis-cli DEL
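
# Note: KEYS blocks the Redis server while it scans the whole keyspace; on large
# caches prefer the incremental scan instead:
redis-cli --scan --pattern "emb:v1:*" | xargs redis-cli DEL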

Automatic Invalidation

  • Model Switch: L1 query cache cleared (old model’s answers incompatible)
  • Document Reindex: Embedding cache preserved (reusable chunks)
  • Session End: L3 conversation memory cleared
  • TTL Expiration: Redis automatically evicts expired keys

Selective Invalidation

# Invalidate specific query family
async def invalidate_query_family(self, query_prefix: str):
    """Remove all queries matching prefix"""
    keys = await self.redis.keys(f"query_cache:*{query_prefix}*")
    if keys:
        await self.redis.delete(*keys)

Monitoring Cache Performance

Metrics Tracking

# backend/app/core/rag_engine.py
class RAGEngine:
    async def query(self, question: str):
        # Track cache hit/miss
        cache_hit = False
 
        start = time.time()
        cached_result = await self.cache.get_query_result(question)
        cache_latency = (time.time() - start) * 1000
 
        if cached_result:
            cache_hit = True
            result = cached_result
            logger.info(f"Cache HIT: {cache_latency:.2f}ms")
        else:
            logger.info(f"Cache MISS: {cache_latency:.2f}ms (fallback to retrieval)")
            # ... full retrieval + generation pipeline runs here and produces `result` ...
 
        # Return the answer plus cache metadata
        return QueryResponse(
            answer=result,
            cache_hit=cache_hit,
            timing={"cache_lookup_ms": cache_latency}
        )

Health Check Integration

GET /api/health
 
{
  "status": "healthy",
  "components": {
    "cache": "ready",        # L1/L2 Redis
    "vectorstore": "ready",
    "llm": "ready"
  },
  "cache_stats": {
    "hit_rate": 0.73,        # 73% hit rate
    "avg_hit_latency_ms": 0.86,
    "avg_miss_latency_ms": 12500
  }
}
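
How these statistics are accumulated is not shown in this section; one straightforward approach (an assumption, not Apollo's confirmed implementation) is to bump Redis counters on every lookup and derive the hit rate from them:

# Illustrative counters behind the cache_stats payload (key names are assumptions)
async def record_lookup(redis_client, hit: bool, latency_ms: float) -> None:
    key = "cache_stats:hits" if hit else "cache_stats:misses"
    await redis_client.incr(key)
    # Keep a running latency total per outcome so averages can be derived later
    await redis_client.incrbyfloat(f"{key}:latency_ms_total", latency_ms)


async def hit_rate(redis_client) -> float:
    hits = int(await redis_client.get("cache_stats:hits") or 0)
    misses = int(await redis_client.get("cache_stats:misses") or 0)
    total = hits + misses
    return hits / total if total else 0.0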

Best Practices

1. Optimize for High Hit Rates

# Use semantic matching for paraphrased queries
SEMANTIC_MATCH_THRESHOLD = 0.95  # Balance precision vs recall
 
# Increase TTL for stable document collections
QUERY_CACHE_TTL = 14 * 24 * 60 * 60  # 14 days

2. Memory Management

# Monitor Redis memory usage
INFO memory
 
# Set eviction policy
config set maxmemory-policy allkeys-lru
 
# Monitor key counts
DBSIZE

3. Embedding Cache Optimization

# Batch embedding generation to leverage cache
async def embed_documents_batch(self, texts: List[str]) -> List[np.ndarray]:
    embeddings = []
    uncached_texts = []
    uncached_indices = []
 
    # Check cache first
    for i, text in enumerate(texts):
        cached = await self.embedding_cache.get(text)
        if cached is not None:
            embeddings.append(cached)
        else:
            uncached_texts.append(text)
            uncached_indices.append(i)
 
    # Batch compute uncached embeddings
    if uncached_texts:
        new_embeddings = await self.model.encode(uncached_texts)
        for idx, emb in zip(uncached_indices, new_embeddings):
            embeddings.insert(idx, emb)
            await self.embedding_cache.put(texts[idx], emb)
 
    return embeddings

4. Conversation Memory Management

# Clear conversation memory for new topics
if user_starts_new_topic:
    await conversation_memory.clear()
 
# Explicitly disable context for unrelated queries
response = await rag_engine.query(
    question="What's the weather today?",
    use_context=False  # Don't pollute conversation history
)

Next Steps

Learn about Adaptive Retrieval to understand how Apollo intelligently routes queries to different retrieval strategies based on complexity.

Related Topics: