Google Agent Development Kit (ADK) is an open-source Python framework from Google for building, evaluating, and deploying AI agents and multi-agent systems. ADK provides a composable architecture: agents are defined as Python classes or functions, tools are type-annotated callables, and orchestration uses sequential, parallel, or loop workflows. ADK integrates natively with Vertex AI and Gemini models, making it the natural choice when building agents on GCP. For data engineers, ADK enables autonomous pipeline orchestration, data research workflows, and intelligent monitoring systems deployed to Cloud Run or Agent Engine.
`Python 3.9+` · `google-adk` · `Gemini / Vertex AI` · `Multi-Agent` · `GCP-native` · `MCP-compatible`
An ADK agent is a Python class inheriting from the appropriate base (LlmAgent for LLM-driven agents, BaseAgent for custom logic). Key constructor params: name, model (e.g. "gemini-2.0-flash"), instruction (system prompt), tools (list of callables or Tool objects), and sub_agents (for multi-agent architectures).
Tools in ADK are plain Python functions with type-annotated signatures and docstrings — the framework auto-generates the JSON schema for the model. ADK supports three tool types:

- Function tools — plain Python callables you define yourself
- Built-in tools (e.g. `google_search`, `built_in_code_execution`)
- Third-party tools — wrappers around external libraries (LangChain tools, MCP servers)

ADK provides pluggable memory services:
| Memory Type | Scope | Default Backend |
|---|---|---|
| Session State | Within a single conversation session | In-memory dict |
| Persistent Memory | Across sessions for a user | Vertex AI Memory Bank / Firestore |
| Shared Agent State | Across agents within a workflow | In-session state dictionary |
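Independently of ADK, the shared-state pattern in the last row can be sketched in plain Python: each step reads one session-wide state dict and writes its result under its own output key, which is roughly the role `output_key` plays for an `LlmAgent`. This is a simplified sketch with hypothetical step names, not ADK internals:

```python
# Simplified sketch of shared agent state in a sequential workflow.
# Each "agent" is just a function that reads the shared state dict
# and writes its result under its output key (hypothetical names).

def discovery_step(state: dict) -> None:
    # Pretend we discovered two recently changed tables
    state["changed_tables"] = ["orders", "customers"]

def quality_step(state: dict) -> None:
    # Reads the previous step's output key, writes its own
    state["test_results"] = {t: "passed" for t in state["changed_tables"]}

state: dict = {}  # one shared state dict per session
for step in (discovery_step, quality_step):
    step(state)

print(state["test_results"])  # {'orders': 'passed', 'customers': 'passed'}
```

The key design point: downstream steps never call upstream steps directly — they only read keys the upstream steps left in state, which is what makes the workflow composable.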
ADK supports declarative multi-agent workflows via sub-agent composition: a parent `SequentialAgent`, `ParallelAgent`, or `LoopAgent` coordinates a list of `sub_agents` (see the workflow examples below).
ADK ships a built-in dev web UI (`adk web`) for local testing and conversation replay. For production, agents deploy to Vertex AI Agent Engine (fully managed), Cloud Run (containerized), or GKE (self-managed).
ADK includes a built-in evaluation module. Test cases are defined as JSON files with input, expected intermediate_steps (tool calls), and expected final_response. Run with adk eval <agent_module> <eval_set.json> to produce pass/fail metrics per test case.
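The exact eval-file schema varies by ADK version, so treat the following as an illustrative shape only — the field names `input`, `intermediate_steps`, and `final_response` come from the description above, and the SQL and response values are invented for the example:

```json
[
  {
    "input": "How many orders were placed yesterday?",
    "intermediate_steps": [
      {
        "tool": "query_warehouse",
        "args": {"sql": "SELECT COUNT(*) FROM orders WHERE order_date = CURRENT_DATE - 1"}
      }
    ],
    "final_response": "There were 1,204 orders yesterday."
  }
]
```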
A SequentialAgent workflow: a Discovery agent queries BigQuery Information Schema for tables modified in the last 24 hours, a Quality agent runs dbt tests on changed models, and a Notification agent posts a summary to Slack. Triggered by Cloud Scheduler — fully serverless DE automation.
A root LlmAgent receives a business question ("Which markets underperformed last quarter?"), dynamically delegates to a SQL Agent (query warehouse), a Trends Agent (Google Search for market context), and a Synthesis Agent (write narrative). Combines structured and unstructured data analysis in one workflow.
An agent backed by Vertex AI Search over the company data catalog answers natural-language questions about dataset lineage, ownership, SLAs, and field definitions. Deployed to a corporate Slack bot via Cloud Run, reducing time-to-answer for data consumers.
On a PagerDuty alert, a LoopAgent: reads the Airflow task failure → looks up the affected table definition → checks recent dbt test history → queries the error table → generates a fix PR via GitHub API → loops until the pipeline run succeeds. Closes low-complexity incidents without human intervention.
```python
from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai.types import Content, Part
import duckdb

# --- Tools (plain annotated functions) ---
def query_warehouse(sql: str) -> dict:
    """Execute a read-only SQL query against the data warehouse.

    Args:
        sql: A valid SELECT statement.

    Returns:
        A dict with 'columns' and 'rows' keys.
    """
    conn = duckdb.connect("warehouse.ddb", read_only=True)
    df = conn.execute(sql).fetchdf()
    return {"columns": list(df.columns), "rows": df.head(20).to_dict("records")}

def list_tables() -> list[str]:
    """Return all table names in the warehouse."""
    conn = duckdb.connect("warehouse.ddb", read_only=True)
    return [r[0] for r in conn.execute("SHOW TABLES").fetchall()]

# --- Agent ---
warehouse_agent = LlmAgent(
    name="warehouse_agent",
    model="gemini-2.0-flash",
    instruction="""You are a data warehouse assistant.
Use the available tools to answer SQL questions accurately.
Always validate your SQL syntax before executing.""",
    tools=[query_warehouse, list_tables],
)

# --- Run ---
session_service = InMemorySessionService()
runner = Runner(
    agent=warehouse_agent,
    app_name="warehouse_assistant",
    session_service=session_service,
)
session = session_service.create_session(app_name="warehouse_assistant", user_id="user_1")

response = runner.run(
    user_id="user_1",
    session_id=session.id,
    new_message=Content(
        role="user",
        parts=[Part(text="Which tables exist and what is the row count of the orders table?")],
    ),
)
for event in response:
    if event.is_final_response():
        print(event.content.parts[0].text)
```
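The reason `query_warehouse` above needs no separate schema definition is that the framework derives one from the annotations and docstring. A rough illustration of what such derivation can look like — this is plain `inspect`-based Python, not ADK's actual implementation:

```python
import inspect

# Illustration of deriving a function-calling schema from an
# annotated function. NOT ADK's real implementation — just a sketch
# of why type hints and docstrings matter.

def query_warehouse(sql: str) -> dict:
    """Execute a read-only SQL query against the data warehouse."""
    ...

def sketch_schema(fn) -> dict:
    sig = inspect.signature(fn)
    py_to_json = {str: "string", int: "integer", float: "number", bool: "boolean"}
    props = {
        name: {"type": py_to_json.get(p.annotation, "object")}
        for name, p in sig.parameters.items()
    }
    return {
        "name": fn.__name__,
        "description": (fn.__doc__ or "").strip().splitlines()[0],
        "parameters": {"type": "object", "properties": props, "required": list(props)},
    }

schema = sketch_schema(query_warehouse)
print(schema["parameters"]["properties"])  # {'sql': {'type': 'string'}}
```

This is also why untyped parameters or missing docstrings degrade tool quality: the model only sees what the schema generator can extract.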
```python
from google.adk.agents import LlmAgent, SequentialAgent

# Sub-agent 1: Fetch pipeline status
status_agent = LlmAgent(
    name="status_agent",
    model="gemini-2.0-flash",
    instruction="Check all Airflow DAG statuses and store failures in state['failures'].",
    tools=[check_airflow_status],  # custom tool, defined elsewhere
    output_key="failures",
)

# Sub-agent 2: Root cause analysis
rca_agent = LlmAgent(
    name="rca_agent",
    model="gemini-2.0-pro",
    instruction="""\
Given the pipeline failures in state['failures']:
1. Identify the root cause for each failure
2. Check related table schemas using the warehouse tool
3. Provide a structured diagnosis
Store results in state['diagnoses'].""",
    tools=[query_warehouse],
    output_key="diagnoses",
)

# Sub-agent 3: Generate report
report_agent = LlmAgent(
    name="report_agent",
    model="gemini-2.0-flash",
    instruction="Format state['diagnoses'] into a Markdown incident report with severity ratings.",
    output_key="report",
)

# Compose sequential workflow
pipeline_triage = SequentialAgent(
    name="pipeline_triage",
    sub_agents=[status_agent, rca_agent, report_agent],
)

# Run the full workflow (Runner, session_service, Content, Part as in the previous example)
runner = Runner(agent=pipeline_triage, app_name="triage", session_service=session_service)
session = session_service.create_session(app_name="triage", user_id="oncall")
events = runner.run(
    user_id="oncall",
    session_id=session.id,
    new_message=Content(role="user", parts=[Part(text="Run daily pipeline health check")]),
)
for e in events:
    if e.is_final_response():
        print(e.content.parts[0].text)
```
```python
from google.adk.agents import LlmAgent, LoopAgent
import subprocess

def run_dbt_test(model_name: str) -> dict:
    """Run dbt tests for a model and return pass/fail status.

    Args:
        model_name: The dbt model name to test.

    Returns:
        Dict with 'status' ('passed'|'failed') and the raw dbt 'output'.
    """
    result = subprocess.run(
        ["dbt", "test", "--select", model_name, "--output", "json"],
        capture_output=True, text=True,
    )
    return {"status": "passed" if result.returncode == 0 else "failed", "output": result.stdout}

fixer_agent = LlmAgent(
    name="fixer",
    model="gemini-2.0-pro",
    instruction="""\
Run dbt tests for the model in state['model_name'].
If tests fail, analyse the errors and apply a fix via the patch_sql tool.
Set state['done'] = True when all tests pass.""",
    tools=[run_dbt_test, patch_sql_tool],  # patch_sql_tool: custom tool, defined elsewhere
)

loop = LoopAgent(
    name="auto_fixer",
    sub_agents=[fixer_agent],
    max_iterations=5,
    termination_condition="state.get('done') == True",
)
```
```python
# requirements: google-cloud-aiplatform[adk,agent_engines]
import vertexai
from vertexai.preview import reasoning_engines

vertexai.init(project="my-gcp-project", location="us-central1")

# Wrap ADK agent for Agent Engine
app = reasoning_engines.AdkApp(
    agent=warehouse_agent,
    enable_tracing=True,
)

# Deploy (takes ~3 min, creates a managed endpoint)
remote_app = reasoning_engines.ReasoningEngine.create(
    app,
    requirements=[
        "google-adk==1.0.0",
        "duckdb==1.1.0",
    ],
    display_name="Warehouse Assistant",
)

# Query the deployed agent (stream_query yields events as they arrive)
session = remote_app.create_session(user_id="prod_user")
result = remote_app.stream_query(
    user_id="prod_user",
    session_id=session["id"],
    message="How many orders were placed yesterday?",
)
for event in result:
    print(event)
```
| Framework | Primary Backing | Multi-Agent | GCP Integration | Open Source | Best For |
|---|---|---|---|---|---|
| Google ADK | Google / Gemini | Native (Sequential/Parallel/Loop) | Native | Yes | GCP-native agent workflows |
| CrewAI | Independent | Native (crew metaphor) | Via LangChain tools | Yes | Role-based team agents |
| LangChain | LangChain Inc. | Via LangGraph | Via integrations | Yes | Composable pipelines, RAG |
| Azure AI Agents | Microsoft / OpenAI | Via AutoGen | Azure-native | Partial | Azure-native deployments |
| AWS Bedrock Agents | AWS / Anthropic | Multi-agent collaboration | AWS-native | No | AWS-native deployments |
**Gotchas & best practices**

- Write every tool docstring with an `Args:` section and a clear description — treat docstrings as the API contract.
- Non-serializable objects (DataFrames, connections) stored in session state silently fail or corrupt. Convert to primitives (dicts, lists, strings) before storing.
- If a `LoopAgent`'s `termination_condition` never becomes True, the loop runs to `max_iterations` and exits with no clear error. Always initialise the termination state key and add a fallback condition after max iterations.
- Model names (`gemini-pro`, `gemini-1.5-pro`, `gemini-2.0-flash`) change rapidly. Pin model versions in production and test upgrades in staging before promoting.

**Exercises**

1. Build an `LlmAgent` with tools for `list_tables()`, `describe_table(table_name)`, and `query_warehouse(sql)` backed by a real DuckDB file. Evaluate it against 10 test questions using the ADK eval framework (`adk eval`).
2. Build a `ParallelAgent` with three sub-agents that simultaneously search for: (a) BigQuery pricing updates, (b) Snowflake pricing updates, (c) Databricks pricing updates. A final `LlmAgent` synthesises all three into a cost-comparison table.
3. Deploy an agent to Vertex AI Agent Engine via the `AdkApp` wrapper, and run at least three queries against the remote endpoint. Capture traces in Cloud Trace and identify the slowest tool call.

**FAQ**

- *How does ADK know what schema to expose for a tool?* It inspects the function's type annotations and docstring (properly formatted with an `Args:` section) to automatically generate a JSON Schema compatible with the Gemini function-calling protocol. No separate schema definition is needed.
- *What is the difference between a `SequentialAgent` and an `LlmAgent` with `sub_agents`?* A `SequentialAgent` runs its sub-agents in a fixed, deterministic order; an `LlmAgent` with `sub_agents` lets the model decide at runtime which sub-agent to delegate to.
- *What is the `output_key` parameter on an `LlmAgent` used for in a `SequentialAgent` pipeline?* It writes the agent's final response into session state under that key, so downstream sub-agents can read it from `state`.
- *Should I deploy to Cloud Run or Agent Engine?* For low-latency interactive workloads: Cloud Run with `min-instances=1` to avoid cold starts. For infrequent/batch workloads: Vertex AI Agent Engine (scales to zero, lower cost, built-in session management), accepting higher cold-start latency.
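The state-serialization gotcha can be demonstrated with the standard library alone: `json.dumps` rejects non-primitive values (dates, sets, connections), so convert them before storing in session state. The record shape here is hypothetical:

```python
import json
from datetime import date

# A "row" as it might come back from a warehouse client:
# dates and sets are NOT JSON-serializable as-is.
row = {"order_id": 42, "order_date": date(2024, 5, 1), "tags": {"priority", "eu"}}

try:
    json.dumps(row)
except TypeError:
    serializable = False
else:
    serializable = True

# Convert to primitives before storing in session state
safe_row = {
    "order_id": row["order_id"],
    "order_date": row["order_date"].isoformat(),
    "tags": sorted(row["tags"]),
}
print(serializable, json.dumps(safe_row))
# False {"order_id": 42, "order_date": "2024-05-01", "tags": ["eu", "priority"]}
```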