The components directory contains reusable, self-contained Haystack pipeline building blocks that implement specific retrieval and generation sub-tasks. Feature pipelines compose these components rather than reimplementing the same logic.

Component overview

AgenticRouter

LLM-based decision-making for agentic RAG with tool selection and self-reflection

ContextCompressor

Reduces retrieved context using abstractive, extractive, or relevance filtering

QueryEnhancer

Multi-query, HyDE, and step-back query expansion

ResultMerger

RRF and weighted fusion for hybrid search results

AgenticRouter

An LLM-based decision-making component for agentic RAG pipelines.

Capabilities

  • Tool selection: Given a query, selects the appropriate processing path ("retrieval", "web_search", "calculation", or "reasoning")
  • Answer quality evaluation: Sends the query, draft answer, and retrieved context to the LLM and receives a JSON-structured assessment
  • Refinement decision: Computes whether the average quality score falls below a threshold
  • Answer refinement: Given issues and suggestions from evaluation, sends a targeted revision request to the LLM
  • Self-reflection loop: Orchestrates the full evaluate-refine cycle for up to max_iterations rounds

Implementation

src/vectordb/haystack/components/agentic_router.py
import json
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

class AgenticRouter:
    """Route and orchestrate RAG with agent-like behavior."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
        api_base_url: str = "https://api.groq.com/openai/v1",
    ) -> None:
        """Initialize agentic router."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")

        # Temperature=0 ensures deterministic routing decisions
        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url=api_base_url,
            generation_kwargs={"temperature": 0, "max_tokens": 1024},
        )

        self.available_tools = [
            "retrieval",
            "web_search",
            "calculation",
            "reasoning",
        ]

    def select_tool(self, query: str) -> str:
        """Select the best tool for a query."""
        tools_str = ", ".join(self.available_tools)
        prompt = f"""Given this query: "{query}"

Select the BEST tool to answer it. Options: {tools_str}

- retrieval: For factual information from a knowledge base
- web_search: For current events, real-time information
- calculation: For mathematical or computational problems
- reasoning: For multi-step logic or analysis

Return ONLY the tool name."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        tool = response["replies"][0].text.strip().lower()
        
        if tool not in self.available_tools:
            tool = "retrieval"  # Fallback
        
        return tool

    def evaluate_answer_quality(
        self,
        query: str,
        answer: str,
        context: str = "",
    ) -> dict[str, Any]:
        """Evaluate generated answer quality.
        
        Returns:
            Dict with relevance, completeness, grounding scores (0-100),
            plus issues and suggestions lists.
        """
        prompt = f"""Evaluate this answer to the query.

Query: "{query}"
Answer: "{answer}"
Context: "{context}"

Assess:
1. Relevance (0-100): Does it answer the query?
2. Completeness (0-100): Is it sufficiently detailed?
3. Grounding (0-100): Is it grounded in the context?
4. Issues: List any problems (max 3)
5. Suggestions: List improvements (max 2)

Format as JSON:
{{"relevance": X, "completeness": X, "grounding": X, "issues": [...], "suggestions": [...]}}

Return ONLY the JSON."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text.strip()

        # Models sometimes wrap JSON in markdown fences; strip any backticks
        # and a leading "json" language tag before parsing
        content = content.strip("`").removeprefix("json").strip()

        return json.loads(content)

    def refine_answer(
        self,
        query: str,
        answer: str,
        eval_result: dict[str, Any],
    ) -> str:
        """Refine an answer using issues and suggestions from evaluation."""
        issues = "; ".join(eval_result.get("issues", []))
        suggestions = "; ".join(eval_result.get("suggestions", []))
        prompt = f"""Improve this answer to the query: "{query}"

Current answer: "{answer}"
Issues to fix: {issues}
Suggested improvements: {suggestions}

Return ONLY the improved answer."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        return response["replies"][0].text.strip()

    def self_reflect_loop(
        self,
        query: str,
        answer: str,
        context: str = "",
        max_iterations: int = 2,
        quality_threshold: int = 75,
    ) -> str:
        """Run self-reflection loop to iteratively improve answer."""
        current_answer = answer

        for iteration in range(max_iterations):
            eval_result = self.evaluate_answer_quality(query, current_answer, context)
            
            # Exit early if quality is acceptable
            avg_score = (
                eval_result.get("relevance", 0)
                + eval_result.get("completeness", 0)
                + eval_result.get("grounding", 0)
            ) / 3
            
            if avg_score >= quality_threshold:
                break
            
            # Refine answer based on feedback
            current_answer = self.refine_answer(query, current_answer, eval_result)
        
        return current_answer

Usage

from vectordb.haystack.components import AgenticRouter

router = AgenticRouter(model="llama-3.3-70b-versatile")

# Tool selection
tool = router.select_tool("What is quantum entanglement?")
# → "retrieval"

# Answer quality evaluation
quality = router.evaluate_answer_quality(query, answer, context)
# → {"relevance": 85, "completeness": 70, "grounding": 90, ...}

# Self-reflection loop
final_answer = router.self_reflect_loop(
    query, draft_answer, context, max_iterations=2
)

ContextCompressor

Reduces retrieved context to query-relevant fragments before generation.

Compression strategies

  • Abstractive: LLM generates a focused summary of the context relevant to the query
  • Extractive: LLM selects the N most relevant sentences from the original text
  • Relevance filtering: LLM evaluates each paragraph and drops those below a threshold

All methods fall back to returning the original context unchanged on LLM failure.
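That fallback can be sketched as a small wrapper (illustrative only; `compress_with_fallback` and `broken_compressor` are hypothetical names, not part of the component API):

```python
import logging

logger = logging.getLogger(__name__)

def compress_with_fallback(compress_fn, context: str, query: str) -> str:
    """Run a compression method, returning the original context on failure.

    compress_fn stands in for any of the compression methods
    (abstractive, extractive, relevance filter).
    """
    try:
        compressed = compress_fn(context, query)
        # Guard against empty LLM output as well as raised exceptions
        return compressed if compressed.strip() else context
    except Exception:
        logger.warning("Compression failed; returning original context")
        return context

# A failing compressor falls back to the original context
def broken_compressor(context: str, query: str) -> str:
    raise RuntimeError("LLM unavailable")

result = compress_with_fallback(broken_compressor, "original text", "query")
# → "original text"
```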

Implementation

src/vectordb/haystack/components/context_compressor.py
import logging
import os

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

logger = logging.getLogger(__name__)

class ContextCompressor:
    """Compress and summarize retrieved context."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
    ) -> None:
        """Initialize context compressor."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")

        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url="https://api.groq.com/openai/v1",
            generation_kwargs={"temperature": 0, "max_tokens": 2048},
        )

    def compress_abstractive(
        self,
        context: str,
        query: str,
        max_tokens: int = 2048,
    ) -> str:
        """Abstractive compression using LLM summarization."""
        prompt = f"""Summarize the following context to answer this question: "{query}"

Keep only the most relevant information. Be concise.

Context:
{context}

Summary (max {max_tokens} tokens):"""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        summary = response["replies"][0].text
        
        compression_ratio = len(context) / (len(summary) + 1)
        logger.info("Abstractive compression: %.2fx", compression_ratio)
        
        return summary

    def compress_extractive(
        self,
        context: str,
        query: str,
        num_sentences: int = 5,
    ) -> str:
        """Extractive compression: select key sentences."""
        prompt = f"""Extract the {num_sentences} most relevant sentences from the following context to answer: "{query}"

Context:
{context}

Return ONLY the selected sentences in order, without numbering."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        extracted = response["replies"][0].text.strip()

        compression_ratio = len(context) / (len(extracted) + 1)
        logger.info("Extractive compression: %.2fx", compression_ratio)

        return extracted

    def compress(
        self,
        context: str,
        query: str,
        compression_type: str = "abstractive",
        **kwargs: Any,
    ) -> str:
        """Compress context using specified technique."""
        if compression_type == "abstractive":
            return self.compress_abstractive(context, query, **kwargs)
        elif compression_type == "extractive":
            return self.compress_extractive(context, query, **kwargs)
        elif compression_type == "relevance_filter":
            return self.filter_by_relevance(context, query, **kwargs)
        else:
            raise ValueError(f"Unsupported compression type: {compression_type}")

Usage

from vectordb.haystack.components import ContextCompressor

compressor = ContextCompressor(model="llama-3.3-70b-versatile")

compressed = compressor.compress(
    context, query, compression_type="extractive", num_sentences=5
)

QueryEnhancer

Generates improved retrieval queries from the user’s original input.

Enhancement strategies

  • Multi-query: Generates N alternative phrasings of the original query (default N=3)
  • HyDE: Generates M hypothetical documents that would answer the query (default M=3)
  • Step-back: Generates a broader, more abstract version of the query

Implementation

src/vectordb/haystack/components/query_enhancer.py
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

class QueryEnhancer:
    """Enhance and expand queries using LLM-based techniques."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
    ) -> None:
        """Initialize query enhancer."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")

        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url="https://api.groq.com/openai/v1",
            generation_kwargs={"temperature": 0.7, "max_tokens": 1024},
        )

    def generate_multi_queries(
        self,
        query: str,
        num_queries: int = 3,
    ) -> list[str]:
        """Generate multiple query variations."""
        prompt = f"""Generate {num_queries} different queries that would help retrieve relevant information for: "{query}"

Return ONLY the queries, one per line, without numbering or extra text."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text
        
        queries = [q.strip() for q in content.split("\n") if q.strip()]
        # Always include original query first
        return [query] + queries[:num_queries - 1]

    def generate_hypothetical_documents(
        self,
        query: str,
        num_docs: int = 3,
    ) -> list[str]:
        """Generate hypothetical relevant documents (HyDE)."""
        prompt = f"""Generate {num_docs} hypothetical document excerpts that would directly answer this question: "{query}"

Return ONLY the document excerpts, separated by "---", without numbering or extra text."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text

        docs = [d.strip() for d in content.split("---") if d.strip()]
        return docs[:num_docs]

    def generate_step_back_query(self, query: str) -> str:
        """Generate a broader, more abstract version of the query."""
        prompt = f"""Rewrite this question as a broader, more general question about the underlying concept: "{query}"

Return ONLY the rewritten question."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        return response["replies"][0].text.strip()

    def enhance_query(
        self,
        query: str,
        enhancement_type: str = "multi_query",
        **kwargs: Any,
    ) -> list[str]:
        """Enhance query using specified technique."""
        if enhancement_type == "multi_query":
            return self.generate_multi_queries(query, **kwargs)
        elif enhancement_type == "hyde":
            return self.generate_hypothetical_documents(query, **kwargs)
        elif enhancement_type == "step_back":
            step_back = self.generate_step_back_query(query)
            return [query, step_back]
        else:
            raise ValueError(f"Unsupported enhancement type: {enhancement_type}")

Usage

from vectordb.haystack.components import QueryEnhancer

enhancer = QueryEnhancer(model="llama-3.3-70b-versatile")

queries = enhancer.enhance_query(
    "What causes inflation?", enhancement_type="multi_query", num_queries=3
)
# → ["What causes inflation?", "What drives rising prices?", "Factors behind monetary inflation"]

ResultMerger

Fuses results from multiple retrieval sources into a single ranked list.

Fusion strategies

  • RRF (Reciprocal Rank Fusion): Combines rankings using 1 / (k + rank) without requiring score normalization
  • Weighted fusion: Weights inverse-rank scores by explicit weights
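As an illustration of the fusion math (a sketch, not the actual ResultMerger implementation; `rrf_fuse` and `weighted_fuse` are hypothetical names), RRF with the conventional k=60 and inverse-rank weighted fusion look like:

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score each doc by the sum of 1 / (k + rank).

    rankings: one ranked list of doc IDs per retrieval source.
    k dampens the influence of top ranks; 60 is the conventional default.
    Raw scores from each source are never used, so no normalization is needed.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda d: scores[d], reverse=True)

def weighted_fuse(rankings: list[list[str]], weights: list[float]) -> list[str]:
    """Weighted fusion: scale each source's inverse-rank scores by its weight."""
    scores: dict[str, float] = {}
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / rank
    return sorted(scores, key=lambda d: scores[d], reverse=True)

# Doc "b" ranks highly in both sources, so it wins after fusion
dense = ["a", "b", "c"]
sparse = ["b", "c", "d"]
fused = rrf_fuse([dense, sparse])
# → ["b", "c", "a", "d"]
```

Docs appearing in multiple rankings accumulate score from each, which is why "c" (ranked in both lists) outranks "a" (top of one list only).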

Usage

See the Hybrid search page for detailed implementation examples.

LLM configuration

All LLM-based components use the Groq API via Haystack’s OpenAIChatGenerator:
generator = OpenAIChatGenerator(
    api_key=Secret.from_token(api_key),
    model="llama-3.3-70b-versatile",
    api_base_url="https://api.groq.com/openai/v1",
    generation_kwargs={"temperature": 0, "max_tokens": 1024},
)
Set the GROQ_API_KEY environment variable or pass api_key directly.

When to use components directly

  • Building a custom pipeline that does not fit existing feature module templates
  • Experimenting with one pipeline stage at a time
  • Combining components from different feature modules into a novel configuration

Common pitfalls

  • Over-composing before baseline validation: Build and validate the simplest pipeline first. Add components incrementally and measure the impact of each addition.
  • Inconsistent interfaces between custom stages: If you extend these components, maintain the same input/output conventions (Haystack Document objects, standard config dicts).
  • No tracing at component boundaries: Each component logs at INFO level. Set LOG_LEVEL=DEBUG to see detailed prompt and response content.
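Assuming standard Python logging, wiring LOG_LEVEL into a script might look like this (a sketch; the components may configure logging differently):

```python
import logging
import os

# Map the LOG_LEVEL environment variable onto Python's logging levels;
# unknown or unset values fall back to INFO, the components' default verbosity
level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, level_name, logging.INFO))

logger = logging.getLogger("vectordb.haystack.components")
logger.debug("Visible only when LOG_LEVEL=DEBUG")
```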

Next steps

Pipelines

Learn how to compose components into full pipelines

Semantic search

See components in action in semantic search pipelines