Part 7: Agentic Systems & Orchestration

Chapter 39: Task-Oriented Agents (Coding, Research, Ops)


Overview

While general-purpose agents capture headlines, task-oriented agents deliver measurable business value quickly and reliably. By narrowing the scope to specific domains like coding, research, or operations, you can build robust tooling, define clear success metrics, and implement effective safety controls. This chapter provides practical guidance for building production-ready agents across three high-value domains.

Core Focus:

  • Domain-specific tool design and safety boundaries
  • Evaluation frameworks with quantifiable metrics
  • Human-in-the-loop workflows for high-risk operations
  • Evidence preservation and audit trails
  • Real-world deployment patterns and ROI measurement

Why It Matters

General-purpose agents are exciting, but targeted task agents deliver dependable value fast. Narrow scopes enable robust tools, clear success metrics, and safe operations.

Key Advantages of Task-Oriented Agents:

  • Faster Time-to-Value: Deploy in weeks instead of months
  • Measurable ROI: Clear metrics tied to business outcomes
  • Safer Operations: Bounded capabilities reduce risk surface
  • Easier Evaluation: Domain-specific success criteria
  • Better User Trust: Predictable, reliable behavior builds confidence

Business Impact:

  • Coding agents: 60-70% automation of routine development tasks
  • Research agents: 3-5x faster literature reviews and analysis
  • Ops agents: 35-50% reduction in mean time to acknowledge incidents
  • Cost savings: $50K-$200K annually per agent deployment

Domain-Specific Agent Patterns

Agent Type Comparison

| Domain   | Primary Use Cases                              | Key Tools                                    | Success Metrics                      | Risk Level |
|----------|------------------------------------------------|----------------------------------------------|--------------------------------------|------------|
| Coding   | Code review, test generation, refactoring      | File read/write, syntax check, test runner   | Test pass rate, compilation success  | Medium     |
| Research | Literature review, fact-checking, synthesis    | Search, PDF extraction, citation manager     | Accuracy, coverage, citation quality | Low        |
| Ops      | Incident triage, runbook execution, monitoring | Log query, service restart, alert manager    | MTTA, MTTR, false positive rate      | High       |
| Data     | Analysis, visualization, quality checks        | SQL, pandas, plotting                        | Query correctness, insight quality   | Medium     |
| Content  | Writing, editing, translation                  | Grammar check, style guide, translation API  | Quality score, style compliance      | Low        |

Domain 1: Coding Agents

Architecture

graph TD
    A[Code Task Request] --> B[Task Analyzer]
    B --> C{Task Type}
    C -->|Q&A| D[Code Search Agent]
    C -->|Test Gen| E[Test Writer Agent]
    C -->|Refactor| F[Refactoring Agent]
    D --> G[Repository Index]
    E --> H[Test Framework]
    F --> I[AST Parser]
    G --> J[Generate Response]
    H --> K[Generate Tests]
    I --> L[Generate Diff]
    J --> M[Human Review]
    K --> N[Dry-Run Tests]
    L --> O[Preview Changes]
    M --> P[Return Result]
    N --> Q{Tests Pass?}
    O --> R{Approve?}
    Q -->|Yes| S[Commit Tests]
    Q -->|No| E
    R -->|Yes| T[Apply Refactor]
    R -->|No| F
    style B fill:#e1f5ff
    style D fill:#fff4e1
    style E fill:#fff4e1
    style F fill:#fff4e1
    style N fill:#ffe1e1
    style O fill:#ffe1e1
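
The task-analysis step at the top of the diagram is usually a thin router in front of the specialist sub-agents. A minimal sketch, assuming hypothetical sub-agent objects that expose a common run(task) method (the class and route names are illustrative, not part of any particular framework):

from typing import Protocol

class CodeAgent(Protocol):
    """Interface the specialist sub-agents are assumed to expose."""
    def run(self, task: str) -> str: ...

class CodingTaskRouter:
    """Dispatch a coding task to the specialist agent for its task type."""

    def __init__(self, qa_agent: CodeAgent, test_agent: CodeAgent, refactor_agent: CodeAgent):
        self.routes = {"qa": qa_agent, "test_gen": test_agent, "refactor": refactor_agent}

    def classify(self, task: str) -> str:
        # Cheap keyword heuristic for illustration; production routers usually
        # use a small LLM call or a trained classifier here.
        lowered = task.lower()
        if "test" in lowered:
            return "test_gen"
        if "refactor" in lowered or "rename" in lowered:
            return "refactor"
        return "qa"

    def route(self, task: str) -> str:
        return self.routes[self.classify(task)].run(task)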

Coding Agent Tool Contracts

Essential Tool Schema:

from typing import Literal

from pydantic import BaseModel, Field, validator

class FileReadRequest(BaseModel):
    file_path: str = Field(..., description="Absolute path within workspace")
    max_lines: int = Field(default=1000, ge=1, le=10000)

    @validator('file_path')
    def validate_path(cls, v):
        if '..' in str(v) or not str(v).startswith('/workspace/'):
            raise ValueError("Path validation failed")
        return v

class FileWriteRequest(BaseModel):
    file_path: str
    content: str
    dry_run: bool = True  # Always preview first
    requires_approval: bool = True

class TestGenerationRequest(BaseModel):
    source_file: str
    test_framework: Literal["pytest", "unittest", "jest"]
    coverage_target: float = 0.8

Safety Controls:

| Control Layer    | Mechanism              | Example                            |
|------------------|------------------------|------------------------------------|
| Path Validation  | Workspace sandboxing   | Block ../etc/passwd access         |
| Secret Detection | Regex patterns         | Prevent committing api_key="..."   |
| Approval Gates   | Human review queue     | File writes require confirmation   |
| Dry-Run Mode     | Preview before execute | Show diff before applying refactor |
| Test Validation  | Automated test runs    | Ensure refactors don't break tests |
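
The first two layers in the table can be enforced before a FileWriteRequest ever touches the filesystem. A minimal sketch, assuming the workspace root is /workspace/ and using a deliberately small, illustrative set of secret patterns (real deployments would use a broader, tested ruleset):

import re
from pathlib import PurePosixPath

WORKSPACE_ROOT = PurePosixPath("/workspace")

# Illustrative patterns only
SECRET_PATTERNS = [
    re.compile(r"api_key\s*=\s*['\"][^'\"]+['\"]", re.IGNORECASE),
    re.compile(r"-----BEGIN (?:RSA |EC )?PRIVATE KEY-----"),
    re.compile(r"aws_secret_access_key\s*[:=]", re.IGNORECASE),
]

def guard_file_write(file_path: str, content: str) -> None:
    """Raise ValueError if a write escapes the workspace or appears to contain a secret."""
    path = PurePosixPath(file_path)
    if ".." in path.parts or WORKSPACE_ROOT not in (path, *path.parents):
        raise ValueError(f"Path outside workspace: {file_path}")
    for pattern in SECRET_PATTERNS:
        if pattern.search(content):
            raise ValueError("Possible secret detected; write blocked pending human review")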

Domain 2: Research Agents

Architecture

graph TD
    A[Research Query] --> B[Query Analyzer]
    B --> C[Search Planner]
    C --> D[Multi-Source Search]
    D --> E[Academic DBs]
    D --> F[Web Search]
    D --> G[Internal Docs]
    E --> H[Result Aggregator]
    F --> H
    G --> H
    H --> I[Relevance Ranker]
    I --> J[Content Extractor]
    J --> K[Claim Extraction]
    K --> L[Citation Manager]
    L --> M[Synthesis Engine]
    M --> N[Fact Checker]
    N --> O[Draft Generator]
    O --> P[Quality Review]
    P --> Q{Quality OK?}
    Q -->|No| M
    Q -->|Yes| R[Final Report]
    style B fill:#e1f5ff
    style I fill:#fff4e1
    style N fill:#ffe1e1
    style R fill:#e8f5e9

Research Agent Tools

from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, HttpUrl, Field
from datetime import datetime

class SearchRequest(BaseModel):
    """Multi-source search request."""
    query: str = Field(..., max_length=500)
    sources: List[str] = Field(default=["semantic_scholar", "pubmed", "arxiv"])
    date_range: Optional[tuple[datetime, datetime]] = None
    max_results: int = Field(default=20, ge=1, le=100)
    require_full_text: bool = False

class ClaimExtractionRequest(BaseModel):
    """Extract claims from document."""
    document_text: str
    extract_evidence: bool = True
    confidence_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    max_claims: int = Field(default=10, ge=1, le=50)

class Citation(BaseModel):
    """Structured citation."""
    title: str
    authors: List[str]
    year: int
    venue: str
    url: Optional[HttpUrl]
    doi: Optional[str]
    citation_key: str  # Auto-generated

class Claim(BaseModel):
    """Extracted claim with evidence."""
    claim_text: str
    evidence: List[str]
    confidence: float
    citations: List[Citation]
    claim_type: str  # finding, methodology, limitation, etc.

class ResearchSynthesisRequest(BaseModel):
    """Synthesize research findings."""
    claims: List[Claim]
    synthesis_type: Literal["literature_review", "meta_analysis", "summary"]
    include_gaps: bool = True
    include_contradictions: bool = True
    max_length: int = Field(default=2000, ge=100, le=10000)

class ResearchAgentTools:
    """
    Tools for research agents.
    """
    def __init__(self, api_keys: Dict[str, str]):
        self.api_keys = api_keys
        self.citation_count = 0

    def search_literature(self, request: SearchRequest) -> Dict[str, Any]:
        """Search across academic databases."""
        all_results = []

        for source in request.sources:
            if source == "semantic_scholar":
                results = self._search_semantic_scholar(request)
            elif source == "pubmed":
                results = self._search_pubmed(request)
            elif source == "arxiv":
                results = self._search_arxiv(request)
            else:
                continue

            all_results.extend(results)

        # Deduplicate by DOI/title
        deduped = self._deduplicate_papers(all_results)

        # Rank by relevance
        ranked = self._rank_by_relevance(deduped, request.query)

        return {
            "query": request.query,
            "total_results": len(ranked),
            "papers": ranked[:request.max_results],
            "sources_searched": request.sources
        }

    def extract_claims(self, request: ClaimExtractionRequest) -> List[Claim]:
        """Extract verifiable claims from text."""
        # Use LLM to extract structured claims
        prompt = f"""Extract key claims from this research text:

{request.document_text[:3000]}

For each claim:
1. State the claim clearly
2. Provide supporting evidence from text
3. Classify claim type (finding/methodology/limitation)
4. Assign confidence (0-1)

Return JSON array of claims."""

        # LLM call
        extracted = self._call_llm(prompt)

        claims = []
        for item in extracted:
            if item["confidence"] >= request.confidence_threshold:
                claims.append(Claim(
                    claim_text=item["claim"],
                    evidence=item["evidence"] if request.extract_evidence else [],
                    confidence=item["confidence"],
                    citations=[],  # Added later
                    claim_type=item["type"]
                ))

        return claims[:request.max_claims]

    def manage_citations(self, papers: List[Dict]) -> List[Citation]:
        """Generate structured citations."""
        citations = []

        for paper in papers:
            self.citation_count += 1

            citation = Citation(
                title=paper["title"],
                authors=paper.get("authors", []),
                year=paper.get("year", datetime.now().year),
                venue=paper.get("venue", "Unknown"),
                url=paper.get("url"),
                doi=paper.get("doi"),
                citation_key=f"ref{self.citation_count}"
            )

            citations.append(citation)

        return citations

    def synthesize_research(self, request: ResearchSynthesisRequest) -> Dict[str, Any]:
        """Synthesize claims into coherent narrative."""
        # Group claims by type
        by_type = {}
        for claim in request.claims:
            by_type.setdefault(claim.claim_type, []).append(claim)

        # Identify contradictions
        contradictions = []
        if request.include_contradictions:
            contradictions = self._find_contradictions(request.claims)

        # Identify gaps
        gaps = []
        if request.include_gaps:
            gaps = self._identify_research_gaps(request.claims)

        # Generate synthesis
        prompt = f"""Synthesize these research findings into a {request.synthesis_type}:

Findings ({len(by_type.get('finding', []))} claims):
{self._format_claims(by_type.get('finding', []))}

Methodologies ({len(by_type.get('methodology', []))} claims):
{self._format_claims(by_type.get('methodology', []))}

Limitations ({len(by_type.get('limitation', []))} claims):
{self._format_claims(by_type.get('limitation', []))}

Contradictions found: {len(contradictions)}
Research gaps identified: {len(gaps)}

Generate a {request.max_length}-word synthesis with proper citations."""

        synthesis_text = self._call_llm(prompt)

        return {
            "synthesis": synthesis_text,
            "synthesis_type": request.synthesis_type,
            "claims_used": len(request.claims),
            "contradictions": contradictions,
            "research_gaps": gaps,
            "word_count": len(synthesis_text.split())
        }

    def fact_check_claim(self, claim: str, sources: List[Citation]) -> Dict[str, Any]:
        """Verify claim against sources."""
        # Search for supporting/contradicting evidence
        evidence_for = []
        evidence_against = []

        for source in sources:
            # Retrieve full text or abstract
            content = self._fetch_paper_content(source)

            # Check if claim is supported
            verification = self._verify_claim_in_text(claim, content)

            if verification["supports"]:
                evidence_for.append({
                    "source": source.citation_key,
                    "excerpt": verification["excerpt"],
                    "confidence": verification["confidence"]
                })
            elif verification["contradicts"]:
                evidence_against.append({
                    "source": source.citation_key,
                    "excerpt": verification["excerpt"],
                    "confidence": verification["confidence"]
                })

        # Calculate overall confidence
        total_evidence = len(evidence_for) + len(evidence_against)
        confidence = len(evidence_for) / total_evidence if total_evidence > 0 else 0

        return {
            "claim": claim,
            "verified": confidence > 0.6,
            "confidence": confidence,
            "evidence_for": evidence_for,
            "evidence_against": evidence_against,
            "sources_checked": len(sources)
        }

    def _deduplicate_papers(self, papers: List[Dict]) -> List[Dict]:
        """Remove duplicate papers."""
        seen_dois = set()
        seen_titles = set()
        unique = []

        for paper in papers:
            doi = paper.get("doi")
            title = paper.get("title", "").lower().strip()

            if doi and doi in seen_dois:
                continue
            if title and title in seen_titles:
                continue

            if doi:
                seen_dois.add(doi)
            if title:
                seen_titles.add(title)

            unique.append(paper)

        return unique

    def _rank_by_relevance(self, papers: List[Dict], query: str) -> List[Dict]:
        """Rank papers by relevance to query."""
        # Simplified - would use embeddings in production
        scored = []
        for paper in papers:
            score = self._compute_relevance_score(paper, query)
            scored.append((score, paper))

        scored.sort(reverse=True, key=lambda x: x[0])
        return [paper for score, paper in scored]

    def _compute_relevance_score(self, paper: Dict, query: str) -> float:
        """Compute relevance score (simplified)."""
        title = paper.get("title", "").lower()
        abstract = paper.get("abstract", "").lower()
        query_lower = query.lower()

        score = 0.0
        # Title match
        if query_lower in title:
            score += 1.0
        # Abstract match
        if query_lower in abstract:
            score += 0.5
        # Citation count (if available)
        score += min(paper.get("citation_count", 0) / 100, 0.5)

        return score

    def _find_contradictions(self, claims: List[Claim]) -> List[Dict]:
        """Identify contradictory claims."""
        contradictions = []

        for i, claim1 in enumerate(claims):
            for claim2 in claims[i+1:]:
                if self._claims_contradict(claim1, claim2):
                    contradictions.append({
                        "claim_1": claim1.claim_text,
                        "claim_2": claim2.claim_text,
                        "confidence": min(claim1.confidence, claim2.confidence)
                    })

        return contradictions

    def _claims_contradict(self, claim1: Claim, claim2: Claim) -> bool:
        """Check if two claims contradict."""
        # Simplified - would use semantic similarity and entailment
        return False

    def _identify_research_gaps(self, claims: List[Claim]) -> List[str]:
        """Identify gaps in research coverage."""
        # Use LLM to analyze claim coverage
        prompt = f"""Analyze these research claims and identify gaps:

{self._format_claims(claims)}

What areas are under-researched or need further investigation?
Return list of 3-5 research gaps."""

        gaps = self._call_llm(prompt)
        return gaps

    def _format_claims(self, claims: List[Claim]) -> str:
        """Format claims for prompt."""
        return '\n'.join(
            f"- {claim.claim_text} (confidence: {claim.confidence:.2f})"
            for claim in claims
        )

    def _call_llm(self, prompt: str) -> Any:
        """Call LLM."""
        # Mock implementation
        return []

    def _search_semantic_scholar(self, request: SearchRequest) -> List[Dict]:
        """Search Semantic Scholar API."""
        # Implementation would use actual API
        return []

    def _search_pubmed(self, request: SearchRequest) -> List[Dict]:
        """Search PubMed API."""
        return []

    def _search_arxiv(self, request: SearchRequest) -> List[Dict]:
        """Search arXiv API."""
        return []

    def _fetch_paper_content(self, citation: Citation) -> str:
        """Fetch full text or abstract."""
        return ""

    def _verify_claim_in_text(self, claim: str, text: str) -> Dict:
        """Verify if claim is supported by text."""
        return {"supports": False, "contradicts": False, "excerpt": "", "confidence": 0.0}

Domain 3: Operations Agents

Architecture

graph TD
    A[Alert/Incident] --> B[Triage Agent]
    B --> C[Severity Classifier]
    C --> D{Severity}
    D -->|P0/P1| E[Immediate Escalation]
    D -->|P2/P3| F[Runbook Retrieval]
    F --> G[Context Gatherer]
    G --> H[Log Analyzer]
    G --> I[Metric Checker]
    G --> J[Service Map]
    H --> K[Root Cause Analyzer]
    I --> K
    J --> K
    K --> L[Remediation Planner]
    L --> M{Auto-Remediate?}
    M -->|Safe| N[Execute Actions]
    M -->|Risky| O[Human Approval]
    N --> P[Validate Fix]
    O --> Q[Await Approval]
    Q --> N
    P --> R{Resolved?}
    R -->|Yes| S[Close Incident]
    R -->|No| T[Escalate]
    style C fill:#e1f5ff
    style K fill:#fff4e1
    style M fill:#ffe1e1
    style P fill:#ffe1e1
    style S fill:#e8f5e9

Operations Agent Tools

from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
from datetime import datetime, timedelta

class Severity(Enum):
    P0 = "critical"
    P1 = "high"
    P2 = "medium"
    P3 = "low"

class Alert(BaseModel):
    """Incoming alert."""
    alert_id: str
    title: str
    description: str
    service: str
    timestamp: datetime
    raw_data: Dict[str, Any]

class IncidentContext(BaseModel):
    """Gathered context for incident."""
    alert: Alert
    recent_logs: List[str]
    metrics: Dict[str, Any]
    related_services: List[str]
    recent_deployments: List[Dict]

class RemediationAction(BaseModel):
    """Proposed remediation."""
    action_type: Literal["restart", "scale", "rollback", "config_change", "manual"]
    target: str
    parameters: Dict[str, Any]
    risk_level: Literal["safe", "moderate", "high"]
    requires_approval: bool
    dry_run_available: bool

class OpsAgentTools:
    """
    Tools for operations agents.
    """
    def __init__(self, log_client, metrics_client, k8s_client):
        self.logs = log_client
        self.metrics = metrics_client
        self.k8s = k8s_client

    def classify_severity(self, alert: Alert) -> Severity:
        """Classify alert severity."""
        # Use rules + LLM for classification
        rules_score = self._apply_severity_rules(alert)

        # LLM classification
        prompt = f"""Classify this alert severity (P0=critical, P1=high, P2=medium, P3=low):

Title: {alert.title}
Description: {alert.description}
Service: {alert.service}

Consider:
- User impact
- Service criticality
- Error rate
- Duration

Return JSON: {{"severity": "P0/P1/P2/P3", "reasoning": "..."}}"""

        llm_result = self._call_llm(prompt)

        # Take the more severe of the two labels ("P0" sorts before "P3",
        # so min() selects the higher severity)
        final_severity = min(rules_score, llm_result["severity"])

        return Severity[final_severity]

    def gather_context(self, alert: Alert) -> IncidentContext:
        """Gather diagnostic context."""
        # Fetch recent logs
        logs = self.logs.query(
            service=alert.service,
            level="ERROR",
            time_range=(alert.timestamp - timedelta(minutes=15), alert.timestamp),
            limit=100
        )

        # Fetch metrics
        metrics = self.metrics.query(
            service=alert.service,
            metrics=["error_rate", "latency_p99", "cpu_usage", "memory_usage"],
            time_range=(alert.timestamp - timedelta(hours=1), alert.timestamp)
        )

        # Get related services
        related = self._get_related_services(alert.service)

        # Check recent deployments
        deployments = self._get_recent_deployments(alert.service, hours=24)

        return IncidentContext(
            alert=alert,
            recent_logs=logs,
            metrics=metrics,
            related_services=related,
            recent_deployments=deployments
        )

    def analyze_root_cause(self, context: IncidentContext) -> Dict[str, Any]:
        """Analyze root cause using logs and metrics."""
        prompt = f"""Analyze this incident and identify root cause:

Alert: {context.alert.title}
Service: {context.alert.service}

Recent Errors:
{chr(10).join(context.recent_logs[:20])}

Metrics:
- Error Rate: {context.metrics.get('error_rate')}
- P99 Latency: {context.metrics.get('latency_p99')}
- CPU: {context.metrics.get('cpu_usage')}
- Memory: {context.metrics.get('memory_usage')}

Recent Deployments:
{chr(10).join(str(d) for d in context.recent_deployments)}

Provide:
1. Most likely root cause
2. Contributing factors
3. Confidence level (0-1)
4. Supporting evidence

Return JSON."""

        analysis = self._call_llm(prompt)

        return {
            "root_cause": analysis.get("root_cause"),
            "factors": analysis.get("factors", []),
            "confidence": analysis.get("confidence", 0.5),
            "evidence": analysis.get("evidence", [])
        }

    def propose_remediation(
        self,
        context: IncidentContext,
        root_cause: Dict[str, Any]
    ) -> List[RemediationAction]:
        """Propose remediation actions."""
        actions = []

        # Check runbooks
        runbook = self._fetch_runbook(context.alert.service, root_cause["root_cause"])

        if runbook:
            # Extract actions from runbook
            actions.extend(self._parse_runbook_actions(runbook))

        # LLM-generated actions
        prompt = f"""Propose remediation for this incident:

Root Cause: {root_cause['root_cause']}
Service: {context.alert.service}
Confidence: {root_cause['confidence']}

Available actions:
- restart_service
- scale_up
- scale_down
- rollback_deployment
- update_config
- drain_traffic

For each action, specify:
1. Action type
2. Parameters
3. Risk level (safe/moderate/high)
4. Whether approval needed

Return JSON array of actions."""

        llm_actions = self._call_llm(prompt)

        for action_data in llm_actions:
            actions.append(RemediationAction(
                action_type=action_data["type"],
                target=context.alert.service,
                parameters=action_data["parameters"],
                risk_level=action_data["risk_level"],
                requires_approval=action_data["risk_level"] in ["moderate", "high"],
                dry_run_available=action_data["type"] in ["restart", "scale", "config_change"]
            ))

        return actions

    def execute_action(
        self,
        action: RemediationAction,
        dry_run: bool = True
    ) -> Dict[str, Any]:
        """Execute remediation action."""
        if action.requires_approval and not dry_run:
            return {
                "status": "pending_approval",
                "action": action.action_type,
                "message": "Human approval required"
            }

        if dry_run and action.dry_run_available:
            # Simulate action
            return self._dry_run_action(action)

        # Execute actual action
        try:
            if action.action_type == "restart":
                result = self.k8s.restart_service(
                    action.target,
                    **action.parameters
                )
            elif action.action_type == "scale":
                result = self.k8s.scale_service(
                    action.target,
                    replicas=action.parameters["replicas"]
                )
            elif action.action_type == "rollback":
                result = self.k8s.rollback_deployment(
                    action.target,
                    revision=action.parameters.get("revision")
                )
            else:
                return {"error": f"Unknown action type: {action.action_type}"}

            return {
                "status": "executed",
                "action": action.action_type,
                "result": result
            }

        except Exception as e:
            return {
                "status": "failed",
                "action": action.action_type,
                "error": str(e)
            }

    def validate_remediation(
        self,
        action: RemediationAction,
        wait_time: int = 60
    ) -> Dict[str, Any]:
        """Validate that remediation resolved the issue."""
        import time

        # Wait for changes to take effect
        time.sleep(wait_time)

        # Check metrics
        current_metrics = self.metrics.query(
            service=action.target,
            metrics=["error_rate", "latency_p99"],
            time_range=(datetime.now() - timedelta(minutes=5), datetime.now())
        )

        # Check for new alerts
        new_alerts = self._check_for_alerts(action.target, minutes=5)

        error_rate = current_metrics.get("error_rate", 0)
        latency = current_metrics.get("latency_p99", 0)

        resolved = (
            error_rate < 0.01 and  # Less than 1% errors
            latency < 1000 and     # Less than 1s P99
            len(new_alerts) == 0   # No new alerts
        )

        return {
            "resolved": resolved,
            "error_rate": error_rate,
            "latency_p99": latency,
            "new_alerts": len(new_alerts),
            "validation_time": wait_time
        }

    def _apply_severity_rules(self, alert: Alert) -> str:
        """Apply rule-based severity classification."""
        # Simplified rules
        if "outage" in alert.title.lower() or "down" in alert.title.lower():
            return "P0"
        if alert.service in ["payment", "auth", "api-gateway"]:
            return "P1"
        return "P2"

    def _get_related_services(self, service: str) -> List[str]:
        """Get services that depend on or are depended by this service."""
        # Would query service mesh or dependency graph
        return []

    def _get_recent_deployments(self, service: str, hours: int) -> List[Dict]:
        """Get recent deployments for service."""
        # Would query deployment history
        return []

    def _fetch_runbook(self, service: str, issue: str) -> Optional[Dict]:
        """Fetch runbook for service/issue."""
        # Would query runbook database
        return None

    def _parse_runbook_actions(self, runbook: Dict) -> List[RemediationAction]:
        """Extract actions from runbook."""
        return []

    def _dry_run_action(self, action: RemediationAction) -> Dict[str, Any]:
        """Simulate action execution."""
        return {
            "status": "dry_run",
            "action": action.action_type,
            "estimated_impact": "Service will restart, ~30s downtime",
            "rollback_plan": "Previous deployment available"
        }

    def _check_for_alerts(self, service: str, minutes: int) -> List[Alert]:
        """Check for recent alerts."""
        return []

    def _call_llm(self, prompt: str) -> Any:
        """Call LLM."""
        return {}
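
A sketch of how these tools chain into the triage loop from the diagram above. It assumes the three clients passed to OpsAgentTools are already configured; because the private helpers above are stubs, treat this as wiring rather than a runnable pipeline:

def triage_alert(alert: Alert, tools: OpsAgentTools) -> Dict[str, Any]:
    """Classify, diagnose, and (safely) remediate a single alert."""
    severity = tools.classify_severity(alert)
    if severity in (Severity.P0, Severity.P1):
        return {"status": "escalated", "severity": severity.name}

    context = tools.gather_context(alert)
    root_cause = tools.analyze_root_cause(context)
    actions = tools.propose_remediation(context, root_cause)

    results = []
    for action in actions:
        # Always dry-run first; real execution happens only for low-risk actions
        # (high-risk ones return a pending_approval result instead).
        results.append(tools.execute_action(action, dry_run=True))
        if not action.requires_approval:
            results.append(tools.execute_action(action, dry_run=False))
            results.append(tools.validate_remediation(action))

    return {"severity": severity.name, "root_cause": root_cause, "actions": results}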

Reliability & Cost Controls

Budget Management

from datetime import datetime
from typing import Any, Dict

class AgentBudgetManager:
    """
    Manage costs and enforce budgets for agent operations.
    """
    def __init__(self, daily_budget: float, cost_per_1k_tokens: float):
        self.daily_budget = daily_budget
        self.cost_per_1k_tokens = cost_per_1k_tokens
        self.daily_spend = 0.0
        self.task_costs = []

    def check_budget(self, estimated_tokens: int) -> tuple[bool, str]:
        """Check if task fits within budget."""
        estimated_cost = (estimated_tokens / 1000) * self.cost_per_1k_tokens

        if self.daily_spend + estimated_cost > self.daily_budget:
            remaining = self.daily_budget - self.daily_spend
            return False, f"Insufficient budget. Remaining: ${remaining:.2f}"

        return True, ""

    def record_usage(self, tokens_used: int, task_id: str):
        """Record token usage."""
        cost = (tokens_used / 1000) * self.cost_per_1k_tokens
        self.daily_spend += cost
        self.task_costs.append({
            "task_id": task_id,
            "tokens": tokens_used,
            "cost": cost,
            "timestamp": datetime.now()
        })

    def get_metrics(self) -> Dict[str, Any]:
        """Get budget metrics."""
        return {
            "daily_spend": self.daily_spend,
            "daily_budget": self.daily_budget,
            "utilization": self.daily_spend / self.daily_budget,
            "tasks_completed": len(self.task_costs),
            "avg_cost_per_task": (
                self.daily_spend / len(self.task_costs)
                if self.task_costs else 0
            )
        }

Case Study: Operations Agent

Problem Statement

A SaaS company received 200-300 alerts daily across their microservices architecture. The oncall engineers spent 60-70% of their time on alert triage and routine remediation, leading to:

  • High oncall burnout
  • Slow response to critical issues (MTTA: 45 minutes)
  • Missed patterns in recurring incidents
  • Inconsistent remediation quality

Solution Implementation

They deployed an operations agent with:

  1. Auto-triage: Classify alerts by severity and route appropriately
  2. Context gathering: Automatically fetch logs, metrics, and deployment history
  3. Root cause analysis: Use LLM to analyze patterns
  4. Safe remediation: Execute low-risk actions automatically, escalate risky ones
  5. Validation: Verify fixes before closing incidents

Results

Before Agent:

  • MTTA (Mean Time to Acknowledge): 45 minutes
  • MTTR (Mean Time to Resolve): 3.5 hours
  • Alerts triaged daily: 100% manual (200-300 alerts)
  • Oncall engineer workload: 6-8 hours/day

After Agent (3 months):

  • MTTA: 29 minutes (35% reduction)
  • MTTR: 2.1 hours (40% reduction)
  • Auto-triaged: 75% of alerts (150-225 alerts)
  • Auto-resolved: 40% of P3 incidents
  • Oncall workload: 3-4 hours/day (50% reduction)
  • False positive suppression: 85% of noisy alerts
  • Cost: $800/month (LLM API costs)
  • ROI: $15K/month (reduced oncall hours)

Key Learnings:

  1. Start with read-only analysis before auto-remediation
  2. Implement comprehensive dry-run testing
  3. Build trust gradually with low-risk actions
  4. Maintain detailed audit logs for retrospectives
  5. Use feedback loops to improve classification accuracy

Implementation Checklist

Phase 1: Domain Selection & Scoping (Week 1)

  • Choose target domain (coding/research/ops)
  • Identify 3-5 high-value, repetitive tasks
  • Define success metrics and ROI targets
  • Assess risk levels for each task
  • Document current manual process

Phase 2: Tool Development (Weeks 2-3)

  • Design tool contracts with JSON schemas
  • Implement read-only tools first
  • Add dry-run capabilities for write operations
  • Build tool validation and testing
  • Create tool documentation

Phase 3: Safety & Approval Workflows (Week 4)

  • Define risk levels for each tool
  • Implement approval workflows for high-risk actions
  • Add output validation and filtering
  • Build comprehensive audit logging
  • Set up alerts for anomalous behavior

Phase 4: Evaluation Framework (Week 5)

  • Create task suite with 20-50 test cases
  • Define gold standards or validation functions
  • Implement automated evaluation pipeline
  • Build offline replay capability
  • Set up dashboards for metrics tracking

Phase 5: Agent Implementation (Week 6)

  • Choose agent pattern (ReAct, Plan-Execute, etc.)
  • Implement core agent logic
  • Add retry and error handling
  • Integrate budget management
  • Build monitoring and alerting

Phase 6: Testing & Validation (Week 7)

  • Run offline evaluation suite
  • Conduct manual testing on diverse scenarios
  • Validate safety controls
  • Test approval workflows
  • Measure cost per task

Phase 7: Pilot Deployment (Week 8)

  • Deploy to staging environment
  • Run shadow mode (observe without acting)
  • A/B test on small percentage of tasks
  • Collect user feedback
  • Iterate based on failures

Phase 8: Production Rollout (Week 9)

  • Gradual rollout (10% → 50% → 100%)
  • Monitor success rate, cost, latency
  • Maintain human-in-the-loop review queue
  • Document known limitations
  • Train users and stakeholders

Best Practices

Tool Design

  1. Start Read-Only: Begin with information retrieval before enabling writes
  2. Explicit Schemas: Use Pydantic or JSON Schema for all parameters
  3. Idempotency: Design tools to be safely retryable
  4. Error Taxonomy: Structured errors enable better retry logic (see the sketch after this list)
  5. Versioning: Version tool interfaces for backward compatibility
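
A minimal sketch of the error-taxonomy idea: structured errors whose category drives retry behavior. The category names are illustrative:

from dataclasses import dataclass
from enum import Enum

class ToolErrorKind(Enum):
    INVALID_INPUT = "invalid_input"      # caller bug: do not retry
    TRANSIENT = "transient"              # timeout / rate limit: retry with backoff
    PERMISSION_DENIED = "permission"     # escalate to a human, never retry
    NOT_FOUND = "not_found"              # re-plan with different arguments

@dataclass
class ToolError(Exception):
    kind: ToolErrorKind
    message: str
    retryable: bool

def should_retry(error: ToolError, attempt: int, max_attempts: int = 3) -> bool:
    """Retry only transient failures, and only within the attempt budget."""
    return error.retryable and error.kind is ToolErrorKind.TRANSIENT and attempt < max_attempts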

Evaluation

  1. Domain-Specific Metrics: Code quality, research accuracy, ops MTTA/MTTR
  2. Task Suites: Maintain diverse, representative test cases (see the harness sketch after this list)
  3. Regression Testing: Replay historical traces on new versions
  4. Human Validation: Sample-based review for quality assurance
  5. Cost Tracking: Monitor per-task costs and optimize
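
A minimal task-suite harness along these lines, assuming each case carries an input and a check callable (a gold-standard comparison or validation function) and that run_agent is your agent's entry point:

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class EvalCase:
    case_id: str
    task: str
    check: Callable[[str], bool]  # gold-standard comparison or validation function

def run_suite(cases: List[EvalCase], run_agent: Callable[[str], str]) -> dict:
    """Run every case and report the pass rate plus the failing case ids."""
    failures = []
    for case in cases:
        try:
            if not case.check(run_agent(case.task)):
                failures.append(case.case_id)
        except Exception:
            failures.append(case.case_id)  # crashes count as failures
    passed = len(cases) - len(failures)
    return {
        "total": len(cases),
        "passed": passed,
        "pass_rate": passed / len(cases) if cases else 0.0,
        "failures": failures,
    }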

Safety

  1. Defense in Depth: Multiple layers of validation
  2. Approval Gates: Human-in-the-loop for irreversible actions (sketched after this list)
  3. Dry-Run First: Preview changes before executing
  4. Audit Everything: Comprehensive logging for debugging
  5. Graceful Degradation: Fallback to manual process on failures
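
The approval-gate and dry-run points combine naturally into a single wrapper around risky tool calls. A sketch, where request_human_approval is a placeholder for your review-queue integration and execute is any tool callable that accepts a dry_run flag (like execute_action above):

from typing import Any, Callable, Dict

def request_human_approval(tool_name: str, preview: Dict[str, Any]) -> bool:
    """Placeholder: push the preview to a review queue and block until a decision arrives."""
    raise NotImplementedError

def guarded_call(
    tool_name: str,
    execute: Callable[..., Dict[str, Any]],
    *,
    high_risk: bool,
    **kwargs: Any,
) -> Dict[str, Any]:
    # Always produce a dry-run preview first so the audit log captures intent.
    preview = execute(dry_run=True, **kwargs)
    if high_risk and not request_human_approval(tool_name, preview):
        return {"status": "rejected", "preview": preview}
    return execute(dry_run=False, **kwargs)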

Cost Optimization

  1. Caching: Store and reuse expensive API results (see the sketch after this list)
  2. Lightweight Models: Use smaller models for planning/routing
  3. Early Stopping: Halt on low-confidence signals
  4. Batching: Process multiple items together
  5. Budget Caps: Hard limits on daily/per-task spending
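
A simple illustration of the caching point: memoize expensive, deterministic tool calls keyed on their arguments. An in-memory dict is used here for brevity; a shared store such as Redis is the more typical production choice:

import hashlib
import json
from typing import Any, Callable, Dict

_cache: Dict[str, Any] = {}

def cached_tool_call(tool_name: str, fn: Callable[..., Any], **kwargs: Any) -> Any:
    """Reuse a prior result when the same tool is called with the same arguments."""
    key = hashlib.sha256(
        json.dumps({"tool": tool_name, "args": kwargs}, sort_keys=True, default=str).encode()
    ).hexdigest()
    if key not in _cache:
        _cache[key] = fn(**kwargs)
    return _cache[key]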

Common Pitfalls

| Pitfall               | Impact                            | Solution                            |
|-----------------------|-----------------------------------|-------------------------------------|
| Overly broad scope    | Low success rate, high complexity | Start narrow, expand gradually      |
| Weak tool contracts   | Brittle execution, hard to debug  | Strict JSON schemas, validation     |
| No dry-run            | Dangerous production changes      | Implement preview for all writes    |
| Poor error handling   | Agent crashes, wastes budget      | Structured errors, retry logic      |
| Inadequate evaluation | Unknown failure modes             | Comprehensive task suites           |
| Missing audit logs    | Can't debug or explain            | Log all inputs, outputs, decisions  |
| No approval workflow  | Risky autonomous actions          | Human gates for high-risk tools     |
| Cost overruns         | Exceeds budget                    | Budget caps, early stopping         |

Further Reading

  • Frameworks: LangChain Agents, AutoGPT, GPT-Engineer
  • Coding Agents: GitHub Copilot, Amazon CodeWhisperer, Cursor
  • Research Tools: Semantic Scholar API, Elicit, Consensus
  • Ops Automation: PagerDuty Runbooks, Blameless, Shoreline.io