TechScout/techscout/technology/grouper.py

422 lines
15 KiB
Python
Raw Normal View History

2026-01-22 13:02:09 -05:00
"""
Technology Grouper for TechScout.
Groups duplicate technologies across sources into single entities.
Uses conservative heuristic matching to avoid false merges.
This is Stage 4 of the Capability-Technology Matching pipeline.
"""
import logging
import uuid
from typing import List, Dict, Set, Tuple, Optional
from dataclasses import dataclass
from collections import defaultdict
from .types import ExtractedTechnology, GroupedTechnology, SourceEvidence, Developer
logger = logging.getLogger(__name__)
@dataclass
class SimilarityScore:
    """Similarity scores between two technologies."""

    # Fuzzy string similarity of the two names, in [0, 1].
    name_similarity: float
    # True when both entries appear to name the same developer.
    developer_match: bool
    # Jaccard similarity of the capability keyword sets, in [0, 1].
    capability_overlap: float
    # Keyword overlap between the two descriptions, in [0, 1].
    description_similarity: float
    # Weighted combination of the component scores above.
    overall: float
class TechnologyGrouper:
    """
    Groups extracted technologies that describe the same underlying technology.

    Uses conservative matching:
    - Same developer + similar name -> definite merge
    - Very similar name + similar capabilities -> likely merge
    - Otherwise keep separate (avoid false merges)
    """

    # Thresholds for grouping decisions (all compared with strict ">").
    HIGH_CONFIDENCE_NAME_THRESHOLD = 0.85    # name alone justifies a merge
    MEDIUM_CONFIDENCE_NAME_THRESHOLD = 0.70  # name match that needs capability support
    CAPABILITY_OVERLAP_THRESHOLD = 0.40      # capability Jaccard backing a medium name match
    SAME_DEVELOPER_NAME_THRESHOLD = 0.65     # looser name bar when developers match

    # Stopword sets hoisted to class level so they are built once, not per call.
    _NAME_STOPWORDS = frozenset(
        {'the', 'a', 'an', 'for', 'of', 'and', 'in', 'to', 'with'}
    )
    _KEYWORD_STOPWORDS = frozenset({
        'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
        'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
        'would', 'could', 'should', 'may', 'might', 'must', 'shall',
        'can', 'need', 'for', 'of', 'to', 'in', 'on', 'at', 'by',
        'from', 'with', 'and', 'or', 'not', 'that', 'this', 'it',
        'as', 'its', 'also', 'than', 'such', 'into', 'which'
    })

    def group(self, technologies: List[ExtractedTechnology]) -> List[GroupedTechnology]:
        """
        Group extracted technologies into unique technologies.

        Args:
            technologies: List of extracted technologies (may have duplicates)

        Returns:
            List of grouped technologies (deduplicated)
        """
        if not technologies:
            return []
        logger.info(f"Grouping {len(technologies)} extracted technologies...")
        # Greedy, order-dependent grouping: each tech either joins the best
        # matching existing group or seeds a new one keyed by its own id.
        groups: Dict[str, List[ExtractedTechnology]] = {}
        tech_to_group: Dict[str, str] = {}
        for tech in technologies:
            merge_group_id = self._find_merge_candidate(tech, groups, tech_to_group)
            if merge_group_id:
                # Merge into the existing group.
                groups[merge_group_id].append(tech)
                tech_to_group[tech.id] = merge_group_id
            else:
                # Start a new group seeded by this tech.
                groups[tech.id] = [tech]
                tech_to_group[tech.id] = tech.id
        # Collapse each group into a single GroupedTechnology.
        grouped_technologies = [self._merge_group(tech_list) for tech_list in groups.values()]
        logger.info(f"Grouped into {len(grouped_technologies)} unique technologies")
        return grouped_technologies

    def _find_merge_candidate(
        self,
        tech: ExtractedTechnology,
        groups: Dict[str, List[ExtractedTechnology]],
        tech_to_group: Dict[str, str]  # NOTE(review): currently unused; kept for call-site stability
    ) -> Optional[str]:
        """Find an existing group to merge this technology into.

        Returns the id of the best-matching group, or None if no group
        clears the merge thresholds.
        """
        best_match_group = None
        best_match_score = 0.0
        for group_id, group_techs in groups.items():
            # Compare against the first (canonical) tech in the group only;
            # later members are assumed close enough to the representative.
            representative = group_techs[0]
            similarity = self._calculate_similarity(tech, representative)
            should_merge, confidence = self._should_merge(similarity)
            if should_merge and confidence > best_match_score:
                best_match_group = group_id
                best_match_score = confidence
        return best_match_group

    def _calculate_similarity(
        self,
        tech1: ExtractedTechnology,
        tech2: ExtractedTechnology
    ) -> SimilarityScore:
        """Calculate similarity between two technologies."""
        # Name similarity (fuzzy, token-based string match).
        name_sim = self._string_similarity(tech1.name, tech2.name)
        # Developer match: exact, fuzzy, or substring containment either way.
        dev1 = (tech1.developer or "").lower().strip()
        dev2 = (tech2.developer or "").lower().strip()
        developer_match = False
        if dev1 and dev2:
            developer_match = (
                dev1 == dev2 or
                self._string_similarity(dev1, dev2) > 0.8 or
                dev1 in dev2 or
                dev2 in dev1
            )
        # Capability overlap (Jaccard similarity of extracted keywords).
        caps1 = set(self._extract_keywords(tech1.capabilities))
        caps2 = set(self._extract_keywords(tech2.capabilities))
        cap_overlap = self._jaccard_similarity(caps1, caps2)
        # Description similarity (keyword overlap).
        desc1_words = set(self._extract_keywords([tech1.description]))
        desc2_words = set(self._extract_keywords([tech2.description]))
        desc_sim = self._jaccard_similarity(desc1_words, desc2_words)
        # Overall: weight the name more heavily when developers do NOT match,
        # since the name is then the only strong identity signal.
        if developer_match:
            overall = 0.5 * name_sim + 0.3 * cap_overlap + 0.2 * desc_sim
        else:
            overall = 0.6 * name_sim + 0.25 * cap_overlap + 0.15 * desc_sim
        return SimilarityScore(
            name_similarity=name_sim,
            developer_match=developer_match,
            capability_overlap=cap_overlap,
            description_similarity=desc_sim,
            overall=overall
        )

    def _should_merge(self, similarity: SimilarityScore) -> Tuple[bool, float]:
        """
        Decide whether to merge based on similarity scores.
        Returns (should_merge, confidence)
        """
        # High confidence: Same developer + reasonably similar name.
        if similarity.developer_match and similarity.name_similarity > self.SAME_DEVELOPER_NAME_THRESHOLD:
            return True, 0.9
        # High confidence: Very similar names.
        if similarity.name_similarity > self.HIGH_CONFIDENCE_NAME_THRESHOLD:
            return True, 0.85
        # Medium confidence: Similar name + good capability overlap.
        if (similarity.name_similarity > self.MEDIUM_CONFIDENCE_NAME_THRESHOLD and
                similarity.capability_overlap > self.CAPABILITY_OVERLAP_THRESHOLD):
            return True, 0.7
        # Same developer but moderately different names: only merge when
        # capabilities strongly overlap; otherwise keep separate.
        if similarity.developer_match and similarity.name_similarity > 0.5:
            if similarity.capability_overlap > 0.6:
                return True, 0.6
        return False, 0.0

    def _merge_group(self, technologies: List[ExtractedTechnology]) -> GroupedTechnology:
        """Merge multiple extracted technologies into one grouped technology."""
        if len(technologies) == 1:
            return self._single_to_grouped(technologies[0])
        # Choose canonical name (most specific/complete).
        canonical_name = self._choose_canonical_name([t.name for t in technologies])
        # Collect alternate names (everything that isn't the canonical one).
        alternate_names = list(set(
            t.name for t in technologies if t.name != canonical_name
        ))
        # Choose the most common technology type across the group.
        type_counts = defaultdict(int)
        for t in technologies:
            type_counts[t.technology_type] += 1
        technology_type = max(type_counts, key=type_counts.get) if type_counts else "system"
        # Merge descriptions (choose longest/most detailed).
        descriptions = [t.description for t in technologies if t.description]
        description = max(descriptions, key=len) if descriptions else ""
        # Merge capabilities, de-duplicating case-insensitively while
        # preserving the first-seen casing and order.
        all_capabilities = []
        seen_caps = set()
        for t in technologies:
            for cap in t.capabilities:
                cap_lower = cap.lower()
                if cap_lower not in seen_caps:
                    seen_caps.add(cap_lower)
                    all_capabilities.append(cap)
        # Merge mechanisms (longest non-empty wins).
        mechanisms = [t.mechanism for t in technologies if t.mechanism]
        mechanism = max(mechanisms, key=len) if mechanisms else None
        # Collect distinct developers (case-insensitive de-dup).
        developers = []
        seen_devs = set()
        for t in technologies:
            if t.developer and t.developer.lower() not in seen_devs:
                seen_devs.add(t.developer.lower())
                developers.append(Developer(
                    name=t.developer,
                    type=t.developer_type or "unknown"
                ))
        # Best TRL estimate: prefer estimates from SBIR/patent sources, which
        # carry higher confidence than news/contract/other sources.
        trl_estimate = None
        trl_confidence = 0.0
        trl_evidence = []
        for t in technologies:
            if t.trl_estimate:
                conf = 0.9 if t.source_type in ("sbir", "patent") else 0.7
                if conf > trl_confidence:
                    trl_estimate = t.trl_estimate
                    trl_confidence = conf
                trl_evidence.extend(t.trl_evidence)
        # De-duplicate while preserving first-seen order; list(set(...)) would
        # make the ordering nondeterministic across runs.
        trl_evidence = list(dict.fromkeys(trl_evidence))
        # Build source evidence, one entry per contributing extraction.
        sources = []
        for t in technologies:
            sources.append(SourceEvidence(
                source_type=t.source_type,
                source_name=t.source_type.upper(),
                title=t.source_title,
                url=t.source_url,
                snippet=t.source_snippet,
                contribution=self._determine_contribution(t),
            ))
        return GroupedTechnology(
            id=str(uuid.uuid4())[:8],
            canonical_name=canonical_name,
            alternate_names=alternate_names,
            technology_type=technology_type,
            description=description,
            capabilities=all_capabilities,
            mechanism=mechanism,
            developers=developers,
            trl_estimate=trl_estimate,
            trl_confidence=trl_confidence,
            trl_evidence=trl_evidence,
            sources=sources,
            source_count=len(sources),
            # Multi-source groups carry some grouping risk; singletons don't.
            grouping_confidence=0.8 if len(technologies) > 1 else 1.0,
            grouped_from=[t.id for t in technologies],
        )

    def _single_to_grouped(self, tech: ExtractedTechnology) -> GroupedTechnology:
        """Convert a single extracted technology to grouped format."""
        developers = []
        if tech.developer:
            developers.append(Developer(
                name=tech.developer,
                type=tech.developer_type or "unknown"
            ))
        source = SourceEvidence(
            source_type=tech.source_type,
            source_name=tech.source_type.upper(),
            title=tech.source_title,
            url=tech.source_url,
            snippet=tech.source_snippet,
            contribution=self._determine_contribution(tech),
        )
        return GroupedTechnology(
            id=tech.id,
            canonical_name=tech.name,
            alternate_names=[],
            technology_type=tech.technology_type,
            description=tech.description,
            capabilities=tech.capabilities,
            mechanism=tech.mechanism,
            developers=developers,
            trl_estimate=tech.trl_estimate,
            trl_confidence=0.7,
            trl_evidence=tech.trl_evidence,
            sources=[source],
            source_count=1,
            grouping_confidence=1.0,
            grouped_from=[tech.id],
        )

    def _choose_canonical_name(self, names: List[str]) -> str:
        """Choose the most specific/complete name as canonical.

        Prefers longer, more specific names, but penalizes very long ones
        (likely full document titles rather than technology names).
        """
        if not names:
            return "Unknown Technology"

        def name_score(name: str) -> float:
            length = len(name)
            if length < 5:
                return 0.3
            elif length > 100:
                return 0.4
            elif length > 50:
                return 0.6
            else:
                # 5..50 chars: modest bonus that grows with length.
                return 0.8 + (length / 100)

        return max(names, key=name_score)

    def _determine_contribution(self, tech: ExtractedTechnology) -> str:
        """Determine what this source contributes to the technology profile."""
        contributions = []
        if tech.source_type == "sbir":
            contributions.append("R&D funding")
            contributions.append("development status")
        elif tech.source_type == "patent":
            contributions.append("technical claims")
            contributions.append("innovation details")
        elif tech.source_type == "contract":
            contributions.append("government interest")
            contributions.append("deployment status")
        elif tech.source_type == "news":
            contributions.append("recent developments")
        else:
            contributions.append("general information")
        if tech.developer:
            contributions.append("developer info")
        return ", ".join(contributions)

    def _string_similarity(self, s1: str, s2: str) -> float:
        """Calculate string similarity using token-based comparison.

        Returns 1.0 for (case-insensitive) exact matches, otherwise the
        Jaccard similarity of the stopword-filtered token sets.
        """
        if not s1 or not s2:
            return 0.0
        s1 = s1.lower().strip()
        s2 = s2.lower().strip()
        if s1 == s2:
            return 1.0
        tokens1 = set(s1.split())
        tokens2 = set(s2.split())
        # Drop common words so they don't inflate the overlap.
        tokens1 -= self._NAME_STOPWORDS
        tokens2 -= self._NAME_STOPWORDS
        if not tokens1 or not tokens2:
            return 0.0
        intersection = len(tokens1 & tokens2)
        union = len(tokens1 | tokens2)
        return intersection / union if union > 0 else 0.0

    def _jaccard_similarity(self, set1: Set[str], set2: Set[str]) -> float:
        """Calculate Jaccard similarity between two sets (0.0 if either is empty)."""
        if not set1 or not set2:
            return 0.0
        intersection = len(set1 & set2)
        union = len(set1 | set2)
        return intersection / union if union > 0 else 0.0

    def _extract_keywords(self, texts: List[str]) -> List[str]:
        """Extract keywords from a list of texts.

        Lowercases, strips surrounding punctuation, and drops stopwords and
        words of two characters or fewer. Duplicates are preserved.
        """
        keywords = []
        for text in texts:
            if not text:
                continue
            for word in text.lower().split():
                word = word.strip('.,;:!?()[]{}"\'-')
                if word and len(word) > 2 and word not in self._KEYWORD_STOPWORDS:
                    keywords.append(word)
        return keywords