LangChain is a Python/TypeScript framework for composing LLM-powered applications from modular components — chains, agents, retrievers, memory, and tools. Version 0.3 standardizes on the LangChain Expression Language (LCEL) as the universal composition primitive, replacing the legacy LLMChain/SequentialChain APIs. For data engineers, it offers a fast path to production RAG pipelines, SQL agents, and document processing workflows.
Python 3.9+ · langchain 0.3 · LCEL · RAG · Agents / ReAct · Vector Stores
LCEL is the backbone of LangChain 0.3. Every component — prompts, models, retrievers, output parsers — implements the Runnable interface with invoke, stream, and batch. Chains are composed with the pipe operator |. This replaces the legacy LLMChain and SequentialChain classes that existed in 0.1/0.2.
```python
# Modern LCEL pattern
chain = prompt | llm | output_parser
result = chain.invoke({"topic": "data pipelines"})
```
ChatPromptTemplate constructs structured messages with system, human, and AI turns. MessagesPlaceholder inserts dynamic message history. Partial prompt templates (created with .partial()) bind static variables once at build time, so they need not be supplied on every invocation.
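A minimal sketch of these pieces together (the variable names, the "history"/"dialect" placeholders, and the DuckDB dialect are illustrative assumptions, not from the original):

```python
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import AIMessage, HumanMessage

# "history" is filled with prior messages at runtime via MessagesPlaceholder;
# .partial() binds the static "dialect" variable once at build time.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a {dialect} SQL assistant."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
]).partial(dialect="DuckDB")

messages = prompt.invoke({
    "history": [HumanMessage("hi"), AIMessage("Hello! How can I help?")],
    "input": "Show me yesterday's failed loads.",
})
```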
Memory preserves conversation context across turns. Key implementations are compared below; a usage sketch follows the table.
| Memory Class | How It Works | Best For |
|---|---|---|
| ConversationBufferMemory | Stores every message verbatim | Short conversations |
| ConversationSummaryMemory | LLM compresses history into a running summary | Long sessions, token savings |
| ConversationBufferWindowMemory | Keeps last k exchanges | Fixed-window context |
| VectorStoreRetrieverMemory | Embeds & retrieves relevant past messages | Long histories, semantic recall |
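A minimal usage sketch of the windowed variant (the k value and sample turns are illustrative; these classes live in the langchain.memory module):

```python
from langchain.memory import ConversationBufferWindowMemory

# Keep only the last k=3 exchanges; older turns are dropped from context.
memory = ConversationBufferWindowMemory(k=3, return_messages=True)
memory.save_context({"input": "What is dbt?"}, {"output": "A SQL transformation tool."})
print(memory.load_memory_variables({}))  # -> {"history": [HumanMessage(...), AIMessage(...)]}
```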
An agent is a runnable that loops: think → call tool → observe → think again. The ReAct (Reasoning + Acting) pattern prompts the LLM to emit a Thought, Action, Action Input, and read an Observation. AgentExecutor manages the loop with configurable max_iterations and handle_parsing_errors.
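For intuition, here is a hypothetical single iteration of that loop, using the check_pipeline_status tool defined in the agent example further down (the trace text is illustrative, not captured LangChain output):

```
Thought: I should check the pipeline before querying for errors.
Action: check_pipeline_status
Action Input: sales_etl
Observation: failed
Thought: The run failed, so I should inspect the errors table next.
```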
Over 100 loaders (PyPDFLoader, WebBaseLoader, S3FileLoader, BigQueryLoader) convert raw data into Document objects. Text splitters (RecursiveCharacterTextSplitter recommended) chunk documents while respecting semantic boundaries. Key params: chunk_size, chunk_overlap, separators.
Any vector store (FAISS, Chroma, pgvector, Pinecone) plugs in as a VectorStoreRetriever. Advanced retrieval strategies include MMR re-ranking to cut redundancy and hybrid ensembles that fuse BM25 keyword search with dense embeddings via EnsembleRetriever, as sketched below.
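A sketch of such a hybrid ensemble (the sample documents, the 0.4 / 0.6 weights, and the FAISS choice are assumptions for illustration; BM25Retriever requires the rank_bm25 package):

```python
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings

docs = [
    Document(page_content="Iceberg supports hidden partitioning."),
    Document(page_content="Hudi offers record-level upserts."),
]

bm25 = BM25Retriever.from_documents(docs)  # sparse keyword scoring
dense = FAISS.from_documents(docs, OpenAIEmbeddings()).as_retriever()

# Results from both retrievers are fused by weighted reciprocal rank.
hybrid = EnsembleRetriever(retrievers=[bm25, dense], weights=[0.4, 0.6])
print(hybrid.invoke("Which format handles upserts?"))
```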
Typical data-engineering use cases:

- **Knowledge-base Q&A (RAG):** Ingest PDFs, SharePoint files, and Confluence pages into a vector store, then surface accurate answers with source citations via RetrievalQA or LCEL chains. Common in HR chatbots, legal review tools, and knowledge-base Q&A systems.
- **Natural-language SQL:** LangChain's SQLDatabaseChain or a custom agent wrapping SQLAlchemy translates natural language to SQL, executes queries, and formats results, powering data-team assistants that let non-technical stakeholders query warehouses without writing SQL.
- **Workflow automation:** Agents equipped with tools (web search, email, calendar, API calls, Slack) orchestrate multi-step workflows: pull sales data → summarise → generate PDF report → post to Slack. Replaces brittle RPA scripts with adaptive reasoning.
- **Data-quality triage:** An agent monitors dbt test failures, reads error logs from cloud storage, calls a schema registry, and proposes — or auto-applies — fixes. Combines LLM reasoning with programmatic tool access to reduce MTTD/MTTR on data quality issues.
A minimal LCEL chat chain showing all three Runnable entry points: invoke, stream, and batch.

```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a data engineering assistant."),
    ("human", "{input}"),
])

chain = prompt | llm | StrOutputParser()

# Invoke
answer = chain.invoke({"input": "Explain the medallion architecture."})

# Stream (real-time tokens)
for chunk in chain.stream({"input": "Explain Delta Lake."}):
    print(chunk, end="", flush=True)

# Batch multiple inputs in parallel
results = chain.batch([
    {"input": "What is Iceberg?"},
    {"input": "What is Hudi?"},
])
```
An end-to-end RAG pipeline: load a PDF, chunk it, embed into Chroma, and answer questions with source citations.

```python
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.chains import RetrievalQA

# 1. Load document
loader = PyPDFLoader("architecture-standards.pdf")
docs = loader.load()

# 2. Chunk it
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=150,
    separators=["\n\n", "\n", ". ", " "],
)
chunks = splitter.split_documents(docs)

# 3. Embed & store
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db",
)

# 4. Build retrieval chain
retriever = vectorstore.as_retriever(
    search_type="mmr",  # Maximal Marginal Relevance: reduces redundancy
    search_kwargs={"k": 5, "fetch_k": 20},
)
qa = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
)

result = qa.invoke({"query": "What are our SLA requirements?"})
print(result["result"])
for doc in result["source_documents"]:
    print(doc.metadata["source"], doc.metadata["page"])
```
A ReAct agent with two custom tools: a read-only SQL runner and an Airflow DAG status check.

```python
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub

@tool
def run_sql(query: str) -> str:
    """Execute a read-only SQL query and return the results as a string."""
    import duckdb

    # Demo: hard-coded local file; inject the real connection from env/config.
    conn = duckdb.connect("warehouse.ddb")
    result = conn.execute(query).fetchdf()
    return result.to_string(max_rows=20)

@tool
def check_pipeline_status(pipeline_name: str) -> str:
    """Return the last run status of an Airflow DAG."""
    import requests

    resp = requests.get(
        f"http://airflow:8080/api/v1/dags/{pipeline_name}/dagRuns",
        auth=("admin", "admin"),
    )
    runs = resp.json()["dag_runs"]
    return str(runs[-1]["state"]) if runs else "no runs"

tools = [run_sql, check_pipeline_status]
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Pull standard ReAct prompt from LangChain Hub
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=8,
    handle_parsing_errors=True,
)
result = executor.invoke({
    "input": "Check the sales_etl pipeline status and query the last 5 failed rows from the errors table"
})
```
Structured output with Pydantic: the LLM returns a typed data-quality report instead of free text.

```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing import List, Optional

class DataQualityIssue(BaseModel):
    column: str = Field(description="Affected column name")
    severity: str = Field(description="low | medium | high | critical")
    description: str = Field(description="Human-readable issue description")
    suggested_fix: Optional[str] = Field(None, description="Recommended remediation")

class DQReport(BaseModel):
    table: str
    total_rows: int
    issues: List[DataQualityIssue]
    overall_score: float = Field(description="0-100 data quality score")

llm = ChatOpenAI(model="gpt-4o-mini")
structured_llm = llm.with_structured_output(DQReport)

prompt = ChatPromptTemplate.from_messages([
    ("system", "Analyse the profiling stats and return a DQ report."),
    ("human", "Table: {table}\nStats:\n{stats}"),
])

chain = prompt | structured_llm
report: DQReport = chain.invoke({
    "table": "orders",
    "stats": "customer_id null_pct=12%, amount negative_count=47, ...",
})
print(report.overall_score, report.issues)
```
| Framework | Primary Focus | Composition Model | Multi-Agent | Best For DE |
|---|---|---|---|---|
| LangChain 0.3 | Composable LLM pipelines | LCEL pipe operator | Basic (via agents) | RAG, SQL agents, doc processing |
| LlamaIndex | Data-centric indexing & RAG | Pipelines & query engines | Limited | Complex multi-index RAG |
| CrewAI | Multi-agent role orchestration | Agent → Task → Crew | Native | Parallel agentic workflows |
| Raw OpenAI SDK | Direct API access | Manual | DIY | Tight control / minimal deps |
| Haystack | Search & NLP pipelines | Pipeline DAG | No | Elasticsearch-centered RAG |
**Common pitfalls**

- LLMChain, SequentialChain, and ConversationalRetrievalChain are deprecated in 0.3. Rewrite all chains with LCEL before upgrading; the API surface changed significantly between 0.1 → 0.2 → 0.3.
- ConversationBufferMemory keeps the entire history, so long sessions blow past model context limits. Switch to ConversationSummaryMemory or sliding-window memory for production apps.
- The framework is split across langchain, langchain-core, langchain-community, langchain-openai, etc., and import paths shifted with each sub-package. Always pin exact versions in requirements.txt.

**Exercises**

1. Build an EnsembleRetriever that combines BM25 keyword search (via BM25Retriever) and Chroma dense retrieval with weights 0.4 / 0.6. Evaluate retrieval precision on 20 hand-labeled Q&A pairs from a PDF of your choice.
2. Create a SQLDatabaseChain agent that wraps a DuckDB warehouse. Add a custom tool validator that rejects any query containing DROP, DELETE, or UPDATE before execution.
3. Write a script that (a) pulls profiling stats from Postgres with psycopg, (b) runs them through the LLM DQ chain, and (c) writes the resulting JSON report to an S3 bucket using boto3.

**FAQ**

- *What does the | operator mean in an LCEL chain like prompt | llm | parser?* It composes the components into a RunnableSequence([prompt, llm, parser]), piping each step's output into the next.
- *When should you prefer ConversationSummaryMemory over ConversationBufferMemory?* For long sessions: summary memory compresses history into a running summary and keeps token usage bounded, while buffer memory stores every message verbatim and eventually exceeds the context window.
- *What is the difference between a retriever and a vectorstore in LangChain?* A vectorstore stores embeddings and runs similarity search; a retriever is a lightweight interface over any document source that exposes a get_relevant_documents(query) method — making it composable in LCEL chains.
- *What happens when handle_parsing_errors=False and an agent emits a malformed action?* The AgentExecutor raises an OutputParserException and the chain fails. With handle_parsing_errors=True, the error message is fed back to the LLM as an observation, giving it a chance to self-correct.
- *Why is MMR (Maximal Marginal Relevance) preferred over plain similarity_search in RAG?* Plain similarity search often returns near-duplicate chunks; MMR balances relevance against diversity, so the retrieved context covers more distinct information.