Part 7: Agentic Systems & Orchestration

Chapter 43: Competitive Intelligence & Research Bots

Overview

Competitive intelligence (CI) and research bots represent a critical capability for modern enterprises. These autonomous systems monitor competitors, track market trends, extract insights from public data, and surface actionable intelligence—all while respecting legal boundaries and maintaining data provenance.

This chapter covers the full lifecycle of building compliant, effective CI and research systems:

  • Intelligent Crawling: Politeness policies, deduplication, and change detection
  • Structured Extraction: Entity recognition, price tracking, and claim identification
  • Evidence Preservation: Snapshots, diffs, and chain-of-custody for audit trails
  • Grounded Summarization: Citations, confidence scoring, and contradiction detection
  • Legal Compliance: robots.txt, ToS respect, and PII filtering

Why It Matters

Competitive intelligence thrives on freshness and accuracy. Legal and ethical constraints require careful scoping, evidence capture, and respect for site policies.

Business Value & Risks

| Value Category | Impact | Metric |
|---|---|---|
| Research Time | 90% reduction | 20 hours/week → 2 hours/week |
| Competitor Coverage | 5x increase | 5 competitors → 25 competitors |
| Update Frequency | Real-time vs. weekly | 2 hours vs. 7 days to detect changes |
| Insights Generated | 24x increase | 5/month → 120/month actionable insights |
| Sources Monitored | 20x increase | 500 products → 10,000 products |

Critical Risks:

| Risk Category | Impact | Mitigation | Priority |
|---|---|---|---|
| ToS Violations | Legal action, IP blocks | Strict robots.txt compliance, 10 req/min limits | Critical |
| Inaccurate Intelligence | Bad decisions | Citation linking, confidence scoring | High |
| Stale Data | Outdated insights | Hourly change detection, timestamps | High |
| PII Exposure | GDPR fines | PII filtering before storage | Critical |
| Misleading Summaries | Strategic errors | Contradiction checking, fact verification | High |

Architecture

graph TB
    subgraph "Source Discovery"
        A[Source Registry] --> B[Sitemap Parser]
        A --> C[RSS Feed Monitor]
        A --> D[API Connector]
        A --> E[Manual URL Queue]
    end
    subgraph "Crawl Orchestrator"
        B --> F[Crawl Scheduler]
        C --> F
        D --> F
        E --> F
        F --> G[Politeness Engine]
        G --> H[robots.txt Checker]
        G --> I[Rate Limiter]
        G --> J[Allowlist Filter]
    end
    subgraph "Content Processing"
        J --> K[Content Fetcher]
        K --> L[Deduplicator]
        L --> M[Canonicalizer]
        M --> N[Change Detector]
        N --> O{Changed?}
        O -->|Yes| P[Structured Extractors]
        O -->|No| Q[Skip Processing]
    end
    subgraph "Extraction Layer"
        P --> R[Entity Extractor]
        P --> S[Price Extractor]
        P --> T[Feature Extractor]
        P --> U[Claim Extractor]
        R --> V[Structured Data Store]
        S --> V
        T --> V
        U --> V
    end
    subgraph "Intelligence Generation"
        V --> W[Change Analyzer]
        V --> X[Summarizer]
        W --> Y[Alert Generator]
        X --> Z[Citation Linker]
        Z --> AA[Confidence Scorer]
        AA --> AB[Contradiction Checker]
    end
    subgraph "Evidence & Audit"
        K --> AC[Snapshot Store]
        M --> AD[URL Canonicalization Log]
        N --> AE[Diff Store]
        V --> AF[Provenance Tracker]
        AC --> AG[Evidence Archive]
        AD --> AG
        AE --> AG
        AF --> AG
    end
    subgraph "Analyst Interface"
        Y --> AH[Alert Dashboard]
        AB --> AI[Intelligence Report]
        AG --> AJ[Audit Viewer]
    end
    style G fill:#ffe1e1
    style V fill:#e1f5ff
    style AG fill:#e1ffe1
    style AB fill:#fff3e1

Core Components

1. Source Discovery & Management

Source Types & Crawl Frequencies:

| Source Type | Frequency | Priority | Use Case |
|---|---|---|---|
| Product Pages | Hourly | 10 | Price monitoring |
| Press Releases | Daily | 8 | New announcements |
| Blog Posts | Daily | 6 | Thought leadership |
| Sitemaps | Weekly | 5 | Comprehensive coverage |
| RSS Feeds | Hourly | 9 | Real-time updates |
| APIs | Hourly | 10 | Official data |

Implementation Pattern:

from datetime import datetime, timedelta
from typing import List

class SourceRegistry:
    def __init__(self):
        self.sources = {}  # source_id -> source record

    def register_source(self, url: str, competitor: str, source_type: str, priority: int = 5):
        """Register an intelligence source with its crawl frequency"""
        source = {
            'url': url,
            'competitor': competitor,
            'type': source_type,
            'priority': priority,
            'frequency': self._get_frequency(source_type),  # timedelta derived from source type
            'last_crawled': None
        }
        self.sources[self._generate_id(source)] = source

    def get_sources_to_crawl(self) -> List[dict]:
        """Get sources due for a crawl, sorted by priority (highest first)"""
        now = datetime.utcnow()
        due_sources = [
            s for s in self.sources.values()
            if not s['last_crawled'] or (now - s['last_crawled']) >= s['frequency']
        ]
        return sorted(due_sources, key=lambda s: s['priority'], reverse=True)

class SitemapParser:
    async def discover_and_parse(self, domain: str) -> List[str]:
        """Discover sitemaps from robots.txt and parse URLs"""
        # Check robots.txt for sitemap directive
        sitemaps = await self._parse_robots_txt(f"https://{domain}/robots.txt")

        # Try common locations
        sitemaps += await self._try_common_locations(domain)

        # Parse all sitemaps and extract URLs
        urls = []
        for sitemap_url in sitemaps:
            urls.extend(await self._parse_sitemap_xml(sitemap_url))

        return urls
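
The `_get_frequency` helper referenced above is not shown in the pattern. A minimal sketch, with intervals mirroring the frequency table (the source-type keys and the daily default are assumptions):

from datetime import timedelta

# Hypothetical SourceRegistry._get_frequency helper; interval values follow
# the frequency table above, keys and default are illustrative assumptions.
def _get_frequency(self, source_type: str) -> timedelta:
    frequencies = {
        'product_page': timedelta(hours=1),
        'press_release': timedelta(days=1),
        'blog_post': timedelta(days=1),
        'sitemap': timedelta(weeks=1),
        'rss_feed': timedelta(hours=1),
        'api': timedelta(hours=1),
    }
    return frequencies.get(source_type, timedelta(days=1))  # conservative default: daily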

2. Politeness Engine

Politeness Checks (Sequential):

graph TD
    A[URL Request] --> B{In Allowlist?}
    B -->|No| C[Reject: Not Allowed]
    B -->|Yes| D{robots.txt OK?}
    D -->|No| E[Reject: Disallowed]
    D -->|Yes| F{Rate Limit OK?}
    F -->|No| G[Wait Until Window Opens]
    F -->|Yes| H[Allow Crawl]
    G --> F
    style C fill:#ffe1e1
    style E fill:#ffe1e1
    style H fill:#e1f5ff

Rate Limits & Rules:

| Check | Rule | Action |
|---|---|---|
| Allowlist | Domain must be in allowlist | Reject if not present |
| robots.txt | Must respect User-agent rules | Check cache (24h TTL) |
| Rate Limit | Max 10 requests/minute per domain | Wait if exceeded |
| User-Agent | Identify as "ResearchBot/1.0" | Include contact URL |
| robots.txt Cache | 24 hour TTL | Refresh daily |

Implementation Pattern:

import asyncio
from datetime import datetime, timedelta
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

class PolitenessEngine:
    def __init__(self, allowlist: set, user_agent: str = "ResearchBot/1.0"):
        self.allowlist = allowlist
        self.user_agent = user_agent
        self.robots_cache = {}   # domain -> {'parser': RobotFileParser, 'fetched_at': datetime}
        self.rate_limiters = {}  # domain -> {'rpm': int, 'requests': [datetime]}

    async def can_fetch(self, url: str) -> dict:
        """Three-layer politeness check: allowlist, robots.txt, rate limit"""
        domain = urlparse(url).netloc

        # 1. Allowlist check
        if domain not in self.allowlist:
            return {'can_fetch': False, 'reason': 'not_in_allowlist'}

        # 2. robots.txt check (24h cache)
        if not await self._check_robots(url):
            return {'can_fetch': False, 'reason': 'disallowed_by_robots'}

        # 3. Rate limit (10 req/min per domain)
        if not await self._check_rate_limit(domain):
            return {'can_fetch': False, 'reason': 'rate_limit_exceeded'}

        return {'can_fetch': True}

    async def _check_robots(self, url: str) -> bool:
        """Check robots.txt with 24h cache"""
        domain = urlparse(url).netloc
        if domain in self.robots_cache:
            cache_entry = self.robots_cache[domain]
            if (datetime.utcnow() - cache_entry['fetched_at']) < timedelta(hours=24):
                return cache_entry['parser'].can_fetch(self.user_agent, url)

        # Fetch and cache robots.txt
        rp = RobotFileParser()
        rp.set_url(f"https://{domain}/robots.txt")
        rp.read()
        self.robots_cache[domain] = {'parser': rp, 'fetched_at': datetime.utcnow()}
        return rp.can_fetch(self.user_agent, url)

    async def _check_rate_limit(self, domain: str) -> bool:
        """10 requests per minute, sliding window per domain"""
        now = datetime.utcnow()
        limiter = self.rate_limiters.setdefault(domain, {'rpm': 10, 'requests': []})

        # Keep only requests inside the 1-minute window
        limiter['requests'] = [t for t in limiter['requests'] if (now - t).total_seconds() < 60]

        # Back off and retry if the limit is exceeded
        if len(limiter['requests']) >= limiter['rpm']:
            await asyncio.sleep(6)  # Wait 6 seconds before re-checking
            return await self._check_rate_limit(domain)

        limiter['requests'].append(now)
        return True
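
How a crawler might consume the engine: a minimal fetch loop, assuming aiohttp for HTTP and a placeholder allowlist and contact URL (both assumptions, not part of the pattern above):

import aiohttp

# Sketch of a polite fetch loop built on PolitenessEngine. The allowlist and
# contact URL are placeholders; production code would add retries and timeouts.
async def polite_fetch(urls: list) -> dict:
    engine = PolitenessEngine(
        allowlist={'example.com'},
        user_agent="ResearchBot/1.0 (+https://example.com/bot)"
    )
    results = {}
    async with aiohttp.ClientSession(headers={'User-Agent': engine.user_agent}) as session:
        for url in urls:
            check = await engine.can_fetch(url)
            if not check['can_fetch']:
                results[url] = {'skipped': check['reason']}  # record why it was skipped
                continue
            async with session.get(url) as resp:
                results[url] = {'status': resp.status, 'content': await resp.text()}
    return results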

3. Content Processing Pipeline

URL Canonicalization Rules:

| Transformation | Example | Reason |
|---|---|---|
| Remove tracking params | ?utm_source=twitter → (removed) | Deduplication |
| Sort query params | ?b=2&a=1 → ?a=1&b=2 | Consistent hashing |
| Remove trailing slash | /page/ → /page | Normalize paths |
| Lowercase domain | Example.COM → example.com | Case-insensitive matching |

Change Detection:

graph LR
    A[Fetch Content] --> B[Calculate Hash]
    B --> C{Previous Snapshot?}
    C -->|No| D[First Seen]
    C -->|Yes| E{Hash Match?}
    E -->|Yes| F[No Changes - Skip]
    E -->|No| G[Generate Diff]
    G --> H[Store New Snapshot]
    D --> H
    style F fill:#e1ffe1
    style G fill:#ffe1e1
    style H fill:#e1f5ff

Implementation Pattern:

import difflib
import hashlib
from urllib.parse import urlparse, parse_qs, urlencode

class ContentDeduplicator:
    def __init__(self):
        self.content_hashes = set()

    def canonicalize_url(self, url: str) -> str:
        """Remove tracking params, sort remaining query params, normalize host and path"""
        tracking_params = {'utm_source', 'utm_medium', 'fbclid', 'gclid', 'ref'}
        parsed = urlparse(url)
        params = {k: v for k, v in parse_qs(parsed.query).items() if k not in tracking_params}
        canonical = f"{parsed.scheme}://{parsed.netloc.lower()}{parsed.path.rstrip('/')}"
        if params:
            canonical += f"?{urlencode(sorted(params.items()), doseq=True)}"  # parse_qs values are lists
        return canonical

    def is_duplicate(self, url: str, content: str) -> bool:
        """Check whether whitespace-normalized content has been seen; record it if new"""
        content_hash = hashlib.sha256(' '.join(content.split()).encode()).hexdigest()
        if content_hash in self.content_hashes:
            return True
        self.content_hashes.add(content_hash)
        return False

class ChangeDetector:
    async def detect_changes(self, url: str, content: str) -> dict:
        """Compare with previous snapshot"""
        previous = await self.snapshot_store.get_latest(url)
        if not previous:
            return {'is_new': True, 'has_changes': False}

        current_hash = hashlib.sha256(content.encode()).hexdigest()
        if current_hash == previous['hash']:
            return {'is_new': False, 'has_changes': False}

        # Generate diff
        diff = difflib.unified_diff(previous['content'].splitlines(), content.splitlines())
        return {'is_new': False, 'has_changes': True, 'diff': list(diff)[:100]}
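
The `ChangeDetector` assumes a snapshot store that returns the previous capture. A minimal sketch, assuming in-memory storage and illustrative field names, with a `previous_hash` link to support the chain-of-custody requirement from the Evidence & Audit layer:

import hashlib
from datetime import datetime
from typing import Optional

# Minimal snapshot store sketch with chain-of-custody metadata. The in-memory
# dict and field names are assumptions; a production system would persist to
# immutable, versioned object storage.
class SnapshotStore:
    def __init__(self):
        self.snapshots = {}  # canonical URL -> list of snapshots, oldest first

    async def save(self, url: str, content: str, fetched_by: str = "ResearchBot/1.0") -> dict:
        """Append a snapshot, linking it to the previous one for a custody chain"""
        history = self.snapshots.setdefault(url, [])
        snapshot = {
            'url': url,
            'content': content,
            'hash': hashlib.sha256(content.encode()).hexdigest(),
            'captured_at': datetime.utcnow().isoformat() + 'Z',
            'fetched_by': fetched_by,
            'previous_hash': history[-1]['hash'] if history else None,
        }
        history.append(snapshot)
        return snapshot

    async def get_latest(self, url: str) -> Optional[dict]:
        """Return the most recent snapshot, as consumed by ChangeDetector above"""
        history = self.snapshots.get(url)
        return history[-1] if history else None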

4. Structured Extraction

Extraction Targets:

| Extractor | Target Data | Selectors | Confidence |
|---|---|---|---|
| Price | Amount + currency | .price, [itemprop="price"] | High |
| Features | Product specs | .features li, .specs td | Medium |
| Claims | Performance metrics | Sentences with %, $, "faster" | Variable |
| Entities | Companies, products | Capitalized words, proper nouns | Medium |

Implementation Pattern:

import re
from typing import List
from bs4 import BeautifulSoup

class EntityExtractor:
    def extract(self, url: str, html: str, competitor: str) -> dict:
        """Extract fields using registered CSS selectors"""
        soup = BeautifulSoup(html, 'html.parser')
        rules = self.extraction_rules.get(competitor, {})
        data = {}

        for field, selector_config in rules.items():
            element = soup.select_one(selector_config['selector'])
            data[field] = element.get_text(strip=True) if element else None

        return {'url': url, 'competitor': competitor, 'data': data}

class PriceExtractor:
    def extract_prices(self, html: str) -> List[dict]:
        """Extract prices from common selectors"""
        selectors = ['.price', '.product-price', '[itemprop="price"]']
        prices = []

        soup = BeautifulSoup(html, 'html.parser')
        for selector in selectors:
            for elem in soup.select(selector):
                text = elem.get_text(strip=True)
                # Parse: $1,234.56 → {amount: 1234.56, currency: 'USD'}
                match = re.search(r'[$€£¥]?([\d,]+\.?\d*)', text)
                if match:
                    amount = float(match.group(1).replace(',', ''))
                    currency = 'EUR' if '€' in text else 'GBP' if '£' in text else 'JPY' if '¥' in text else 'USD'
                    prices.append({'amount': amount, 'currency': currency, 'selector': selector})

        return prices

class ClaimExtractor:
    def extract_claims(self, text: str) -> List[dict]:
        """Extract sentences with metrics, comparisons, superlatives"""
        claim_patterns = [r'\d+%', r'\$[\d,]+', r'\d+x', r'(faster|better|leading|first)']
        claims = []

        for sentence in text.split('.'):
            if any(re.search(p, sentence, re.I) for p in claim_patterns):
                claims.append({
                    'text': sentence.strip(),
                    'type': self._classify(sentence),
                    'confidence': 0.7
                })

        return claims

    def _classify(self, claim: str) -> str:
        """Classify claim type"""
        if '$' in claim:
            return 'pricing'
        elif '%' in claim:
            return 'performance'
        elif re.search(r'faster|slower', claim, re.I):
            return 'speed'
        else:
            return 'general'
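
PII exposure is flagged as a critical risk above, so extracted text and snapshots should pass through a redaction step before storage. A minimal sketch, assuming regex-based detection; the patterns are illustrative, and a production deployment would use a dedicated PII/NER detection service with jurisdiction-specific rules:

import re

# Illustrative PII redaction pass applied before snapshots or extracted data
# are stored. Patterns are assumptions, not an exhaustive PII taxonomy.
PII_PATTERNS = {
    'email': re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+'),
    'phone': re.compile(r'\+?\d[\d\s().-]{8,}\d'),
    'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}

def redact_pii(text: str) -> str:
    """Replace detected PII with typed placeholders, e.g. [REDACTED:email]"""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f'[REDACTED:{label}]', text)
    return text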

5. Intelligence Generation

Confidence Scoring:

| Factor | Weight | Calculation |
|---|---|---|
| Citation Density | 20% | Citations per 50 words |
| Source Freshness | 30% | 1 - (age_hours / 168) |
| Source Diversity | 20% | Unique domains / 5 |
| Data Completeness | 30% | Extracted fields / Total fields |

Implementation Pattern:

import re
from datetime import datetime
from typing import List
from urllib.parse import urlparse

class IntelligenceSummarizer:
    async def generate_summary(self, competitor: str, data: List[dict], period: str) -> dict:
        """Generate an LLM summary with citations and confidence scoring"""
        # Prepare context with source numbers
        context = '\n'.join([
            f"[{i+1}] {d['url']}: {d['data']}"
            for i, d in enumerate(data)
        ])

        # LLM prompt with citation requirement
        prompt = f"""Summarize competitive intelligence for {competitor} ({period}).
        Requirements: Cite sources as [n], include specific metrics, avoid speculation.

        Data:
        {context}

        Summary (Overview → Key Findings → Changes):"""

        summary = await self.llm.generate(prompt)

        # Extract citations: [1], [2], etc.
        citations = [
            {'number': int(m), 'url': data[int(m)-1]['url']}
            for m in re.findall(r'\[(\d+)\]', summary)
            if int(m) <= len(data)
        ]

        # Calculate confidence score (4 factors)
        confidence = self._score_confidence(summary, data, citations)

        # Check contradictions
        contradictions = self._check_contradictions(summary, data)

        return {
            'summary': summary,
            'citations': citations,
            'confidence_score': confidence,
            'contradictions': contradictions
        }

    def _score_confidence(self, summary: str, data: List[dict], citations: List[dict]) -> float:
        """Weighted confidence: citation density (20%) + freshness (30%) + diversity (20%) + completeness (30%)"""
        # Citation density: citations per 50 words of summary
        citation_density = min(len(citations) / max(len(summary.split()) / 50, 1), 1.0) * 0.2

        # Source freshness: 1 - (avg_age_hours / 168), averaged over timestamped items only
        timestamped = [d for d in data if 'extracted_at' in d]
        avg_age = (sum((datetime.utcnow() - datetime.fromisoformat(d['extracted_at'].replace('Z', ''))).total_seconds() / 3600
                       for d in timestamped) / len(timestamped)) if timestamped else 168  # no timestamps -> zero freshness
        freshness = max(0, 1 - (avg_age / 168)) * 0.3

        # Source diversity: unique domains / 5
        unique_domains = len(set(urlparse(d['url']).netloc for d in data))
        diversity = min(unique_domains / 5, 1.0) * 0.2

        # Data completeness: non-null fields / total fields
        total_fields = sum(len(d['data']) for d in data)
        filled_fields = sum(1 for d in data for v in d['data'].values() if v)
        completeness = (filled_fields / total_fields if total_fields else 0) * 0.3

        return citation_density + freshness + diversity + completeness

    def _check_contradictions(self, summary: str, data: List[dict]) -> List[dict]:
        """Verify numbers in summary appear in source data"""
        contradictions = []
        numbers = re.findall(r'\d+(?:\.\d+)?', summary)
        data_str = str(data)

        for num in numbers:
            if num not in data_str:
                contradictions.append({'issue': f"Number {num} not in sources", 'severity': 'warning'})

        return contradictions
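
Illustrative usage of the summarizer, assuming an async LLM client and extracted records from the structured data store; the 0.8 confidence gate mirrors the quality target in the evaluation table below, and the routing statuses are assumptions:

# Sketch of how a report pipeline might gate on confidence and contradictions
# before publishing; field names beyond the summarizer's output are assumptions.
async def publish_report(summarizer: IntelligenceSummarizer, competitor: str, records: list) -> dict:
    result = await summarizer.generate_summary(competitor, records, period="7d")

    if result['contradictions']:
        # Route to an analyst instead of auto-publishing
        return {'status': 'needs_review', 'issues': result['contradictions']}
    if result['confidence_score'] < 0.8:
        return {'status': 'low_confidence', 'score': result['confidence_score']}

    return {'status': 'published', 'summary': result['summary'], 'citations': result['citations']}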

Evaluation Metrics

| Metric Category | Metric | Target | Measurement Method |
|---|---|---|---|
| Freshness | Time to Index | < 1 hour | Time from page update to indexed |
| Freshness | Update Latency (p95) | < 4 hours | Time from change detection to alert |
| Coverage | Source Coverage | > 95% | Monitored sources / Target sources |
| Coverage | Entity Completeness | > 90% | Extracted fields / Total fields |
| Accuracy | Extraction Precision | > 95% | Correct extractions / Total extractions |
| Accuracy | Price Accuracy | > 99% | Correct prices / Total prices |
| Accuracy | Claim Verification Rate | > 90% | Verified claims / Total claims |
| Quality | Summary Confidence | > 0.8 | Average confidence score |
| Quality | Contradiction Rate | < 2% | Summaries with contradictions / Total |
| Utility | Analyst Time Saved | > 70% | (Old time - New time) / Old time |
| Utility | Actionable Insights Rate | > 60% | Insights acted upon / Total insights |
| Compliance | robots.txt Violations | 0 | Violations detected / Total crawls |
| Compliance | ToS Violations | 0 | Violations detected / Total operations |
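
Two of these metrics are straightforward to compute from a hand-labeled sample and crawl logs. A minimal sketch; the field names ('expected', 'extracted', 'detected_at', 'alerted_at') are assumptions about the logging schema:

from datetime import datetime

# Sketch of extraction precision and update latency (p95) from labeled data
# and alert logs; schema field names are illustrative assumptions.
def extraction_precision(labeled_sample: list) -> float:
    """Correct extractions / total extractions on a hand-labeled sample"""
    correct = sum(1 for item in labeled_sample if item['extracted'] == item['expected'])
    return correct / len(labeled_sample) if labeled_sample else 0.0

def update_latency_p95(alert_log: list) -> float:
    """p95 of hours between change detection and alert delivery"""
    latencies = sorted(
        (datetime.fromisoformat(a['alerted_at']) - datetime.fromisoformat(a['detected_at'])).total_seconds() / 3600
        for a in alert_log
    )
    if not latencies:
        return 0.0
    return latencies[int(0.95 * (len(latencies) - 1))]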

Real-World Case Study: Retail Competitive Intelligence

Scenario: A major retailer needs to monitor pricing and product offerings across 25 competitors, tracking 10,000+ products.

System Flow:

graph TD
    A[Hourly Cron Job] --> B[Get Sources to Crawl]
    B --> C[Politeness Check]
    C -->|Pass| D[Fetch Content]
    C -->|Fail| E[Skip]
    D --> F[Deduplicate]
    F -->|Duplicate| E
    F -->|New| G[Change Detection]
    G -->|No Changes| E
    G -->|Changes| H[Extract Prices + Entities]
    H --> I[Store Snapshot]
    I --> J[Generate Alerts]
    J --> K[Create Summary]
    style C fill:#ffe1e1
    style H fill:#e1f5ff
    style K fill:#e1ffe1

Implementation Pattern:

from typing import List

class RetailCIBot:
    async def monitor_competitor_pricing(self, competitor: str) -> dict:
        """End-to-end monitoring with a 7-step pipeline"""
        sources = self.source_registry.get_competitor_sources(competitor)
        changes = []
        pricing_data = []

        for source in sources:
            # 1. Politeness check
            if not (await self.politeness.can_fetch(source.url))['can_fetch']:
                continue

            # 2. Fetch content
            content = await self._fetch(source.url)

            # 3. Deduplication
            if self.deduplicator.is_duplicate(source.url, content):
                continue

            # 4. Change detection
            change = await self.change_detector.detect_changes(source.url, content)
            if not change['has_changes']:
                continue

            changes.append(change)

            # 5. Extract prices + entities
            prices = self.price_extractor.extract_prices(content)
            entities = self.entity_extractor.extract(source.url, content, competitor)
            pricing_data.extend(prices)

            # 6. Store snapshot
            await self._store_snapshot(source.url, content, prices, entities)

        # 7. Generate summary and alerts
        if changes:
            summary = await self.summarizer.generate_summary(competitor, pricing_data, "24h")
            alerts = self._generate_alerts(changes, pricing_data)
            return {'competitor': competitor, 'changes': len(changes), 'summary': summary, 'alerts': alerts}

        return {'competitor': competitor, 'changes': 0, 'message': 'No changes'}

    def _generate_alerts(self, changes: List[dict], prices: List[dict]) -> List[dict]:
        """Alert on price drops >10% and new products"""
        alerts = []
        # Price drops; previous prices and product names are joined in from stored snapshots
        for p in prices:
            previous = p.get('previous_price')
            if previous and (previous - p['amount']) / previous > 0.1:
                alerts.append({
                    'type': 'price_drop',
                    'severity': 'high',
                    'product': p.get('product_name', 'unknown'),
                    'old_price': previous,
                    'new_price': p['amount']
                })
        return alerts
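
A sketch of the hourly entry point from the flow above; the competitor list and bot construction are placeholders, and in production this would be triggered by a scheduler (cron, Airflow, and similar) rather than called directly:

import asyncio

# Illustrative hourly run across all monitored competitors. return_exceptions
# keeps one failing competitor from aborting the whole run.
async def hourly_run(bot: RetailCIBot, competitors: list):
    results = await asyncio.gather(
        *(bot.monitor_competitor_pricing(c) for c in competitors),
        return_exceptions=True
    )
    for competitor, result in zip(competitors, results):
        if isinstance(result, Exception):
            print(f"{competitor}: run failed: {result}")
        else:
            print(f"{competitor}: {result.get('changes', 0)} changes detected")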

Results After 6 Months:

| Metric | Before CI Bot | After CI Bot | Improvement |
|---|---|---|---|
| Competitors Monitored | 5 | 25 | 400% increase |
| Products Tracked | 500 | 10,000 | 1900% increase |
| Update Frequency | Weekly manual | Hourly automated | Real-time vs. weekly |
| Analyst Time on Data Collection | 20 hours/week | 2 hours/week | 90% reduction |
| Price Change Detection Time | 7 days average | 2 hours average | 98% faster |
| Pricing Errors Caught | 2 per month | 45 per month | 2150% increase |
| Actionable Insights Generated | 5 per month | 120 per month | 2300% increase |
| ToS/Legal Violations | 0 | 0 | Maintained compliance |

Key Success Factors:

  • ✅ Strict politeness controls prevented any legal issues
  • ✅ Change detection reduced noise, focusing only on updates
  • ✅ Citation linking enabled rapid verification of insights
  • ✅ Contradiction checking caught 15 potential errors before delivery
  • ✅ Evidence snapshots resolved 8 disputes about competitor claims

Implementation Checklist

Phase 1: Source Discovery (Week 1-2)

  • Define target competitors (minimum 10) and source types
  • Build source registry with priority and frequency configuration
  • Implement sitemap parser and RSS feed monitor
  • Create manual URL queue for ad-hoc monitoring

Phase 2: Politeness & Compliance (Week 3-4)

  • Implement robots.txt checker with 24h caching
  • Build rate limiter (10 req/min per domain default)
  • Configure domain allowlist for all competitors
  • Review ToS for each competitor and document policies

Phase 3: Content Processing (Week 5-6)

  • Implement URL canonicalization (tracking param removal, sorting)
  • Build content hash deduplication
  • Create snapshot storage and diff generation
  • Configure change detection thresholds (>10 lines = significant; see the sketch below)
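
A minimal sketch of that significance threshold, assuming unified-diff output like the ChangeDetector above produces; the 10-line cutoff matches the checklist item:

def is_significant_change(diff_lines: list, threshold: int = 10) -> bool:
    """Treat a change as significant when more than `threshold` lines were added or removed"""
    changed = [
        line for line in diff_lines
        if line.startswith(('+', '-')) and not line.startswith(('+++', '---'))  # skip file headers
    ]
    return len(changed) > threshold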

Phase 4: Extraction (Week 7-9)

  • Define extraction schemas per competitor (CSS selectors)
  • Implement price extractor with currency detection
  • Build claim extractor (patterns: %, $, comparatives, superlatives)
  • Create extraction validation and error handling

Phase 5: Intelligence Generation (Week 10-12)

  • Integrate LLM for summary generation with citation requirements
  • Implement confidence scoring (4 factors: density, freshness, diversity, completeness)
  • Build contradiction checker (verify numbers in sources)
  • Create alert rules (price drops >10%, new products, feature changes)

Phase 6: Evidence & Deployment (Week 13-16)

  • Implement snapshot storage with chain-of-custody logging
  • Build compliance audit logging and reports
  • Test all extraction types and change detection
  • Gradual rollout: 5 competitors → 15 → full (25)

Phase 7: Ongoing Operations

  • Weekly: Review extraction accuracy, update rules as sites change
  • Monthly: Evaluate source coverage, summary quality, compliance
  • Quarterly: Add competitors/sources, improve prompts, expand alerts

Common Pitfalls and Solutions

| Pitfall | Impact | Solution |
|---|---|---|
| Ignoring robots.txt | Legal action, IP bans | Always check robots.txt; honor all directives |
| Aggressive crawling | IP bans, reputation damage | Conservative rate limits (10 req/min or less) |
| Poor URL canonicalization | Duplicate processing, wasted resources | Normalize URLs; filter tracking parameters |
| No change detection | Processing unchanged content | Implement content hashing and diff generation |
| Extraction brittleness | Breaks when sites update | Multiple fallback selectors; schema versioning |
| Missing citations | Cannot verify claims | Link every fact to source URL and timestamp |
| Stale data | Outdated intelligence | Timestamp everything; show data age in reports |
| No contradiction checking | Misleading summaries | Validate summary claims against source data |
| PII in snapshots | Privacy violations | Filter PII before storage; redact in evidence |
| No source diversity | Biased intelligence | Monitor multiple sources per competitor |
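
The fallback-selector mitigation for extraction brittleness can be kept very small. A sketch, assuming BeautifulSoup as elsewhere in this chapter; the example selector list is illustrative:

from bs4 import BeautifulSoup

# Try selectors in priority order and record which one matched, so a site
# redesign that breaks the primary selector is visible in the output.
def extract_with_fallbacks(html: str, selectors: list) -> dict:
    soup = BeautifulSoup(html, 'html.parser')
    for selector in selectors:
        element = soup.select_one(selector)
        if element:
            return {'value': element.get_text(strip=True), 'selector': selector}
    return {'value': None, 'selector': None}  # all selectors failed

# Example: price = extract_with_fallbacks(html, ['.price', '[itemprop="price"]', '.product-price'])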

Summary

Building effective competitive intelligence systems requires balancing three priorities:

  1. Legal Compliance: Strict adherence to robots.txt, ToS, and rate limits
  2. Data Quality: Accurate extraction, freshness tracking, and citation linking
  3. Analyst Utility: Actionable insights, not just raw data dumps

The retail CI bot case study demonstrates that with proper implementation, CI systems can:

  • Monitor 5x more competitors with 90% less manual effort
  • Detect competitive moves 98% faster than manual monitoring
  • Generate 24x more actionable insights while maintaining zero legal violations

The key success factors are:

  • Conservative politeness settings (err on the side of caution)
  • Comprehensive evidence preservation (every fact must be verifiable)
  • Change detection to reduce noise and focus on what matters
  • Grounded summarization with citations and contradiction checking
  • Continuous monitoring of extraction accuracy and legal compliance

By following the implementation checklist and avoiding common pitfalls, you can deploy CI systems that provide strategic advantage while respecting legal and ethical boundaries.