Real-Time Streaming

Apollo uses Server-Sent Events (SSE) for real-time response streaming, with token buffering and backpressure handling tuned for smooth 60fps rendering.

Streaming Overview

Traditional request-response patterns wait for complete LLM generation before displaying results (8-15 seconds). Streaming provides immediate feedback by sending tokens as they’re generated.

Why Streaming Matters:

  • Perceived Performance: The first token appears in under 500ms (TTFT) instead of after the full 8-15s generation
  • Better UX: Progressive rendering reduces cognitive load
  • Cancellation: Users can stop generation mid-stream to save resources
  • Live Monitoring: Real-time performance metrics and health indicators

Apollo achieves 80-100 tokens/sec inference speed on RTX 5080 with Q5_K_M quantization.

SSE Protocol

Connection Management

Apollo streams SSE-formatted events over a single keep-alive HTTP connection; the client reads the response body incrementally and parses data: lines as they arrive:

// Frontend: src/hooks/useStreamingChat.ts
const sendMessageStream = async (request, callbacks) => {
  const abortController = new AbortController();
  abortControllerRef.current = abortController;  // Stored so cancelStream() can abort it
 
  const response = await fetch('/api/query/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(request),
    signal: abortController.signal
  });
 
  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed: ${response.status}`);
  }
 
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
 
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
 
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || ''; // Keep the trailing incomplete line for the next chunk
 
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;  // Skip blank separators and keep-alive comments
      const event: StreamEvent = JSON.parse(line.slice(6));
      handleEvent(event, callbacks);
    }
  }
};

Backend Implementation (FastAPI):

# Backend: app/api/query.py
import json

from fastapi.responses import StreamingResponse
 
@app.post("/api/query/stream")
async def query_stream_endpoint(request: QueryRequest):
    async def event_generator():
        try:
            # Stream tokens
            async for token in llm_engine.generate_stream(prompt):
                yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
 
            # Send sources
            yield f"data: {json.dumps({'type': 'sources', 'content': sources})}\n\n"
 
            # Send metadata
            yield f"data: {json.dumps({'type': 'metadata', 'content': metadata})}\n\n"
 
            # Signal completion
            yield f"data: {json.dumps({'type': 'done'})}\n\n"
 
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
 
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'  # Disable nginx buffering
        }
    )

Token Buffering (60fps Optimization)

Problem: Streaming 80-100 tokens/sec causes 100+ React re-renders/sec, freezing the UI.

Solution: Buffer tokens in a ref and throttle UI updates to 60fps (16ms frame budget).

// Frontend: src/hooks/useChat.ts
const tokenBufferRef = useRef<string>('');
 
const flushTokens = useCallback(() => {
  if (tokenBufferRef.current) {
    appendToLastMessage(tokenBufferRef.current);
    tokenBufferRef.current = '';
  }
}, [appendToLastMessage]);
 
// Throttle to 60fps (16ms)
const throttledFlushTokens = useThrottledCallback(flushTokens, 16);
 
const onToken = useCallback((token: string) => {
  tokenBufferRef.current += token;  // Accumulate
  throttledFlushTokens();           // Flush at 60fps max
}, [throttledFlushTokens]);

Performance Gain: Caps re-renders at 60/sec instead of 100+/sec (a 40%+ reduction), keeping rendering at a smooth 60fps.

Throttling Implementation

// Frontend: src/hooks/useThrottledCallback.ts
export const useThrottledCallback = <T extends (...args: any[]) => void>(
  callback: T,
  delay: number
) => {
  const timeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const lastCallRef = useRef<number>(0);
 
  return useCallback((...args: Parameters<T>) => {
    const now = Date.now();
    const timeSinceLastCall = now - lastCallRef.current;
 
    if (timeSinceLastCall >= delay) {
      // Leading edge: enough time has passed, invoke immediately
      callback(...args);
      lastCallRef.current = now;
    } else {
      // Trailing edge: reschedule so the latest buffered tokens still flush
      if (timeoutRef.current) clearTimeout(timeoutRef.current);
      timeoutRef.current = setTimeout(() => {
        callback(...args);
        lastCallRef.current = Date.now();
      }, delay - timeSinceLastCall);
    }
  }, [callback, delay]);
};

Backpressure Handling

Apollo implements multi-level flow control to prevent overwhelming the client:

Server-Side Backpressure

# Backend: _src/llm_engine_llamacpp.py
async def generate_stream(self, prompt: str):
    # The semaphore must be shared across requests (created once in __init__) to
    # actually serialize generations; a per-call semaphore would not limit anything.
    # self._generation_semaphore is an illustrative attribute name.
    async with self._generation_semaphore:  # Single concurrent generation
        for token in self.llm.create_completion(
            prompt,
            max_tokens=512,
            stream=True,
            temperature=0.0
        ):
            # ~80-100 tok/s on RTX 5080 with Q5_K_M quantization
            yield token['choices'][0]['text']
            await asyncio.sleep(0.001)  # Yield to the event loop between tokens

Client-Side Cancellation

// Frontend: src/hooks/useStreamingChat.ts
const cancelStream = useCallback(() => {
  if (abortControllerRef.current) {
    abortControllerRef.current.abort();
    abortControllerRef.current = null;
    setIsStreaming(false);
  }
}, []);
 
// User can cancel via UI button
<button onClick={cancelStream}>Cancel</button>

React Rendering Optimization

React.memo with Custom Comparison

Prevent unnecessary re-renders of completed messages:

// Frontend: src/components/Chat/ChatMessage.tsx
const arePropsEqual = (prev: ChatMessageProps, next: ChatMessageProps) => {
  return (
    prev.message.id === next.message.id &&
    prev.message.content === next.message.content &&
    prev.message.isStreaming === next.message.isStreaming &&
    prev.message.sources?.length === next.message.sources?.length
  );
};
 
export default React.memo(ChatMessage, arePropsEqual);

Best Practice: Only compare fields that affect rendering. Avoid deep comparison of large objects.

Flushing Before Metadata Updates

Ensure all tokens are rendered before adding sources/metadata:

// Frontend: src/hooks/useChat.ts
const onSources = useCallback((sources: Source[]) => {
  flushTokens();  // Force flush before updating sources
  updateLastMessage({ sources });
}, [flushTokens, updateLastMessage]);
 
const onMetadata = useCallback((metadata: QueryMetadata) => {
  flushTokens();  // Force flush before metadata
  updateLastMessage({ metadata, isStreaming: false });
}, [flushTokens, updateLastMessage]);

Stream Events

Event Types

type StreamEvent =
  | { type: 'token'; content: string }
  | { type: 'sources'; content: Source[] }
  | { type: 'metadata'; content: QueryMetadata }
  | { type: 'done' }
  | { type: 'error'; content: string };
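
For completeness, here is a minimal sketch of the handleEvent dispatcher referenced in sendMessageStream above. The StreamCallbacks shape is an assumption: it bundles the onToken, onSources, onMetadata, and onError handlers shown elsewhere on this page, plus a hypothetical onDone callback for the completion event.

// Frontend: src/hooks/useStreamingChat.ts (sketch)
interface StreamCallbacks {
  // Assumed callback bundle; names match handlers used elsewhere on this page
  onToken: (token: string) => void;
  onSources: (sources: Source[]) => void;
  onMetadata: (metadata: QueryMetadata) => void;
  onDone: () => void;                    // Hypothetical: lets the caller end the streaming state
  onError: (message: string) => void;
}
 
const handleEvent = (event: StreamEvent, callbacks: StreamCallbacks) => {
  switch (event.type) {
    case 'token':
      callbacks.onToken(event.content);  // Buffered and flushed at 60fps by useChat
      break;
    case 'sources':
      callbacks.onSources(event.content);
      break;
    case 'metadata':
      callbacks.onMetadata(event.content);
      break;
    case 'done':
      callbacks.onDone();
      break;
    case 'error':
      callbacks.onError(event.content);
      break;
  }
};

Because StreamEvent is a discriminated union, each case narrows event.content to the correct type without casts.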

Event Data Format

// Token event
data: {"type":"token","content":"Apollo"}
 
// Sources event
data: {"type":"sources","content":[{"content":"...","file_name":"doc.pdf","relevance_score":0.89}]}
 
// Metadata event
data: {"type":"metadata","content":{"processing_time":12.4,"cache_hit":false,"mode":"simple"}}
 
// Completion event
data: {"type":"done"}

Error Handling

Connection Recovery

// Frontend: src/hooks/useStreamingChat.ts
const sendMessageStream = async (request, callbacks) => {
  try {
    // ... streaming logic
  } catch (error) {
    if (error instanceof DOMException && error.name === 'AbortError') {
      // User cancelled the stream - exit silently
      return;
    }
 
    // Network error - notify user
    callbacks.onError(
      'Connection lost. Check backend health and retry.'
    );
  }
};

Graceful Degradation

If streaming fails, Apollo falls back to the non-streaming request path:

// Frontend: src/hooks/useChat.ts
const sendMessage = async (question: string) => {
  if (settings.streamResponse) {
    try {
      await sendMessageStream(request, callbacks);
    } catch (error) {
      console.warn('Streaming failed, falling back to batch', error);
      await sendMessageBatch(request);
    }
  } else {
    await sendMessageBatch(request);
  }
};
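
sendMessageBatch is referenced above but not shown on this page. A minimal sketch, assuming a non-streaming /api/query endpoint that returns the answer, sources, and metadata in a single JSON body (the endpoint path and response field names are assumptions):

// Frontend: src/hooks/useChat.ts (sketch - endpoint path and response shape assumed)
const sendMessageBatch = async (request) => {
  const response = await fetch('/api/query', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(request)
  });
 
  if (!response.ok) {
    throw new Error(`Query failed: ${response.status}`);
  }
 
  const data = await response.json();
  updateLastMessage({
    content: data.answer,        // Full answer arrives at once - no token streaming
    sources: data.sources,
    metadata: data.metadata,
    isStreaming: false
  });
};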

Performance Considerations

Memory Leak Risk: Clear pending timeouts and abort any in-flight stream on unmount; otherwise dangling callbacks can keep updating unmounted components.

Cleanup on Unmount

useEffect(() => {
  return () => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    if (timeoutRef.current) {
      clearTimeout(timeoutRef.current);
    }
  };
}, []);

Network Efficiency

  • Keep-Alive: SSE maintains single HTTP connection (no polling overhead)
  • Compression: Use gzip for event-stream (50% bandwidth reduction)
  • Buffering Disabled: The X-Accel-Buffering: no header stops nginx from buffering and delaying events

Monitoring Streams

Track streaming performance in real-time:

// Frontend: src/store/performanceStore.ts
const trackStreamingPerformance = (metadata: QueryMetadata) => {
  addQueryHistory({
    timestamp: Date.now(),
    time: metadata.processing_time,
    cacheHit: metadata.cache_hit,
    mode: metadata.mode,
    streaming: true
  });
};
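
The store above records server-reported timings; client-side TTFT and throughput are not tracked anywhere on this page. As a sketch, they could be derived inside the streaming hook like this (the refs and the console.debug output are illustrative additions, not part of performanceStore):

// Frontend (sketch): deriving client-side TTFT and tokens/sec during a stream
const streamStartRef = useRef<number>(0);            // Set when the request is sent (illustrative)
const firstTokenAtRef = useRef<number | null>(null);
const tokenCountRef = useRef<number>(0);
 
const onTokenWithMetrics = useCallback((token: string) => {
  if (firstTokenAtRef.current === null) {
    firstTokenAtRef.current = performance.now();     // TTFT = firstTokenAt - streamStart
  }
  tokenCountRef.current += 1;
  onToken(token);                                    // Delegate to the buffered handler shown earlier
}, [onToken]);
 
const onStreamDone = useCallback(() => {
  const elapsedSec = (performance.now() - streamStartRef.current) / 1000;
  const ttftMs = (firstTokenAtRef.current ?? 0) - streamStartRef.current;
  console.debug(`TTFT ${ttftMs.toFixed(0)}ms, ${(tokenCountRef.current / elapsedSec).toFixed(1)} tok/s`);
}, []);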

Next Steps