← Back to LangChain

RAG Systems with LangChain

Building retrieval-augmented generation systems that combine LLMs with proprietary documents for accurate, contextual responses grounded in specific knowledge.

Overview

RAG systems solve the problem of LLM hallucinations by retrieving relevant documents before generation. LangChain simplifies this with document loaders, vector stores, and retrieval chains.

Basic RAG Pipeline

Python - Simple RAG Setup
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA

# 1. Load documents
loader = PyPDFLoader("documents/company_policies.pdf")
documents = loader.load()

# 2. Split documents into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
)
splits = text_splitter.split_documents(documents)

# 3. Create embeddings and vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(splits, embeddings)

# 4. Create retrieval chain
llm = ChatOpenAI(model="gpt-4")
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
)

# 5. Query
result = qa_chain({"query": "What is our vacation policy?"})

Multi-Document RAG with Metadata

Python - Advanced RAG System
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.prompts import PromptTemplate

# Load multiple documents with metadata
loader = DirectoryLoader(
    path="documents/",
    glob="**/*.pdf",
    loader_cls=PyPDFLoader,
)
documents = loader.load()

# Add metadata
for doc in documents:
    doc.metadata["source_type"] = "company_policy"
    doc.metadata["department"] = extract_department(doc.metadata["source"])

# Split and embed
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000)
splits = text_splitter.split_documents(documents)

# Use persistent vector store (Chroma)
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=OpenAIEmbeddings(),
    persist_directory="./chroma_db",
)

# Create RAG chain with sources
llm = ChatOpenAI(model="gpt-4")
qa_chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(),
    return_source_documents=True,
)

# Query with source tracking
result = qa_chain({"question": "What is the policy on remote work?"})
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")

Custom Retrieval Chain

Python - Custom Retrieval Logic
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.schema import Document
from typing import List

# Custom retrieval with filtering
def retrieve_with_filter(query: str, vectorstore, filters: dict = None) -> List[Document]:
    """Retrieve documents with optional metadata filtering"""
    
    # Search in vector store
    docs = vectorstore.similarity_search_with_score(query, k=10)
    
    # Post-filter by metadata
    if filters:
        docs = [
            (doc, score) for doc, score in docs
            if all(doc.metadata.get(k) == v for k, v in filters.items())
        ]
    
    return [doc for doc, _ in docs[:5]]

# Custom prompt
qa_prompt = PromptTemplate(
    template="""Use the following context to answer the question.
If you don't know the answer, say "I don't have information about this."
Include relevant policy numbers when applicable.

Context:
{context}

Question: {question}

Answer:""",
    input_variables=["context", "question"],
)

# Create custom chain
llm = ChatOpenAI(model="gpt-4")
qa_chain = LLMChain(llm=llm, prompt=qa_prompt)

# Execute with custom retrieval
query = "What is the professional development budget?"
docs = retrieve_with_filter(
    query,
    vectorstore,
    filters={"department": "HR", "topic": "benefits"}
)
context = "\n".join([f"{d.page_content}" for d in docs])

answer = qa_chain.run(context=context, question=query)

Hybrid Search with Re-ranking

Python - Hybrid RAG with Re-ranking
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain_cohere import CohereRerank
from langchain.retrievers.document_compressors import CohereRerank as LangchainCohereRerank
from langchain.retrievers import ContextualCompressionRetriever

# Create multiple retrievers
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
bm25_retriever = BM25Retriever.from_documents(splits)

# Ensemble retriever (combines multiple sources)
ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.5, 0.5],
)

# Add re-ranking to compress results
compressor = LangchainCohereRerank(model="rerank-english-v2.0", top_n=5)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=ensemble_retriever,
)

# Use compressed retriever in QA chain
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=compression_retriever,
)

# Query
result = qa_chain({"query": "Tell me about benefits"})

Dynamic Context Window

Python - Adaptive Retrieval
from langchain.chains.question_answering import load_qa_chain

# Multi-step RAG for complex questions
class AdaptiveRAG:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        self.retrieval_chain = load_qa_chain(
            llm, chain_type="stuff"
        )
    
    def answer_question(self, question: str) -> dict:
        # Step 1: Retrieve initial documents
        initial_docs = self.vectorstore.similarity_search(question, k=5)
        
        # Step 2: Generate answer with initial context
        answer = self.retrieval_chain.run(
            input_documents=initial_docs,
            question=question
        )
        
        # Step 3: If answer contains uncertainty, retrieve more docs
        if "uncertain" in answer.lower() or "unclear" in answer.lower():
            # Expand search
            expanded_docs = self.vectorstore.similarity_search(question, k=10)
            answer = self.retrieval_chain.run(
                input_documents=expanded_docs,
                question=question
            )
        
        return {
            "answer": answer,
            "docs_used": len(initial_docs),
            "confidence": self.estimate_confidence(answer)
        }
    
    def estimate_confidence(self, answer: str) -> float:
        """Estimate confidence based on answer language"""
        uncertainty_markers = ["possibly", "might", "unclear", "insufficient"]
        marker_count = sum(
            1 for marker in uncertainty_markers
            if marker in answer.lower()
        )
        return max(0, 1 - (marker_count * 0.1))

# Usage
rag = AdaptiveRAG(vectorstore, llm)
result = rag.answer_question("What benefits are available for employees?")

Document Ingestion Pipeline

Python - Batch Document Processing
from langchain_community.document_loaders import (
    PyPDFLoader,
    CSVLoader,
    JSONLoader,
    WebBaseLoader,
)
import os

class DocumentIngestionPipeline:
    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.loaders = {
            ".pdf": PyPDFLoader,
            ".csv": CSVLoader,
            ".json": JSONLoader,
        }
    
    def ingest_directory(self, directory: str, **kwargs):
        """Ingest all documents from directory"""
        documents = []
        
        for filename in os.listdir(directory):
            filepath = os.path.join(directory, filename)
            ext = os.path.splitext(filename)[1]
            
            if ext in self.loaders:
                loader_class = self.loaders[ext]
                loader = loader_class(filepath, **kwargs)
                docs = loader.load()
                documents.extend(docs)
        
        # Process and add to vectorstore
        self.add_documents(documents)
        return len(documents)
    
    def ingest_urls(self, urls: list):
        """Ingest web documents"""
        documents = []
        for url in urls:
            loader = WebBaseLoader(url)
            docs = loader.load()
            documents.extend(docs)
        
        self.add_documents(documents)
        return len(documents)
    
    def add_documents(self, documents: list):
        """Add documents to vector store"""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        splits = text_splitter.split_documents(documents)
        self.vectorstore.add_documents(splits)

# Usage
pipeline = DocumentIngestionPipeline(vectorstore)
count = pipeline.ingest_directory("data/policies/")
print(f"Ingested {count} documents")

Best Practices