CrewAI

CrewAI is a Python framework for orchestrating teams of autonomous AI agents. Each agent has a role, goal, backstory, and a set of tools. Agents are assigned Tasks and assembled into a Crew that runs them in sequential or hierarchical order. CrewAI abstracts away the LLM-loop complexity so you can focus on workflow design. For data engineers, it excels at parallelisable research, automated reporting, and pipeline triage workflows where multiple specialised agents collaborate.

Python 3.10+ · crewai ≥ 0.28 · Multi-Agent · Sequential / Hierarchical · Memory · Tool Use

Table of Contents

  1. Core Concepts
  2. Industry Use Cases
  3. Code Examples
  4. Comparison Table
  5. Gotchas & Pitfalls
  6. Exercises
  7. Quiz
  8. Further Reading

Core Concepts

1. Agent

The fundamental actor in CrewAI. An agent wraps an LLM with a persona: role (job title), goal (what it wants to achieve), and backstory (context shaping its reasoning). Agents can be given a list of tools, a specific llm, and flags like allow_delegation (can pass work to other agents) and memory=True.

2. Task

A unit of work assigned to an agent. Key fields: description (detailed natural-language instructions), agent (assigned agent), expected_output (contract for the result), and optional context (list of prior tasks whose output is passed in). Tasks are the composable building blocks; the same agent can handle many tasks.

3. Crew

The orchestration container that holds agents, tasks, and a process type. Calling crew.kickoff() starts execution and returns the final task's output. Optional params: verbose, memory, full_output (returns all task outputs), max_rpm (rate-limit LLM calls).

4. Process Types

Process | Execution Order | Manager Agent | Best For
Process.sequential | Tasks run in list order; each gets the previous output as context | No | Linear pipelines, chained summarisation
Process.hierarchical | A manager agent decomposes the goal, delegates sub-tasks, and aggregates results | Required | Complex goals with flexible sub-task allocation

5. Memory System

CrewAI provides three memory scopes that persist context across agent interactions:

  1. Short-term memory: an in-session RAG scratch pad shared by agents during a single run.
  2. Long-term memory: task outputs and learnings persisted across runs (SQLite by default).
  3. Entity memory: tracks named entities mentioned across tasks.

6. Tools

Agents extend their capabilities with tools. Built-in tools include SerperDevTool (web search), FileReadTool, DirectoryReadTool, and CodeInterpreterTool. Custom tools are created by subclassing BaseTool or with the @tool decorator; existing LangChain tools can also be plugged in, since CrewAI is LangChain-compatible.

↑ Back to top

Industry Use Cases

1. Automated Data Research Pipeline

A Researcher agent crawls the web for domain data, a Data Analyst agent cleans and structures findings, and a Report Writer agent formats them into a Markdown or PDF document. Replaces hours of manual research with an on-demand pipeline.

2. Incident Triage & Root-Cause Analysis

A Log Analyst agent parses Airflow/dbt failure logs, a Schema Inspector agent queries the data catalog, and an RCA Synthesiser agent combines findings into a structured incident report — all triggered automatically on pipeline failure via webhook.
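The webhook trigger can be sketched with only the standard library; here _StubCrew stands in for the real triage crew, and all names are illustrative assumptions rather than CrewAI APIs:

```python
from http.server import BaseHTTPRequestHandler, HTTPServer
import json

class _StubCrew:
    """Stand-in for a real triage Crew; kickoff() would run the agents."""
    def kickoff(self, inputs):
        return f"triage report ({len(inputs['log'])} bytes of log analysed)"

triage_crew = _StubCrew()

class FailureWebhook(BaseHTTPRequestHandler):
    def do_POST(self):
        # Orchestrator (e.g. Airflow on_failure_callback) POSTs the failure log here
        body = self.rfile.read(int(self.headers["Content-Length"]))
        log = json.loads(body).get("log", "")
        report = triage_crew.kickoff(inputs={"log": log})
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({"report": str(report)}).encode())

# To serve: HTTPServer(("", 8000), FailureWebhook).serve_forever()
```

In production the stub would be replaced by a Crew built as in the code examples below, and the handler would typically enqueue the kickoff rather than run it inline, since a crew run can take minutes.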

3. Content & Documentation Generation

A Researcher writes first-draft documentation from code, a Reviewer critiques it for accuracy, and a Formatter applies style guidelines. Keeps technical docs in sync with codebase changes at low cost.

4. Customer Data Analysis & Response

In a SaaS context, a Data Extractor agent pulls customer usage metrics from a warehouse, an Insight Agent identifies at-risk accounts, and a Communication Agent drafts personalised outreach email templates — all from a single kickoff() call.

↑ Back to top

Code Examples

1. Sequential Crew — Research → Analyse → Report

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
search = SerperDevTool()

# --- Agents ---
researcher = Agent(
    role="Data Engineering Researcher",
    goal="Find current best practices and tools for real-time data pipelines",
    backstory="Senior DE with 10 years experience across Kafka, Flink, and cloud platforms",
    tools=[search],
    llm=llm,
    verbose=True,
)

analyst = Agent(
    role="Data Analyst",
    goal="Distil research into actionable insights with concrete recommendations",
    backstory="Analyst specialising in evaluating trade-offs across DE architectures",
    llm=llm,
    verbose=True,
)

writer = Agent(
    role="Technical Writer",
    goal="Produce clear, well-structured Markdown reports for engineering teams",
    backstory="Technical writer with background in open-source documentation",
    llm=llm,
    verbose=True,
)

# --- Tasks ---
research_task = Task(
    description="Research the top 5 real-time streaming frameworks in 2025. Include adoption, performance benchmarks, and cloud integration support.",
    agent=researcher,
    expected_output="A structured summary of 5 frameworks with pros, cons, and benchmark data",
)

analysis_task = Task(
    description="Analyse the research findings and recommend the best framework for a fintech company processing 1M events/sec on AWS.",
    agent=analyst,
    expected_output="A recommendation with justification, risk factors, and migration considerations",
    context=[research_task],  # receives research_task output
)

report_task = Task(
    description="Write a 2-page engineering decision record (EDR) in Markdown format summarising the research and recommendation.",
    agent=writer,
    expected_output="Markdown EDR with sections: Background, Options Considered, Decision, Consequences",
    context=[research_task, analysis_task],
)

# --- Crew ---
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,
    verbose=True,
)

result = crew.kickoff()
print(result)

2. Hierarchical Crew with Manager Agent

from crewai import Agent, Task, Crew, Process
# Reuses `llm` (the ChatOpenAI instance) from Example 1.

manager = Agent(
    role="Data Platform Lead",
    goal="Coordinate the team to deliver a complete data quality audit report",
    backstory="Experienced data platform manager who delegates effectively",
    allow_delegation=True,
    llm=llm,
)

schema_inspector = Agent(
    role="Schema Inspector",
    goal="Analyse table schemas for type inconsistencies and missing constraints",
    backstory="DBA with expertise in data modelling and schema design",
    tools=[run_sql_tool],  # custom tool (defined in Example 4)
    llm=llm,
)

profiler = Agent(
    role="Data Profiler",
    goal="Compute null rates, cardinality, and outlier statistics for each column",
    backstory="Data quality engineer specialised in profiling large datasets",
    tools=[run_sql_tool],
    llm=llm,
)

audit_task = Task(
    description="Perform a full data quality audit of the 'orders' table in the production warehouse.",
    expected_output="Comprehensive DQ report with schema issues, null rates, outliers, and severity ratings",
)

crew = Crew(
    agents=[schema_inspector, profiler],  # workers only; the manager is passed separately
    tasks=[audit_task],
    process=Process.hierarchical,
    manager_agent=manager,  # alternatively, pass manager_llm=llm for an auto-generated manager
    verbose=True,
)

result = crew.kickoff()

3. Crew with Long-Term Memory

from crewai import Agent, Task, Crew, Process
from crewai.memory import LongTermMemory
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
# Reuses `llm` from Example 1.

# Long-term memory persisted to local SQLite
ltm = LongTermMemory(
    storage=LTMSQLiteStorage(db_path="./crew_memory.db")
)

incident_agent = Agent(
    role="Incident Analyst",
    goal="Diagnose pipeline failures using historical incident knowledge",
    backstory="SRE who has handled hundreds of data pipeline incidents",
    memory=True,
    llm=llm,
)

triage_task = Task(
    description=(
        "Analyse this Airflow task failure log and suggest a fix "
        "based on past incidents:\n\n{log}"  # {log} is interpolated from kickoff(inputs=...)
    ),
    agent=incident_agent,
    expected_output="Root cause, similar past incidents, and recommended fix with confidence score",
)

crew = Crew(
    agents=[incident_agent],
    tasks=[triage_task],
    memory=True,
    long_term_memory=ltm,
    verbose=True,
)

with open("airflow_error.log") as f:
    result = crew.kickoff(inputs={"log": f.read()})

4. Custom Tool for SQL Querying

from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import duckdb

class SQLInput(BaseModel):
    query: str = Field(description="READ-ONLY SQL query to execute on the warehouse")

class WarehouseTool(BaseTool):
    name: str = "warehouse_query"
    description: str = "Execute a read-only SQL query against the DuckDB warehouse and return results."
    args_schema: type[BaseModel] = SQLInput

    def _run(self, query: str) -> str:
        if any(kw in query.upper() for kw in ["DROP", "DELETE", "UPDATE", "INSERT"]):
            raise ValueError("Only SELECT queries are permitted")
        conn = duckdb.connect("warehouse.ddb", read_only=True)
        try:
            df = conn.execute(query).fetchdf()
        finally:
            conn.close()
        return df.to_string(max_rows=30)

run_sql_tool = WarehouseTool()
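The keyword guard inside _run can also be factored into a standalone helper so it is unit-testable without a live warehouse file (a sketch; is_read_only is an illustrative name, not part of CrewAI):

```python
FORBIDDEN_KEYWORDS = ("DROP", "DELETE", "UPDATE", "INSERT")

def is_read_only(query: str) -> bool:
    """Return True when the query contains none of the mutating keywords."""
    upper = query.upper()
    return not any(kw in upper for kw in FORBIDDEN_KEYWORDS)
```

Note that the substring check is deliberately conservative: a SELECT that merely mentions the word "update" in a string literal would also be rejected, which is usually the right trade-off for an LLM-facing tool.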
↑ Back to top

Comparison Table

Framework | Multi-Agent | Execution Model | Memory | Learning Curve | Best For
CrewAI | Native | Sequential / Hierarchical | Short + Long + Entity | Low | Role-based team workflows
LangChain Agents | Basic (single-agent loops) | ReAct loop | Pluggable | Medium | Tool-heavy single-agent tasks
AutoGen | Native | Conversational (message passing) | Limited | Medium | Debate / critique agent patterns
Google ADK | Native | Event-driven orchestration | Full (short + long) | Medium | GCP-integrated autonomous agents
LangGraph | Via graph nodes | Stateful DAG / cyclic graph | State channels | High | Complex conditional branching
↑ Back to top

Gotchas & Pitfalls

  1. Hierarchical crews need a manager: Process.hierarchical fails at kickoff unless you pass manager_llm or a custom manager_agent.
  2. Delegation loops: allow_delegation=True on worker agents can trigger unintended delegation chains; keep it False for leaf agents.
  3. expected_output is a contract, not a guarantee: the LLM is only prompted to match it, so validate structured outputs before downstream use.
  4. API rate limits: multi-agent runs multiply LLM calls; cap them with max_rpm on the Crew.
  5. Input placeholders: values passed to kickoff(inputs={...}) are only injected where a task description contains a matching {placeholder}.

↑ Back to top

Exercises

  1. Pipeline Incident Crew: Build a 3-agent crew (Log Analyst, Schema Inspector, Report Writer) that reads a sample Airflow task log, queries a mock DuckDB schema, and outputs a structured incident report with root cause and fix recommendation.
  2. Hierarchical Data Audit: Implement the DQ audit example with a real Postgres table. The manager agent should instruct the schema inspector and profiler agents, then synthesise both outputs into a final JSON report with an overall quality score.
  3. Memory-Enhanced Recommender: Create a crew that, over multiple kickoff calls, learns which frameworks a user's team has already evaluated (via long-term memory) and skips re-recommending them in subsequent research sessions.
↑ Back to top

Quiz

  1. What is the difference between Process.sequential and Process.hierarchical?
    Answer: Sequential runs tasks in the declared list order, passing each output to the next. Hierarchical uses a manager agent that dynamically decomposes the goal, delegates to worker agents, and aggregates their results — requiring a manager_llm or a custom manager_agent.
  2. What are the three memory scopes in CrewAI and what does each store?
    Answer: Short-term (in-session RAG scratch pad), Long-term (persisted task outputs and learnings across runs, SQLite by default), and Entity memory (tracks named entities mentioned across tasks).
  3. When would you set allow_delegation=True on an agent?
    Answer: When the agent should be able to subcontract part of its task to another agent in the crew — typically for manager agents in hierarchical crews. Keep it False for leaf/worker agents to avoid unintended delegation chains.
  4. How does the context parameter on a Task work?
    Answer: It takes a list of prior Task objects. When the task runs, the outputs of those prior tasks are appended to the current task's prompt as additional context, enabling information flow between agents.
  5. Why should you use args_schema (Pydantic model) when defining a custom BaseTool?
    Answer: Pydantic validation ensures the LLM-generated arguments conform to the expected types and constraints before the tool runs. Without it, type mismatches or missing fields can cause silent failures or unexpected behaviour.
↑ Back to top

Further Reading

↑ Back to top