← Back to LangChain

Data Analysis with LangChain

Leveraging LLMs with Python execution capabilities to automatically analyze structured and unstructured data, generate insights, and create visualizations.

Overview

LLMs can reason about data, write analysis code, and generate insights. LangChain chains enable agents to execute Python code, query databases, and generate reports automatically.

SQL Analysis with ReAct Agent

Python - Database Analysis Agent
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
import sqlite3
import pandas as pd

# Database connection
conn = sqlite3.connect("sales.db")

@tool
def execute_sql(query: str) -> str:
    """Execute SQL query and return results"""
    try:
        df = pd.read_sql_query(query, conn)
        return df.to_string()
    except Exception as e:
        return f"SQL Error: {str(e)}"

@tool
def get_schema() -> str:
    """Get database schema"""
    cursor = conn.cursor()
    cursor.execute("SELECT sql FROM sqlite_master WHERE type='table'")
    schema = "\n".join([row[0] for row in cursor.fetchall()])
    return schema

@tool
def generate_chart_code(data: str, chart_type: str) -> str:
    """Generate matplotlib code for chart"""
    code = f"""import matplotlib.pyplot as plt
{data}
plt.figure(figsize=(10, 6))
# {chart_type} chart code here
plt.show()
"""
    return code

# Create analysis agent
tools = [execute_sql, get_schema, generate_chart_code]
llm = ChatOpenAI(model="gpt-4")

agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt="You are a data analyst. Use tools to analyze database."
)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
)

# Query database
result = executor.invoke({
    "input": "What are sales trends by month? Show top 10 products."
})

Python Code Execution Agent

Python - Code-Generating Analysis
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.tools import tool
import subprocess
import tempfile
import os

@tool
def write_python_code(code: str, filename: str = "analysis.py") -> str:
    """Write and execute Python code"""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(code)
        temp_file = f.name
    
    try:
        result = subprocess.run(
            ["python", temp_file],
            capture_output=True,
            text=True,
            timeout=30
        )
        output = result.stdout + result.stderr
        return output if output else "Code executed successfully"
    except subprocess.TimeoutExpired:
        return "Code execution timed out"
    except Exception as e:
        return f"Error: {str(e)}"
    finally:
        os.unlink(temp_file)

@tool
def load_csv_data(filepath: str) -> str:
    """Load and summarize CSV data"""
    import pandas as pd
    df = pd.read_csv(filepath)
    return f"""
Data shape: {df.shape}
Columns: {df.columns.tolist()}
Data types:\n{df.dtypes}
First 5 rows:\n{df.head()}
"""

@tool
def statistical_summary(data_desc: str) -> str:
    """Generate statistical summary code"""
    return """
import pandas as pd
import numpy as np
df = pd.read_csv('data.csv')
print(df.describe())
print("\\nCorrelation Matrix:")
print(df.corr())
"""

# Analysis agent
tools = [write_python_code, load_csv_data, statistical_summary]

agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt="You are a data science analyst. Write Python code to analyze data."
)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
)

# Analyze data
result = executor.invoke({
    "input": """
    Analyze 'customer_data.csv':
    1. Load the data
    2. Generate statistics
    3. Find correlations
    4. Identify outliers
    """
})

Multi-Source Data Analysis

Python - Aggregate Analysis
from langchain.agents import AgentExecutor, create_structured_chat_agent
import requests
import json

@tool
def fetch_api_data(endpoint: str, params: dict = None) -> str:
    """Fetch data from API"""
    try:
        response = requests.get(endpoint, params=params)
        data = response.json()
        return json.dumps(data, indent=2)
    except Exception as e:
        return f"API Error: {str(e)}"

@tool
def query_data_warehouse(table: str, filters: dict = None) -> str:
    """Query data warehouse"""
    # Implementation for DW query
    return "Data warehouse results"

@tool
def merge_and_analyze(sources: list) -> str:
    """Combine data from multiple sources"""
    code = """
import pandas as pd
# Merge data from multiple sources
combined = pd.concat([source1, source2])
# Perform analysis
"""
    return code

# Multi-source agent
tools = [fetch_api_data, query_data_warehouse, merge_and_analyze]

agent = create_structured_chat_agent(
    llm=llm,
    tools=tools,
    prompt="Analyze data from multiple sources."
)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
)

# Execute multi-source analysis
result = executor.invoke({
    "input": """
    Combine and analyze:
    1. Customer data from API
    2. Sales from data warehouse
    3. Find patterns and trends
    """
})

Natural Language to SQL

Python - Text-to-SQL Analysis
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
import pandas as pd

# Text-to-SQL prompt
sql_template = """Given the database schema:
{schema}

Convert this natural language request to SQL:
{natural_query}

SQL Query:"""

sql_prompt = PromptTemplate(
    template=sql_template,
    input_variables=["schema", "natural_query"]
)

# Create chain
llm = ChatOpenAI(model="gpt-4")
sql_chain = LLMChain(llm=llm, prompt=sql_prompt)

# Get schema
cursor = conn.cursor()
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table'")
schema = "\n".join([row[0] for row in cursor.fetchall()])

# Convert natural language to SQL
natural_query = "Show me total sales by product category for Q4"
sql_query = sql_chain.run(
    schema=schema,
    natural_query=natural_query
)

print(f"Generated SQL: {sql_query}")

# Execute query
df = pd.read_sql_query(sql_query, conn)
print("\nResults:")
print(df)

Insight Generation

Python - Automated Insights
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

# Insight generation prompt
insight_template = """Analyze this data and generate actionable insights:

Data Summary:
{data_summary}

Statistical Analysis:
{statistics}

Generate:
1. Key findings
2. Anomalies or trends
3. Recommendations
4. Next steps for investigation

Insights:"""

insight_prompt = PromptTemplate(
    template=insight_template,
    input_variables=["data_summary", "statistics"]
)

insight_chain = LLMChain(llm=llm, prompt=insight_prompt)

# Prepare data
df = pd.read_csv("sales_data.csv")
data_summary = df.describe().to_string()
statistics = f"Columns: {df.columns.tolist()}\nShape: {df.shape}"

# Generate insights
insights = insight_chain.run(
    data_summary=data_summary,
    statistics=statistics
)

print("Generated Insights:")
print(insights)

Report Generation

Python - Automated Report Creation
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate

# Executive summary
summary_template = """Create an executive summary based on:
{analysis_results}
Max 3 paragraphs.
Summary:"""

summary_prompt = PromptTemplate(
    template=summary_template,
    input_variables=["analysis_results"]
)

# Detailed findings
findings_template = """Detail the findings from:
{analysis_results}

Include:
- Data sources
- Methodology
- Key metrics
- Validation

Findings:"""

findings_prompt = PromptTemplate(
    template=findings_template,
    input_variables=["analysis_results"]
)

# Recommendations
recommendations_template = """Based on:
{findings}

Generate recommendations:
1. Immediate actions
2. Medium-term strategy
3. Long-term initiatives

Recommendations:"""

recommendations_prompt = PromptTemplate(
    template=recommendations_template,
    input_variables=["findings"]
)

# Create chains
summary_chain = LLMChain(llm=llm, prompt=summary_prompt)
findings_chain = LLMChain(llm=llm, prompt=findings_prompt)
recommendations_chain = LLMChain(llm=llm, prompt=recommendations_prompt)

# Execute sequential chain
combined = SequentialChain(
    chains=[summary_chain, findings_chain, recommendations_chain],
    input_variables=["analysis_results"],
    output_variables=["text"],
    verbose=True
)

# Generate report
analysis_results = "Q4 Sales Analysis: 15% growth, highest in region X"
report = combined({"analysis_results": analysis_results})

print("Generated Report:")
print(report["text"])

Scheduled Analysis

Python - Recurring Analysis Pipeline
import schedule
import time
from datetime import datetime

class AnalysisPipeline:
    def __init__(self, agent):
        self.agent = agent
        self.results = []
    
    def run_daily_analysis(self):
        """Run analysis daily"""
        timestamp = datetime.now().isoformat()
        
        result = self.agent.invoke({
            "input": "Analyze today's metrics and generate insights"
        })
        
        self.results.append({
            "timestamp": timestamp,
            "analysis": result
        })
        
        # Save results
        self.save_results()
        
        # Send report
        self.send_report(result)
    
    def save_results(self):
        """Save analysis results"""
        import json
        with open("analysis_history.json", "w") as f:
            json.dump(self.results, f)
    
    def send_report(self, result):
        """Send report via email or Slack"""
        # Implementation for sending report
        pass

# Schedule analysis
pipeline = AnalysisPipeline(executor)

schedule.every().day.at("09:00").do(pipeline.run_daily_analysis)
schedule.every().week.do(pipeline.run_daily_analysis)

# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(60)

Best Practices