← Back to LangChain

Task Automation with LangChain

Building autonomous agents that perform complex workflows by reasoning about tasks, using tools, and adapting to outcomes.

Overview

LangChain agents combine language understanding with tool use to autonomously complete tasks. Using the ReAct (Reasoning + Acting) pattern, agents think through problems and execute appropriate actions.

Basic Agent with Tools

Python - Simple Task Agent
from langchain.agents import AgentExecutor, create_react_agent
from langchain.agents import create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

# Define the tools the agent can use
@tool
def search_web(query: str) -> str:
    """Search the web for information"""
    # Placeholder: no search is performed; the query is echoed back in a
    # fixed-format string.
    results = f"Search results for: {query}"
    return results

@tool
def fetch_email(sender: str = None) -> str:
    """Fetch emails, optionally filtered by sender"""
    # Placeholder: no mailbox is queried. A missing or empty sender is
    # reported as 'all'.
    scope = sender if sender else 'all'
    return f"Emails from {scope}"

@tool
def send_email(recipient: str, subject: str, body: str) -> str:
    """Send an email"""
    # Placeholder: nothing is sent; `subject` and `body` are accepted but
    # unused, and only the recipient appears in the confirmation.
    confirmation = f"Email sent to {recipient}"
    return confirmation

@tool
def create_calendar_event(title: str, date: str, time: str) -> str:
    """Create a calendar event"""
    # Placeholder: no calendar is touched; the arguments are echoed back.
    confirmation = f"Event '{title}' created for {date} at {time}"
    return confirmation

# Create agent
tools = [search_web, fetch_email, send_email, create_calendar_event]
llm = ChatOpenAI(model="gpt-4")

# Agent constructors need a prompt *template*, not a bare string: the template
# must expose an `agent_scratchpad` slot where intermediate tool calls and
# their results are injected on every iteration.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that uses available tools to complete tasks."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

# A tool-calling agent is used because send_email and create_calendar_event
# take several arguments; the text-based ReAct agent hands each tool a single
# string and therefore cannot drive them.
agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=prompt)

agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=10,  # hard cap on reasoning/tool-call rounds
)

# Use agent
result = agent_executor.invoke({
    "input": "Search for AI news and send me a summary via email"
})

Multi-Step Workflow Agent

Python - Complex Workflow Automation
from langchain.agents import AgentExecutor, create_structured_chat_agent
from langchain.tools import tool
import requests

# Complex workflow tools
@tool
def query_database(sql: str) -> str:
    """Execute SQL query on database"""
    # Placeholder: `sql` is ignored and a canned string is returned. A real
    # implementation would connect to the actual database.
    # NOTE(review): the SQL text would be authored by the model, so a real
    # implementation should restrict it (read-only role, vetted queries).
    canned_rows = "Query results: [data]"
    return canned_rows

@tool
def analyze_data(data: str, metric: str) -> str:
    """Analyze data and compute metrics"""
    # Placeholder: `data` is not inspected; only the metric name is echoed.
    summary = f"Analysis of {metric}: [results]"
    return summary

@tool
def generate_report(title: str, content: str) -> str:
    """Generate formatted report"""
    # Placeholder: `content` is unused; only the title is acknowledged.
    acknowledgement = f"Report generated: {title}"
    return acknowledgement

@tool
def send_report_slack(channel: str, report: str) -> str:
    """Send report to Slack channel"""
    # Placeholder: nothing is posted; `report` is unused. The channel name is
    # prefixed with '#' in the confirmation.
    confirmation = f"Report sent to #{channel}"
    return confirmation

# Create workflow agent
tools = [query_database, analyze_data, generate_report, send_report_slack]

# The agent constructor needs a prompt template with an `agent_scratchpad`
# slot; a bare string is rejected. The tool-calling agent passes structured
# arguments, which the multi-argument tools above require.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a business intelligence agent."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=prompt)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=15,  # the four-step workflow needs several tool rounds
)

# Execute complex workflow
result = executor.invoke({
    "input": """
    1. Query sales data from database for last month
    2. Analyze top performing products
    3. Generate quarterly report
    4. Send report to #sales-team on Slack
    """
})

Error Handling & Retries

Python - Robust Agent with Fallbacks
from langchain.agents import AgentExecutor, create_react_agent
from langchain.callbacks import BaseCallbackHandler
from typing import Any

# Custom callback for monitoring
class AgentCallbacks(BaseCallbackHandler):
    """Callback handler that prints tool failures and the agent's final output."""

    def on_tool_error(self, error: Exception, **kwargs: Any) -> Any:
        """Report an exception raised while a tool was running."""
        print(f"Tool error: {error}")
        # Log to monitoring service

    def on_agent_finish(self, finish: Any, **kwargs: Any) -> Any:
        """Print the final output once the agent has finished.

        Args:
            finish: The finish event. LangChain passes an ``AgentFinish``
                object whose payload lives in ``return_values``; a plain
                mapping is also accepted.
        """
        # AgentFinish is not subscriptable, so read the payload from
        # .return_values and fall back to the object itself for dict input.
        return_values = getattr(finish, "return_values", finish)
        # .get keeps a monitoring hook from raising when no "output" key exists.
        print(f"Agent finished with output: {return_values.get('output')}")

# Error-resilient tools
@tool
def fetch_api_data(endpoint: str, retries: int = 3) -> str:
    """Fetch data with retry logic"""
    # Local import keeps this block self-contained.
    import json

    # Always make at least one attempt; with retries <= 0 the loop would be
    # skipped and no request made at all.
    attempts = max(1, retries)
    last_error = None
    for _ in range(attempts):
        try:
            response = requests.get(
                f"https://api.example.com/{endpoint}",
                timeout=10,  # without a timeout a stalled server hangs the agent
            )
            # Turn 4xx/5xx responses into exceptions so they are retried too.
            response.raise_for_status()
            # Re-serialise so the declared `str` return type holds while still
            # verifying that the body is valid JSON.
            return json.dumps(response.json())
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on older `requests`.
            last_error = e
    return f"Failed after {attempts} attempts: {last_error}"

# Agent with error handling
# create_react_agent needs a prompt template exposing `tools`, `tool_names`
# and `agent_scratchpad`; a bare instruction string is rejected. The ReAct
# text format fits here because the only tool takes a single required input.
prompt = ChatPromptTemplate.from_template(
    "Handle errors gracefully and retry if needed.\n\n"
    "You have access to the following tools:\n\n"
    "{tools}\n\n"
    "Use the following format:\n\n"
    "Question: the input question you must answer\n"
    "Thought: you should always think about what to do\n"
    "Action: the action to take, should be one of [{tool_names}]\n"
    "Action Input: the input to the action\n"
    "Observation: the result of the action\n"
    "... (this Thought/Action/Action Input/Observation can repeat N times)\n"
    "Thought: I now know the final answer\n"
    "Final Answer: the final answer to the original input question\n\n"
    "Begin!\n\n"
    "Question: {input}\n"
    "Thought:{agent_scratchpad}"
)

agent = create_react_agent(
    llm=ChatOpenAI(model="gpt-4", temperature=0),
    tools=[fetch_api_data],
    prompt=prompt,
)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=[fetch_api_data],
    verbose=True,
    max_iterations=5,
    callbacks=[AgentCallbacks()],
    # Feed malformed model output back to the model instead of raising.
    handle_parsing_errors=True,
)

# Execute with error resilience
try:
    result = executor.invoke({"input": "Fetch user data from API"})
except Exception as e:
    # Top-level boundary: report the failure rather than crash the script.
    print(f"Agent execution failed: {e}")

Document Processing Agent

Python - Document Workflow Automation
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from pathlib import Path

@tool
def load_document(file_path: str) -> str:
    """Load and summarize a document"""
    # NOTE(review): the description above promises a summary, but only the
    # page count is reported; no page content reaches the agent.
    pages = PyPDFLoader(file_path).load()
    page_count = len(pages)
    return f"Loaded {page_count} pages from {file_path}"

@tool
def extract_tables(file_path: str) -> str:
    """Extract tables from document"""
    # Placeholder: `file_path` is not read; a canned string is returned.
    canned_tables = "Tables extracted: [table data]"
    return canned_tables

@tool
def search_documents(query: str, directory: str) -> str:
    """Search across multiple documents"""
    # Placeholder: `directory` is unused. A real implementation would search
    # a vector store of documents.
    matches = f"Documents matching '{query}': [results]"
    return matches

@tool
def extract_metadata(file_path: str) -> str:
    """Extract metadata from document"""
    # Placeholder: `file_path` is not read. The braces below are literal
    # text (this is not an f-string).
    canned_metadata = "Metadata: {title, author, date, ...}"
    return canned_metadata

@tool
def organize_documents(source_dir: str, dest_dir: str, criteria: str) -> str:
    """Organize documents based on criteria"""
    # Placeholder: no files are moved; `source_dir` and `criteria` are
    # unused and only the destination is echoed.
    confirmation = f"Documents organized in {dest_dir}"
    return confirmation

# Document processing agent
tools = [load_document, extract_tables, search_documents, extract_metadata, organize_documents]

# The agent constructor needs a prompt template with an `agent_scratchpad`
# slot; a bare string is rejected. A tool-calling agent is used because
# search_documents and organize_documents take several arguments, which the
# single-string ReAct text format cannot supply.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a document processing specialist."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=prompt)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
)

# Process document batch
result = executor.invoke({
    "input": """
    Process documents in 'incoming_docs/':
    1. Load each PDF
    2. Extract tables
    3. Organize by document type
    4. Search for contracts
    """
})

Decision Tree Agent

Python - Conditional Logic Agent
from langchain.agents import AgentExecutor, create_structured_chat_agent
from enum import Enum

class ApprovalLevel(Enum):
    """Approval routes for a request.

    The member values are the same strings that check_request_amount returns.
    """

    # Returned for amounts below 1000.
    AUTO = "automatic"
    # Returned for amounts from 1000 up to, but not including, 10000.
    MANAGER = "manager_review"
    # Returned for amounts of 10000 and above.
    DIRECTOR = "director_review"

@tool
def check_request_amount(amount: float) -> str:
    """Determine approval level based on amount"""
    # Route through ApprovalLevel so the returned strings cannot drift from
    # the enum that defines them.
    if amount < 1000:
        level = ApprovalLevel.AUTO
    elif amount < 10000:
        level = ApprovalLevel.MANAGER
    else:
        level = ApprovalLevel.DIRECTOR
    # NOTE(review): negative amounts fall into the automatic tier — confirm
    # that this is intended.
    return level.value

@tool
def get_manager_approval(request_id: str, amount: float) -> str:
    """Get manager approval"""
    # Placeholder: no manager is consulted; an approval message is always
    # returned.
    decision = f"Manager approved request {request_id} for ${amount}"
    return decision

@tool
def notify_requester(request_id: str, status: str) -> str:
    """Notify requester of approval status"""
    # Placeholder: no notification is sent; `request_id` is unused and only
    # the status is echoed.
    notice = f"Notified requester: {status}"
    return notice

# Approval workflow agent
tools = [check_request_amount, get_manager_approval, notify_requester]

# The agent constructor needs a prompt template with an `agent_scratchpad`
# slot; a bare string is rejected. The tool-calling agent passes structured
# arguments, which get_manager_approval and notify_requester require.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an approval workflow agent. Route requests based on amount."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm=llm, tools=tools, prompt=prompt)

executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
)

# Process approval request
result = executor.invoke({
    "input": "Process approval for request REQ-123 with amount $5000"
})

Async Agent Execution

Python - Concurrent Task Execution
import asyncio
from typing import List

class AsyncTaskExecutor:
    """Run independent agent tasks concurrently on one shared AgentExecutor."""

    def __init__(self, llm, tools, prompt=None):
        """Build the agent and its executor.

        Args:
            llm: Chat model that drives the agent.
            tools: Tools the agent may call.
            prompt: Optional chat prompt template; it must contain an
                ``agent_scratchpad`` placeholder. A generic assistant prompt
                is used when omitted.
        """
        if prompt is None:
            # Agent constructors require a prompt; build a minimal one.
            prompt = ChatPromptTemplate.from_messages([
                ("system", "You are a helpful assistant that uses available tools to complete tasks."),
                ("human", "{input}"),
                ("placeholder", "{agent_scratchpad}"),
            ])
        # Tool-calling agents pass structured arguments, so tools with more
        # than one parameter work as well.
        self.agent = create_tool_calling_agent(llm, tools, prompt)
        self.executor = AgentExecutor.from_agent_and_tools(
            agent=self.agent,
            tools=tools,
        )

    async def execute_task(self, task: str):
        """Execute task asynchronously and return the executor's result."""
        # Use the executor's native coroutine instead of pushing the blocking
        # invoke() onto a thread pool through the event loop.
        return await self.executor.ainvoke({"input": task})

    async def execute_tasks_concurrent(self, tasks: List[str], return_exceptions: bool = False):
        """Execute multiple tasks concurrently.

        Args:
            tasks: Task descriptions, one agent run each.
            return_exceptions: When True, a failed task yields its exception
                in the result list instead of aborting the whole batch.

        Returns:
            A list of results in the same order as ``tasks``.
        """
        return await asyncio.gather(
            *(self.execute_task(task) for task in tasks),
            return_exceptions=return_exceptions,
        )

# Use async executor
async def main():
    """Run three independent agent tasks concurrently and return their results."""
    # Each task below needs its tool registered: web search, email, and
    # calendar (the calendar tool was missing for the third task).
    executor = AsyncTaskExecutor(llm, [search_web, send_email, create_calendar_event])

    # NOTE(review): the tasks run independently, so the email task does not
    # see the search task's findings — confirm this is intended.
    tasks = [
        "Search for latest AI papers",
        "Send summary email to team",
        "Create calendar event for discussion"
    ]

    results = await executor.execute_tasks_concurrent(tasks)
    return results

# Run
asyncio.run(main())

Best Practices