← Back to Google ADK

Data Research & Analysis with Google ADK

Autonomous agents that explore diverse data sources, perform analysis, and generate insights without human guidance.

Overview

Data research agents use ADK to autonomously query databases, APIs, and files, analyze results, identify patterns, and generate comprehensive reports. These agents adapt their research strategy based on findings.

Research Agent Architecture

Python - Research Agent
from google_adk import Agent, Tool, ResearchStrategy
from typing import Dict, List

class DataResearchAgent(Agent):
    """Skeleton research agent with query, fetch, analyze and report tools.

    Every tool method returns fixed placeholder data. ``autonomous_research``
    chains the four phases (strategy, gathering, analysis, report) and logs
    the first three in ``research_history``.
    """

    def __init__(self):
        super().__init__()
        # One dict per completed phase, in the order the phases ran.
        self.research_history = []

    @Tool(description="Query database for specific data")
    def query_database(self, query: str, table: str) -> List[Dict]:
        """Return two sample rows.

        Placeholder: ``query`` and ``table`` are accepted but not used yet.
        """
        sample = (("value1", "2025-01-01"), ("value2", "2025-01-02"))
        return [
            {"id": row_id, "metric": metric, "date": day}
            for row_id, (metric, day) in enumerate(sample, start=1)
        ]

    @Tool(description="Fetch external API data")
    def fetch_external_data(self, source: str, params: Dict) -> Dict:
        """Return a success envelope with an empty ``data`` list.

        Placeholder: ``source`` and ``params`` are accepted but not used yet.
        """
        return dict(status="success", data=[])

    @Tool(description="Analyze data for patterns")
    def analyze_data(self, data: List[Dict], analysis_type: str) -> Dict:
        """Return a stub analysis result that echoes ``analysis_type``.

        Placeholder: ``data`` is not inspected; no patterns are reported and
        the confidence is the constant 0.85.
        """
        outcome = {"analysis_type": analysis_type}
        outcome["patterns_found"] = []
        outcome["confidence"] = 0.85
        return outcome

    @Tool(description="Generate research report")
    def generate_report(self, findings: Dict, format: str = "text") -> str:
        """Return the report text.

        Placeholder: ``findings`` and ``format`` are accepted but not used yet.
        """
        report_text = "Research Report..."
        return report_text

    async def autonomous_research(self, research_goal: str):
        """Run the full research pipeline for ``research_goal``.

        Returns a dict with the goal, the strategy, the number of gathered
        data points, the analysis findings and the generated report.
        """
        history = self.research_history

        # Phase 1: decide how to approach the goal.
        plan = await self.formulate_strategy(research_goal)
        history.append({"phase": "strategy", "content": plan})

        # Phase 2: collect data according to the plan.
        collected = await self.gather_data(plan)
        history.append({"phase": "data_gathering", "count": len(collected)})

        # Phase 3: analyze what was collected.
        findings = await self.analyze_findings(collected)
        history.append({"phase": "analysis", "patterns": findings})

        # Phase 4: produce the report (not logged in the history).
        report = self.generate_report(findings)

        return dict(
            goal=research_goal,
            strategy=plan,
            data_points=len(collected),
            findings=findings,
            report=report,
        )

    async def formulate_strategy(self, goal: str) -> Dict:
        """Return a fixed strategy dict; ``goal`` is not used yet."""
        return dict(
            sources=["database", "api", "files"],
            analysis_methods=["statistical", "trend"],
            expected_duration="5 minutes",
        )

    async def gather_data(self, strategy: Dict) -> List[Dict]:
        """Return an empty list; ``strategy`` is not used yet."""
        gathered: List[Dict] = []
        return gathered

    async def analyze_findings(self, data: List[Dict]) -> Dict:
        """Return empty trends, anomalies and conclusions; ``data`` is not used yet."""
        return {section: [] for section in ("trends", "anomalies", "conclusions")}

Adaptive Research with Agent Learning

Python - Adaptive Research Agent
from google_adk import Agent, Learning, Adaptation

class AdaptiveResearchAgent(Agent):
    """Agent that widens its data gathering when early insights ask for it."""

    def __init__(self):
        super().__init__()
        self.learning_model = Learning()
        self.adaptation_engine = Adaptation()

    async def conduct_research_with_adaptation(self, goal: str):
        """Research ``goal`` and gather more data if the first pass needs it.

        Returns a dict with the initial insights, the recommended sources
        (``None`` when no extra gathering happened) and the final analysis.

        NOTE: the adaptive branch concatenates the two gathered datasets with
        ``+``. The gather stubs in this class return ``None``, so they must be
        implemented to return concatenable sequences before that branch works.
        """
        seed_data = await self.gather_initial_data(goal)
        insights = await self.learning_model.extract_insights(seed_data)

        if insights["more_data_needed"]:
            # Ask for better-targeted sources, gather from them, then analyze
            # the seed data and the extra data together.
            refined_sources = await self.adaptation_engine.recommend_sources(insights)
            extra_data = await self.gather_focused_data(refined_sources)
            final_analysis = await self.deep_analysis(seed_data + extra_data)
        else:
            final_analysis = await self.initial_analysis(seed_data)

        adapted = refined_sources if insights["more_data_needed"] else None
        return dict(
            initial_insights=insights,
            adapted_research=adapted,
            final_findings=final_analysis,
        )

    async def gather_initial_data(self, goal: str):
        """Exploratory first-pass gathering (stub: returns ``None``)."""
        return None

    async def gather_focused_data(self, sources: List[str]):
        """Targeted gathering from the recommended sources (stub: returns ``None``)."""
        return None

    async def initial_analysis(self, data: List[Dict]):
        """Analysis used when no extra data is needed (stub: returns ``None``)."""
        return None

    async def deep_analysis(self, data: List[Dict]):
        """Analysis of the combined dataset (stub: returns ``None``)."""
        return None

Multi-Source Data Integration

Python - Multi-Source Research
from google_adk import DataSource, Agent, Integration
from typing import Dict, Any

class MultiSourceAgent(Agent):
    """Agent that queries several data sources and correlates the results."""

    def __init__(self):
        super().__init__()
        self.integrator = Integration()
        # (name, location) pairs; order here is the order sources are queried.
        source_specs = (
            ("sales_db", "postgresql://sales"),
            ("customer_api", "https://api.customers.com"),
            ("logs", "/data/logs/*.json"),
        )
        self.data_sources = [
            DataSource(name, location) for name, location in source_specs
        ]

    async def unified_research(self, query: str):
        """Run ``query`` against every source, then correlate the results.

        A source that raises does not abort the run: its entry in the
        per-source results becomes ``{"error": <message>}``.

        Returns a dict with the number of configured sources, the correlated
        results and the derived insights.
        """
        per_source = {}

        for src in self.data_sources:
            try:
                payload = await self.query_source(src, query)
            except Exception as exc:
                # Best effort: record the failure and carry on with the rest.
                per_source[src.name] = {"error": str(exc)}
            else:
                per_source[src.name] = payload

        correlated = await self.integrator.correlate(per_source)
        insights = await self.derive_insights(correlated)

        return dict(
            sources_queried=len(self.data_sources),
            correlations=correlated,
            insights=insights,
        )

    async def query_source(self, source: DataSource, query: str):
        """Query a single source (stub: returns ``None``)."""
        return None

    async def derive_insights(self, correlated: Dict[str, Any]):
        """Turn correlated data into insights (stub: returns ``None``)."""
        return None

Best Practices

.build() val tokens = withContext(Dispatchers.IO) { authService.performTokenRequest(request) } storeTokensSecurely(tokens) } }

Encrypted Local Storage

Kotlin - Encrypted Shared Preferences
/**
 * Stores an API token in the encrypted preferences file "secret_preferences".
 *
 * Keys are encrypted with AES256_SIV and values with AES256_GCM, using a
 * master key created with the AES256_GCM key scheme.
 */
class SecurePreferences(context: Context) {
    private val masterKey =
        MasterKey.Builder(context).setKeyScheme(MasterKey.KeyScheme.AES256_GCM).build()

    private val preferences = EncryptedSharedPreferences.create(
        context,
        "secret_preferences",
        masterKey,
        EncryptedSharedPreferences.PrefKeyEncryptionScheme.AES256_SIV,
        EncryptedSharedPreferences.PrefValueEncryptionScheme.AES256_GCM
    )

    /** Stores [token] under the "api_token" key. */
    fun saveApiToken(token: String) {
        with(preferences.edit()) {
            putString("api_token", token)
            apply()
        }
    }

    /** Returns the stored token, or null when none has been saved. */
    fun getApiToken(): String? = preferences.getString("api_token", null)

    /** Removes every entry from this preferences file. */
    fun clearAllData() {
        with(preferences.edit()) {
            clear()
            apply()
        }
    }
}

Offline-First Sync

Kotlin - WorkManager for Background Sync
/**
 * Background worker that pushes locally pending changes to the remote API.
 *
 * Each pending change is sent according to its [ChangeType] and then marked
 * as synced locally, so a retry after a partial failure only resends the
 * changes that were not yet marked.
 */
class DataSyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
    /**
     * Uploads all pending changes on the IO dispatcher.
     *
     * @return [Result.success] when every change was sent and marked,
     *   [Result.retry] when any step threw a non-cancellation exception.
     * @throws kotlinx.coroutines.CancellationException when the work is cancelled.
     */
    override suspend fun doWork(): Result = withContext(Dispatchers.IO) {
        try {
            for (change in database.getPendingChanges()) {
                when (change.type) {
                    ChangeType.CREATE -> api.createRecord(change.data)
                    ChangeType.UPDATE -> api.updateRecord(change.id, change.data)
                    ChangeType.DELETE -> api.deleteRecord(change.id)
                }
                // Mark only after the remote call returned without throwing.
                database.markChangeAsSync(change.id)
            }

            Result.success()
        } catch (e: kotlinx.coroutines.CancellationException) {
            // Cancellation must propagate; swallowing it would turn a stopped
            // worker into a retry request and break structured concurrency.
            throw e
        } catch (e: Exception) {
            Result.retry()
        }
    }
}

// Schedule DataSyncWorker as periodic work (15-minute interval) that only
// runs while the device has a network connection.
val constraints = Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()

val syncRequest = PeriodicWorkRequestBuilder<DataSyncWorker>(15, TimeUnit.MINUTES)
    .setConstraints(constraints)
    .build()

// Enqueue under the unique name "data_sync" using the KEEP policy.
WorkManager.getInstance(context)
    .enqueueUniquePeriodicWork("data_sync", ExistingPeriodicWorkPolicy.KEEP, syncRequest)

Role-Based Features

Kotlin - Permission Management
/**
 * A named role and the permissions it carries.
 *
 * @property id identifier of the role
 * @property name name of the role
 * @property permissions permission strings held by this role; presumably the
 *   list that PermissionManager.hasPermission searches — confirm against the
 *   user model, which is not shown here
 */
data class UserRole(
    val id: String,
    val name: String,
    val permissions: List<String>
)

/** Answers permission and feature-access questions for the current user. */
class PermissionManager(private val repository: UserRepository) {
    /**
     * Returns true when the current user's role lists [permission].
     * Returns false when there is no current user, role or permission list.
     */
    suspend fun hasPermission(permission: String): Boolean {
        val granted = repository.getCurrentUser()?.role?.permissions
        return granted?.contains(permission) == true
    }

    /**
     * Returns whether the current user may use [featureName].
     *
     * A feature with no entry in featurePermissionMap is treated as accessible.
     *
     * NOTE(review): this blocks the calling thread via runBlocking until
     * [hasPermission] completes; calling it from the UI thread can stall
     * rendering. Consider a suspend variant for such callers.
     */
    fun canAccessFeature(featureName: String): Boolean {
        val requiredPermission = featurePermissionMap[featureName]
        if (requiredPermission == null) {
            return true
        }
        return runBlocking { hasPermission(requiredPermission) }
    }
}

/**
 * Shows the admin content when the "ADMIN_PANEL" feature is accessible,
 * otherwise an "Access Denied" message.
 */
@Composable
fun AdminPanel(permissionManager: PermissionManager) {
    // The access check runs on every composition of this function.
    val allowed = permissionManager.canAccessFeature("ADMIN_PANEL")
    if (!allowed) {
        Text("Access Denied")
    } else {
        AdminContent()
    }
}

Audit Logging

Kotlin - Event Logging
/**
 * One audit-log record.
 *
 * @property userId id of the user who performed the action
 * @property action the action performed (the usage example passes "VIEW")
 * @property resource the resource acted on (the usage example passes "CUSTOMER_RECORD")
 * @property timestamp defaults to System.currentTimeMillis() at construction time
 * @property details extra key/value data; has no default, so callers that rely
 *   on the timestamp default must pass this by name
 */
data class AuditEvent(
    val userId: String,
    val action: String,
    val resource: String,
    val timestamp: Long = System.currentTimeMillis(),
    val details: Map<String, Any>
)

/** Records audit events locally and forwards them to the server. */
class AuditLogger(private val repository: AuditRepository) {
    /**
     * Builds an [AuditEvent] (timestamp left at its default), saves it through
     * the repository and then sends it via apiClient.
     *
     * @param details optional extra data; defaults to an empty map
     */
    suspend fun logAction(
        userId: String,
        action: String,
        resource: String,
        details: Map<String, Any> = emptyMap()
    ) {
        AuditEvent(userId, action, resource, details = details).also { entry ->
            // Save locally first, then send to the server.
            repository.save(entry)
            apiClient.sendAuditEvent(entry)
        }
    }
}

// Usage: record that the current user viewed a customer record.
// logAction is a suspend function, so this call must be made from a coroutine
// or another suspend function. `auditLogger` and `currentUser` are assumed to
// be provided by the surrounding code.
auditLogger.logAction(
    userId = currentUser.id,
    action = "VIEW",
    resource = "CUSTOMER_RECORD",
    details = mapOf("customerId" to "12345")
)

Best Practices