# TechScout/techscout/pipeline/capability_matcher.py
"""
Capability Matcher Pipeline for TechScout.
Orchestrates the full capability-to-technology matching process:
1. Parse capability need from natural language
2. Search sources for relevant content
3. Extract technologies from search results
4. Group duplicate technologies
5. Evaluate capability fit
6. Return ranked technologies
This is the main entry point for the new technology-centric approach.
"""
import json
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Generator
from ..config import TechScoutConfig, config as default_config
from ..extraction.llm_client import OllamaClient
from ..extraction.org_extractor import OrganizationExtractor
from ..search.web import WebSearcher
from ..search.base import SearchResult
from ..sources.sbir import SBIRSearcher
from ..sources.patents import PatentSearcher
from ..sources.contracts import ContractSearcher
from ..capability.parser import CapabilityParser
from ..capability.types import ParsedCapability, CapabilityNeed, CapabilityCriterion
from ..technology.extractor import TechnologyExtractor
from ..technology.grouper import TechnologyGrouper
from ..technology.evaluator import CapabilityEvaluator
from ..technology.types import EvaluatedTechnology, GroupedTechnology
logger = logging.getLogger(__name__)
@dataclass
class MatchSummary:
    """Aggregate view of match results: technology counts bucketed by fit level.

    ``top_recommendation`` carries the canonical name of the best-fitting
    technology, or ``None`` when nothing rated HIGH or MEDIUM.
    """
    total_technologies: int
    high_fit_count: int
    medium_fit_count: int
    low_fit_count: int
    uncertain_count: int
    top_recommendation: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (key order matches field order) for JSON storage."""
        field_names = (
            "total_technologies",
            "high_fit_count",
            "medium_fit_count",
            "low_fit_count",
            "uncertain_count",
            "top_recommendation",
        )
        return {name: getattr(self, name) for name in field_names}
# ============================================================================
# Step-Based Pipeline Results (for guided workflow)
# ============================================================================
@dataclass
class SearchResultItem:
    """One raw search hit, normalized for display in the step-based workflow."""
    id: str
    title: str
    snippet: str
    url: str
    source_type: str
    source_name: str
    organization: Optional[str] = None
    published_date: Optional[str] = None
    award_amount: Optional[float] = None
    trl_estimate: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (key order matches field order) for JSON storage."""
        field_names = (
            "id", "title", "snippet", "url", "source_type", "source_name",
            "organization", "published_date", "award_amount", "trl_estimate",
        )
        return {name: getattr(self, name) for name in field_names}
@dataclass
class StepSearchResult:
    """
    Result of Step 1: Search & Parse.

    Bundles the parsed capability with the raw search hits so the user can
    review them before proceeding to technology extraction.
    """
    id: str
    timestamp: str
    user_input: str
    # Parsed capability info
    capability_need: Optional[CapabilityNeed]
    capability_criteria: List[CapabilityCriterion]
    parsed_capability: Optional[ParsedCapability]
    # Search results
    search_results: List[SearchResultItem]
    source_counts: Dict[str, int]
    # Timing
    processing_time_seconds: float
    # Status
    success: bool = True
    error: Optional[str] = None
    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; nested objects serialize recursively."""
        def opt(obj):
            # Serialize an optional nested object, preserving None.
            return obj.to_dict() if obj else None

        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "user_input": self.user_input,
            "capability_need": opt(self.capability_need),
            "capability_criteria": [crit.to_dict() for crit in self.capability_criteria],
            "parsed_capability": opt(self.parsed_capability),
            "search_results": [item.to_dict() for item in self.search_results],
            "source_counts": self.source_counts,
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
@dataclass
class TechnologyItem:
    """A grouped technology presented for user selection, before fit evaluation."""
    id: str
    canonical_name: str
    technology_type: str
    description: str
    capabilities: List[str]
    developers: List[Dict[str, Any]]
    trl_estimate: Optional[int]
    source_count: int
    sources: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (key order matches field order) for JSON storage."""
        field_names = (
            "id", "canonical_name", "technology_type", "description",
            "capabilities", "developers", "trl_estimate", "source_count",
            "sources",
        )
        return {name: getattr(self, name) for name in field_names}
@dataclass
class StepExtractionResult:
    """
    Result of Step 2: Technology Extraction.

    Carries the grouped technologies for user selection, plus counts showing
    how many raw extractions were merged during grouping.
    """
    id: str
    timestamp: str
    search_id: str  # Links back to the StepSearchResult this was built from
    # Technologies found
    technologies: List[TechnologyItem]
    # Stats
    raw_extractions: int
    after_grouping: int
    # Timing
    processing_time_seconds: float
    # Status
    success: bool = True
    error: Optional[str] = None
    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; technologies serialize recursively."""
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "search_id": self.search_id,
            "technologies": [tech.to_dict() for tech in self.technologies],
            "raw_extractions": self.raw_extractions,
            "after_grouping": self.after_grouping,
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
@dataclass
class StepEvaluationResult:
    """
    Result of Step 3: Capability Evaluation.

    Holds the evaluated technologies (with fit scores) plus an aggregate
    summary for display.
    """
    id: str
    timestamp: str
    search_id: str  # Links back to the original StepSearchResult
    # Evaluated technologies
    technologies: List[EvaluatedTechnology]
    # Summary
    summary: MatchSummary
    # Timing
    processing_time_seconds: float
    # Status
    success: bool = True
    error: Optional[str] = None
    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; nested objects serialize recursively."""
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "search_id": self.search_id,
            "technologies": [tech.to_dict() for tech in self.technologies],
            "summary": self.summary.to_dict(),
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
@dataclass
class SearchMetadata:
    """Bookkeeping about a match run: document/technology counts and timing."""
    total_documents_searched: int
    technologies_extracted: int
    technologies_after_grouping: int
    sources_used: List[str]
    processing_time_seconds: float

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (key order matches field order) for JSON storage."""
        field_names = (
            "total_documents_searched",
            "technologies_extracted",
            "technologies_after_grouping",
            "sources_used",
            "processing_time_seconds",
        )
        return {name: getattr(self, name) for name in field_names}
@dataclass
class CapabilityMatchResult:
    """
    Complete result of capability-to-technology matching.

    This is the main output type for the pipeline: the original input, the
    parsed capability, the ranked technologies, a summary, and run metadata.
    """
    # Identity
    id: str
    timestamp: str
    # Input
    user_input: str
    capability_need: Optional[CapabilityNeed]
    capability_criteria: List[CapabilityCriterion]
    # Output
    technologies: List[EvaluatedTechnology]
    # Summary
    summary: MatchSummary
    # Metadata
    metadata: SearchMetadata
    # Status
    success: bool = True
    error: Optional[str] = None
    # Original parsing (for transparency)
    parsed_capability: Optional[ParsedCapability] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; nested objects serialize recursively."""
        def opt(obj):
            # Serialize an optional nested object, preserving None.
            return obj.to_dict() if obj else None

        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "user_input": self.user_input,
            "capability_need": opt(self.capability_need),
            "capability_criteria": [crit.to_dict() for crit in self.capability_criteria],
            "technologies": [tech.to_dict() for tech in self.technologies],
            "summary": self.summary.to_dict(),
            "metadata": self.metadata.to_dict(),
            "success": self.success,
            "error": self.error,
            "parsed_capability": opt(self.parsed_capability),
        }

    def save(self, path: Path):
        """Write the full result as pretty-printed JSON to *path*."""
        path.write_text(json.dumps(self.to_dict(), indent=2))
@dataclass
class StatusUpdate:
    """Progress notification yielded while the pipeline runs."""
    stage: str      # pipeline stage name, e.g. "parsing", "searching"
    message: str    # human-readable status line
    progress: Optional[int] = None  # 0-100 when known, else None
class CapabilityMatcherPipeline:
    """
    Main pipeline for capability-to-technology matching.

    Orchestrates five stages:
      1. Capability Parser (Stage 1)
      2. Source Search (Stage 2)
      3. Technology Extractor (Stage 3)
      4. Technology Grouper (Stage 4)
      5. Capability Evaluator (Stage 5)
    """

    def __init__(
        self,
        config: Optional[TechScoutConfig] = None,
        model: str = "mistral-nemo:12b"
    ):
        """Wire up the LLM client, the pipeline stages, and the source searchers.

        Args:
            config: TechScout configuration; falls back to the package default.
            model: Ollama model name shared by all LLM-backed stages.
        """
        self.config = config or default_config
        self.model = model
        # A single shared LLM client backs every LLM-based stage.
        self.llm_client = OllamaClient(
            base_url=self.config.ollama.base_url,
            default_model=model
        )
        # LLM-backed stages.
        self.capability_parser = CapabilityParser(self.llm_client, model)
        self.technology_extractor = TechnologyExtractor(self.llm_client, model)
        self.capability_evaluator = CapabilityEvaluator(self.llm_client, model)
        self.org_extractor = OrganizationExtractor(self.llm_client, model)
        # The grouper takes no LLM client.
        self.technology_grouper = TechnologyGrouper()
        # Per-source search clients.
        self.web_searcher = WebSearcher()
        self.sbir_searcher = SBIRSearcher()
        self.patent_searcher = PatentSearcher()
        self.contract_searcher = ContractSearcher()
def match(
self,
user_input: str,
max_technologies: int = 15,
min_fit_score: int = 25,
sources: Optional[List[str]] = None
) -> CapabilityMatchResult:
"""
Run the full capability-to-technology matching pipeline.
Args:
user_input: Natural language capability need description
max_technologies: Maximum technologies to return
min_fit_score: Minimum fit score to include
sources: Which sources to search (default: all)
Returns:
CapabilityMatchResult with ranked technologies
"""
start_time = datetime.now()
result_id = str(uuid.uuid4())[:8]
sources = sources or ["sbir", "patents", "contracts", "web", "news"]
logger.info(f"Starting capability match for: {user_input[:100]}...")
# Stage 1: Parse capability need
logger.info("Stage 1: Parsing capability need...")
parsed = self.capability_parser.parse(user_input)
if not parsed.success:
return self._create_error_result(
result_id, user_input, start_time,
f"Capability parsing failed: {parsed.error}"
)
logger.info(f"Parsed capability need: {parsed.capability_need.functional_need[:100] if parsed.capability_need else 'N/A'}")
logger.info(f"Generated {len(parsed.capability_criteria)} evaluation criteria")
# Stage 2: Search sources
logger.info("Stage 2: Searching sources...")
search_results = self._search_all_sources(parsed, sources)
logger.info(f"Found {len(search_results)} total search results")
if not search_results:
return self._create_error_result(
result_id, user_input, start_time,
"No search results found", parsed
)
# Extract organizations for results missing them
self._extract_organizations(search_results)
# Stage 3: Extract technologies
logger.info("Stage 3: Extracting technologies from results...")
extraction_result = self.technology_extractor.extract_all(
search_results,
parsed.capability_need,
parsed.technology_indicators
)
logger.info(f"Extracted {len(extraction_result.technologies)} technologies")
if not extraction_result.technologies:
return self._create_empty_result(
result_id, user_input, start_time, parsed,
len(search_results), 0, 0
)
# Stage 4: Group technologies
logger.info("Stage 4: Grouping duplicate technologies...")
grouped_technologies = self.technology_grouper.group(extraction_result.technologies)
logger.info(f"Grouped into {len(grouped_technologies)} unique technologies")
# Stage 5: Evaluate capability fit
logger.info("Stage 5: Evaluating capability fit...")
evaluated_technologies = self.capability_evaluator.evaluate_all(
grouped_technologies,
parsed.capability_need,
parsed.capability_criteria
)
# Filter by minimum fit score
filtered_technologies = [
t for t in evaluated_technologies
if t.capability_match.fit_score >= min_fit_score
]
# Limit to max_technologies
final_technologies = filtered_technologies[:max_technologies]
# Build summary
summary = self._build_summary(final_technologies)
# Build metadata
duration = (datetime.now() - start_time).total_seconds()
metadata = SearchMetadata(
total_documents_searched=len(search_results),
technologies_extracted=len(extraction_result.technologies),
technologies_after_grouping=len(grouped_technologies),
sources_used=sources,
processing_time_seconds=duration
)
# Build result
result = CapabilityMatchResult(
id=result_id,
timestamp=datetime.now().isoformat(),
user_input=user_input,
capability_need=parsed.capability_need,
capability_criteria=parsed.capability_criteria,
technologies=final_technologies,
summary=summary,
metadata=metadata,
success=True,
parsed_capability=parsed
)
# Save result
save_path = self.config.analyses_dir / f"match_{result_id}.json"
result.save(save_path)
logger.info(f"Saved match result to {save_path}")
return result
def match_with_status(
self,
user_input: str,
max_technologies: int = 15,
min_fit_score: int = 25,
sources: Optional[List[str]] = None
) -> Generator[StatusUpdate | CapabilityMatchResult, None, None]:
"""
Run matching pipeline with status updates.
Yields StatusUpdate objects during processing, then final CapabilityMatchResult.
"""
start_time = datetime.now()
result_id = str(uuid.uuid4())[:8]
sources = sources or ["sbir", "patents", "contracts", "web", "news"]
# Stage 1: Parse
yield StatusUpdate("parsing", "Parsing capability need...", 10)
parsed = self.capability_parser.parse(user_input)
if not parsed.success:
yield self._create_error_result(
result_id, user_input, start_time,
f"Capability parsing failed: {parsed.error}"
)
return
# Stage 2: Search
yield StatusUpdate("searching", f"Searching {len(sources)} sources...", 20)
search_results = self._search_all_sources(parsed, sources)
yield StatusUpdate("searching", f"Found {len(search_results)} results", 40)
if not search_results:
yield self._create_error_result(
result_id, user_input, start_time,
"No search results found", parsed
)
return
self._extract_organizations(search_results)
# Stage 3: Extract
yield StatusUpdate("extracting", "Extracting technologies from results...", 50)
extraction_result = self.technology_extractor.extract_all(
search_results,
parsed.capability_need,
parsed.technology_indicators
)
yield StatusUpdate("extracting", f"Extracted {len(extraction_result.technologies)} technologies", 65)
if not extraction_result.technologies:
yield self._create_empty_result(
result_id, user_input, start_time, parsed,
len(search_results), 0, 0
)
return
# Stage 4: Group
yield StatusUpdate("grouping", "Grouping duplicate technologies...", 70)
grouped_technologies = self.technology_grouper.group(extraction_result.technologies)
yield StatusUpdate("grouping", f"Found {len(grouped_technologies)} unique technologies", 75)
# Stage 5: Evaluate
yield StatusUpdate("evaluating", "Evaluating capability fit...", 80)
evaluated_technologies = self.capability_evaluator.evaluate_all(
grouped_technologies,
parsed.capability_need,
parsed.capability_criteria
)
yield StatusUpdate("evaluating", "Evaluation complete", 95)
# Filter and finalize
filtered = [t for t in evaluated_technologies if t.capability_match.fit_score >= min_fit_score]
final = filtered[:max_technologies]
summary = self._build_summary(final)
duration = (datetime.now() - start_time).total_seconds()
metadata = SearchMetadata(
total_documents_searched=len(search_results),
technologies_extracted=len(extraction_result.technologies),
technologies_after_grouping=len(grouped_technologies),
sources_used=sources,
processing_time_seconds=duration
)
result = CapabilityMatchResult(
id=result_id,
timestamp=datetime.now().isoformat(),
user_input=user_input,
capability_need=parsed.capability_need,
capability_criteria=parsed.capability_criteria,
technologies=final,
summary=summary,
metadata=metadata,
success=True,
parsed_capability=parsed
)
save_path = self.config.analyses_dir / f"match_{result_id}.json"
result.save(save_path)
yield StatusUpdate("complete", f"Found {len(final)} technologies", 100)
yield result
def _search_all_sources(
self,
parsed: ParsedCapability,
sources: List[str]
) -> List[SearchResult]:
"""Search all configured sources."""
all_results: List[SearchResult] = []
# SBIR/STTR
if "sbir" in sources:
logger.info("Searching SBIR/STTR...")
for query in parsed.sbir_queries[:3]:
try:
results = self.sbir_searcher.search(query, max_results=15)
all_results.extend(results)
except Exception as e:
logger.warning(f"SBIR search failed: {e}")
# Patents
if "patents" in sources:
logger.info("Searching patents...")
for query in parsed.patent_queries[:3]:
try:
results = self.patent_searcher.search(query, max_results=15)
all_results.extend(results)
except Exception as e:
logger.warning(f"Patent search failed: {e}")
# Contracts
if "contracts" in sources:
logger.info("Searching federal contracts...")
for query in parsed.search_queries[:2]:
try:
results = self.contract_searcher.search_dod(query, max_results=10)
all_results.extend(results)
except Exception as e:
logger.warning(f"Contract search failed: {e}")
# Web
if "web" in sources:
logger.info("Searching web...")
for query in parsed.search_queries[:4]:
try:
results = self.web_searcher.search(query, max_results=10)
all_results.extend(results)
except Exception as e:
logger.warning(f"Web search failed: {e}")
# News
if "news" in sources:
logger.info("Searching news...")
for query in parsed.news_queries[:2]:
try:
results = self.web_searcher.search(query, max_results=10, news_only=True)
all_results.extend(results)
except Exception as e:
logger.warning(f"News search failed: {e}")
# Deduplicate by URL
seen_urls = set()
unique_results = []
for result in all_results:
if result.url and result.url not in seen_urls:
seen_urls.add(result.url)
unique_results.append(result)
return unique_results
def _extract_organizations(self, results: List[SearchResult]):
"""Extract organizations for results that don't have them."""
results_needing_org = [
(i, r) for i, r in enumerate(results)
if r.source_type in ("web", "news", "government", "academic") and not r.organization
]
if results_needing_org:
items_to_extract = [(r.title, r.snippet) for _, r in results_needing_org]
extractions = self.org_extractor.extract_batch(items_to_extract, use_llm_fallback=True)
for (idx, result), extraction in zip(results_needing_org, extractions):
if extraction.organization:
results[idx].organization = extraction.organization
def _build_summary(self, technologies: List[EvaluatedTechnology]) -> MatchSummary:
"""Build summary of match results."""
high_count = sum(1 for t in technologies if t.capability_match.overall_fit == "HIGH")
medium_count = sum(1 for t in technologies if t.capability_match.overall_fit == "MEDIUM")
low_count = sum(1 for t in technologies if t.capability_match.overall_fit == "LOW")
uncertain_count = sum(1 for t in technologies if t.capability_match.overall_fit == "UNCERTAIN")
top_rec = None
if technologies and technologies[0].capability_match.overall_fit in ("HIGH", "MEDIUM"):
top_rec = technologies[0].technology.canonical_name
return MatchSummary(
total_technologies=len(technologies),
high_fit_count=high_count,
medium_fit_count=medium_count,
low_fit_count=low_count,
uncertain_count=uncertain_count,
top_recommendation=top_rec
)
def _create_error_result(
self,
result_id: str,
user_input: str,
start_time: datetime,
error: str,
parsed: Optional[ParsedCapability] = None
) -> CapabilityMatchResult:
"""Create an error result."""
duration = (datetime.now() - start_time).total_seconds()
return CapabilityMatchResult(
id=result_id,
timestamp=datetime.now().isoformat(),
user_input=user_input,
capability_need=parsed.capability_need if parsed else None,
capability_criteria=parsed.capability_criteria if parsed else [],
technologies=[],
summary=MatchSummary(0, 0, 0, 0, 0, None),
metadata=SearchMetadata(0, 0, 0, [], duration),
success=False,
error=error,
parsed_capability=parsed
)
def _create_empty_result(
self,
result_id: str,
user_input: str,
start_time: datetime,
parsed: ParsedCapability,
docs_searched: int,
techs_extracted: int,
techs_grouped: int
) -> CapabilityMatchResult:
"""Create a result with no technologies found."""
duration = (datetime.now() - start_time).total_seconds()
return CapabilityMatchResult(
id=result_id,
timestamp=datetime.now().isoformat(),
user_input=user_input,
capability_need=parsed.capability_need,
capability_criteria=parsed.capability_criteria,
technologies=[],
summary=MatchSummary(0, 0, 0, 0, 0, None),
metadata=SearchMetadata(docs_searched, techs_extracted, techs_grouped, [], duration),
success=True,
error=None,
parsed_capability=parsed
)
# ========================================================================
# Step-Based Pipeline Methods (for guided workflow)
# ========================================================================
    def step_search(
        self,
        user_input: str,
        sources: Optional[List[str]] = None
    ) -> StepSearchResult:
        """
        Step 1: Parse capability need and search sources.

        This is the first step in the guided workflow. Returns search results
        for user review before proceeding to technology extraction. The parsed
        capability and raw results are also persisted to
        ``analyses_dir/step_search_<id>.json`` so step_extract can load them.

        Args:
            user_input: Natural language capability need description
            sources: Which sources to search (default: all)

        Returns:
            StepSearchResult with parsed capability and search results
        """
        start_time = datetime.now()
        result_id = str(uuid.uuid4())[:8]
        sources = sources or ["sbir", "patents", "contracts", "web", "news"]
        logger.info(f"Step 1: Search & Parse for: {user_input[:100]}...")
        # Parse capability need
        logger.info("Parsing capability need...")
        parsed = self.capability_parser.parse(user_input)
        if not parsed.success:
            # Parsing failed: return a failed step result with user guidance;
            # nothing is persisted in this case.
            duration = (datetime.now() - start_time).total_seconds()
            return StepSearchResult(
                id=result_id,
                timestamp=datetime.now().isoformat(),
                user_input=user_input,
                capability_need=None,
                capability_criteria=[],
                parsed_capability=None,
                search_results=[],
                source_counts={},
                processing_time_seconds=duration,
                success=False,
                error=f"Capability parsing failed: {parsed.error}",
                guidance_message="Failed to understand your capability need. Please try rephrasing your query."
            )
        # Search all sources
        logger.info(f"Searching {len(sources)} sources...")
        raw_results = self._search_all_sources(parsed, sources)
        # Extract organizations for results missing them
        self._extract_organizations(raw_results)
        # Convert to SearchResultItem for the UI, tallying hits per source type
        search_results = []
        source_counts: Dict[str, int] = {}
        for i, r in enumerate(raw_results):
            item = SearchResultItem(
                id=f"{result_id}-{i}",  # stable per-run id: result id + index
                title=r.title,
                snippet=r.snippet,
                url=r.url,
                source_type=r.source_type,
                source_name=r.source,
                organization=r.organization,
                published_date=r.published_date,
                award_amount=r.award_amount,
                trl_estimate=r.trl_estimate,
            )
            search_results.append(item)
            source_counts[r.source_type] = source_counts.get(r.source_type, 0) + 1
        duration = (datetime.now() - start_time).total_seconds()
        # Generate guidance message (Markdown; rendered by the UI), varying by
        # how many results came back
        if not search_results:
            guidance = "No results found. Consider broadening your search terms or enabling more sources."
        elif len(search_results) < 10:
            guidance = f"Found only **{len(search_results)} results**. You may want to broaden your query. If these look relevant, proceed to **Extract Technologies**."
        else:
            # Per-source counts sorted largest-first, e.g. "12 web, 5 sbir"
            source_summary = ", ".join([f"{count} {src}" for src, count in sorted(source_counts.items(), key=lambda x: -x[1])])
            guidance = f"Found **{len(search_results)} results** ({source_summary}). Review the results below. If they look relevant to your capability need, proceed to **Extract Technologies**. Otherwise, refine your query."
        # Store the raw results and parsed capability for later steps
        # We'll save this to disk so the extraction step can retrieve it
        step_data = {
            "id": result_id,
            "user_input": user_input,
            "parsed_capability": parsed.to_dict(),
            "raw_results": [self._search_result_to_dict(r) for r in raw_results],
        }
        save_path = self.config.analyses_dir / f"step_search_{result_id}.json"
        with open(save_path, "w") as f:
            json.dump(step_data, f, indent=2)
        return StepSearchResult(
            id=result_id,
            timestamp=datetime.now().isoformat(),
            user_input=user_input,
            capability_need=parsed.capability_need,
            capability_criteria=parsed.capability_criteria,
            parsed_capability=parsed,
            search_results=search_results,
            source_counts=source_counts,
            processing_time_seconds=duration,
            success=True,
            guidance_message=guidance,
        )
def step_extract(self, search_id: str) -> StepExtractionResult:
"""
Step 2: Extract technologies from search results.
Takes the search results from Step 1 and extracts/groups technologies.
Args:
search_id: ID from StepSearchResult
Returns:
StepExtractionResult with extracted technologies
"""
start_time = datetime.now()
result_id = str(uuid.uuid4())[:8]
logger.info(f"Step 2: Extract Technologies for search {search_id}...")
# Load the saved search data
search_path = self.config.analyses_dir / f"step_search_{search_id}.json"
if not search_path.exists():
return StepExtractionResult(
id=result_id,
timestamp=datetime.now().isoformat(),
search_id=search_id,
technologies=[],
raw_extractions=0,
after_grouping=0,
processing_time_seconds=0,
success=False,
error=f"Search results not found. Please run search step first.",
guidance_message="Search results expired or not found. Please start a new search."
)
with open(search_path) as f:
search_data = json.load(f)
# Reconstruct parsed capability
parsed = self._dict_to_parsed_capability(search_data["parsed_capability"])
# Reconstruct search results
raw_results = [self._dict_to_search_result(r) for r in search_data["raw_results"]]
# Extract technologies
logger.info(f"Extracting technologies from {len(raw_results)} results...")
extraction_result = self.technology_extractor.extract_all(
raw_results,
parsed.capability_need,
parsed.technology_indicators
)
if not extraction_result.technologies:
duration = (datetime.now() - start_time).total_seconds()
return StepExtractionResult(
id=result_id,
timestamp=datetime.now().isoformat(),
search_id=search_id,
technologies=[],
raw_extractions=0,
after_grouping=0,
processing_time_seconds=duration,
success=True,
guidance_message="No specific technologies could be extracted from the search results. The results may be too general. Try refining your search with more specific technical terms."
)
# Group technologies
logger.info(f"Grouping {len(extraction_result.technologies)} extracted technologies...")
grouped = self.technology_grouper.group(extraction_result.technologies)
# Convert to TechnologyItem
technology_items = []
for tech in grouped:
item = TechnologyItem(
id=tech.id,
canonical_name=tech.canonical_name,
technology_type=tech.technology_type,
description=tech.description,
capabilities=tech.capabilities,
developers=[d.to_dict() for d in tech.developers],
trl_estimate=tech.trl_estimate,
source_count=tech.source_count,
sources=[s.to_dict() for s in tech.sources],
)
technology_items.append(item)
duration = (datetime.now() - start_time).total_seconds()
# Generate guidance
if len(technology_items) == 0:
guidance = "No technologies found. Try a different search."
elif len(technology_items) <= 3:
guidance = f"Found **{len(technology_items)} technologies**. Select the ones you want to evaluate against your capability criteria, then click **Evaluate Selected**."
else:
guidance = f"Found **{len(technology_items)} unique technologies** (grouped from {len(extraction_result.technologies)} extractions). Review the list and select promising candidates, then click **Evaluate Selected** to see how well they match your capability need."
# Save extraction data for evaluation step
step_data = {
"id": result_id,
"search_id": search_id,
"user_input": search_data["user_input"],
"parsed_capability": search_data["parsed_capability"],
"grouped_technologies": [self._grouped_tech_to_dict(t) for t in grouped],
}
save_path = self.config.analyses_dir / f"step_extract_{result_id}.json"
with open(save_path, "w") as f:
json.dump(step_data, f, indent=2)
return StepExtractionResult(
id=result_id,
timestamp=datetime.now().isoformat(),
search_id=search_id,
technologies=technology_items,
raw_extractions=len(extraction_result.technologies),
after_grouping=len(grouped),
processing_time_seconds=duration,
success=True,
guidance_message=guidance,
)
    def step_evaluate(
        self,
        extraction_id: str,
        technology_ids: List[str]
    ) -> StepEvaluationResult:
        """
        Step 3: Evaluate selected technologies against capability criteria.

        Loads the persisted output of step_extract, evaluates only the
        technologies the user selected, and returns them ranked by fit score.

        Args:
            extraction_id: ID from StepExtractionResult
            technology_ids: List of technology IDs to evaluate

        Returns:
            StepEvaluationResult with fit assessments
        """
        start_time = datetime.now()
        result_id = str(uuid.uuid4())[:8]
        logger.info(f"Step 3: Evaluate {len(technology_ids)} technologies...")
        # Load extraction data persisted by step_extract
        extract_path = self.config.analyses_dir / f"step_extract_{extraction_id}.json"
        if not extract_path.exists():
            # Extraction file missing (expired or wrong id): fail gracefully.
            return StepEvaluationResult(
                id=result_id,
                timestamp=datetime.now().isoformat(),
                search_id="",
                technologies=[],
                summary=MatchSummary(0, 0, 0, 0, 0, None),
                processing_time_seconds=0,
                success=False,
                error="Extraction results not found. Please run extraction step first.",
                guidance_message="Extraction results expired. Please start over."
            )
        with open(extract_path) as f:
            extract_data = json.load(f)
        # Reconstruct parsed capability
        parsed = self._dict_to_parsed_capability(extract_data["parsed_capability"])
        # Get selected technologies (ids not present in the stored set are ignored)
        all_grouped = [self._dict_to_grouped_technology(t) for t in extract_data["grouped_technologies"]]
        selected = [t for t in all_grouped if t.id in technology_ids]
        if not selected:
            # None of the requested ids matched the stored technologies.
            return StepEvaluationResult(
                id=result_id,
                timestamp=datetime.now().isoformat(),
                search_id=extract_data["search_id"],
                technologies=[],
                summary=MatchSummary(0, 0, 0, 0, 0, None),
                processing_time_seconds=0,
                success=False,
                error="No valid technologies selected.",
                guidance_message="Please select at least one technology to evaluate."
            )
        # Evaluate selected technologies
        logger.info(f"Evaluating {len(selected)} technologies against criteria...")
        evaluated = self.capability_evaluator.evaluate_all(
            selected,
            parsed.capability_need,
            parsed.capability_criteria
        )
        # Sort by fit score, best first; _build_summary relies on this order
        # for top_recommendation.
        evaluated.sort(key=lambda t: t.capability_match.fit_score, reverse=True)
        duration = (datetime.now() - start_time).total_seconds()
        summary = self._build_summary(evaluated)
        # Generate guidance (Markdown; rendered by the UI)
        if summary.high_fit_count > 0:
            guidance = f"**{summary.high_fit_count} technologies** show HIGH fit for your capability need! Click on each to see detailed criteria assessment. Consider running a **Deep Dive** on promising candidates."
        elif summary.medium_fit_count > 0:
            guidance = f"Found **{summary.medium_fit_count} technologies** with MEDIUM fit. These partially address your need. Review the criteria breakdown to understand gaps."
        else:
            guidance = "No technologies show strong fit for your criteria. Consider broadening your search or adjusting your requirements."
        return StepEvaluationResult(
            id=result_id,
            timestamp=datetime.now().isoformat(),
            search_id=extract_data["search_id"],
            technologies=evaluated,
            summary=summary,
            processing_time_seconds=duration,
            success=True,
            guidance_message=guidance,
        )
# Helper methods for serialization/deserialization
def _search_result_to_dict(self, r) -> Dict[str, Any]:
"""Convert SearchResult to dict for JSON storage.
Handles both SearchResult from base.py and SearchResultItem from this module.
"""
# Generate an id if not present (SearchResult from base.py uses rank instead)
result_id = getattr(r, 'id', None) or f"{r.source_type}_{r.rank}" if hasattr(r, 'rank') else f"{r.source_type}_{hash(r.url) % 100000}"
return {
"id": result_id,
"title": r.title,
"snippet": r.snippet,
"url": r.url,
"source_type": r.source_type,
"source": getattr(r, 'source', getattr(r, 'source_name', '')),
"organization": r.organization,
"published_date": r.published_date,
"award_amount": r.award_amount,
"award_id": getattr(r, 'award_id', None),
"trl_estimate": r.trl_estimate,
"patent_number": getattr(r, 'patent_number', None),
"relevance_score": getattr(r, 'relevance_score', 0.0),
}
def _dict_to_search_result(self, d: Dict[str, Any]) -> SearchResult:
"""Reconstruct SearchResult from dict.
Note: SearchResult from base.py uses 'rank' not 'id', and 'source' not 'source_name'.
We store extra fields in raw_data for later use.
"""
return SearchResult(
title=d.get("title", ""),
snippet=d.get("snippet", ""),
url=d.get("url", ""),
source_type=d.get("source_type", ""),
source=d.get("source", ""),
rank=0, # Rank is lost during serialization, default to 0
organization=d.get("organization"),
published_date=d.get("published_date"),
award_amount=d.get("award_amount"),
award_id=d.get("award_id"),
trl_estimate=d.get("trl_estimate"),
patent_number=d.get("patent_number"),
raw_data={
"id": d.get("id", ""),
"relevance_score": d.get("relevance_score", 0.0),
},
)
    def _dict_to_parsed_capability(self, d: Dict[str, Any]) -> ParsedCapability:
        """Reconstruct ParsedCapability from dict.

        Missing keys fall back to empty/neutral defaults so files saved by
        older runs still load; target_trl_range defaults to (4, 7).
        """
        # Optional nested capability need
        cap_need = None
        if d.get("capability_need"):
            cn = d["capability_need"]
            cap_need = CapabilityNeed(
                functional_need=cn.get("functional_need", ""),
                domain=cn.get("domain", ""),
                implied_constraints=cn.get("implied_constraints", []),
                technology_types_sought=cn.get("technology_types_sought", []),
            )
        # Evaluation criteria list (may be empty)
        criteria = []
        for c in d.get("capability_criteria", []):
            criteria.append(CapabilityCriterion(
                criterion=c.get("criterion", ""),
                weight=c.get("weight", "should_have"),
                keywords=c.get("keywords", []),
            ))
        # Function-local import, kept as in the original module
        from ..capability.types import TechnologyIndicators
        # Optional positive/negative technology indicators
        tech_ind = None
        if d.get("technology_indicators"):
            ti = d["technology_indicators"]
            tech_ind = TechnologyIndicators(
                positive=ti.get("positive", []),
                negative=ti.get("negative", []),
            )
        return ParsedCapability(
            original_query=d.get("original_query", ""),
            understanding=d.get("understanding", ""),
            technical_domains=d.get("technical_domains", []),
            search_queries=d.get("search_queries", []),
            sbir_queries=d.get("sbir_queries", []),
            patent_queries=d.get("patent_queries", []),
            news_queries=d.get("news_queries", []),
            keywords=d.get("keywords", []),
            exclusions=d.get("exclusions", []),
            # JSON stores the range as a list; convert back to a tuple
            target_trl_range=tuple(d.get("target_trl_range", [4, 7])),
            capability_need=cap_need,
            capability_criteria=criteria,
            technology_indicators=tech_ind,
            success=d.get("success", True),
            error=d.get("error"),
        )
    def _grouped_tech_to_dict(self, t: GroupedTechnology) -> Dict[str, Any]:
        """Convert GroupedTechnology to dict for JSON storage.

        Thin delegation to the type's own to_dict; kept as a method for
        symmetry with _dict_to_grouped_technology.
        """
        return t.to_dict()
    def _dict_to_grouped_technology(self, d: Dict[str, Any]) -> GroupedTechnology:
        """Reconstruct GroupedTechnology from dict.

        Missing keys fall back to empty/neutral defaults so files saved by
        older runs still load.
        """
        # Function-local import, kept as in the original module
        from ..technology.types import Developer, SourceEvidence
        # Rebuild developer records
        developers = []
        for dev in d.get("developers", []):
            developers.append(Developer(
                name=dev.get("name", ""),
                type=dev.get("type", "unknown"),
                location=dev.get("location"),
            ))
        # Rebuild per-source evidence records
        sources = []
        for src in d.get("sources", []):
            sources.append(SourceEvidence(
                source_type=src.get("source_type", ""),
                source_name=src.get("source_name", ""),
                title=src.get("title", ""),
                url=src.get("url", ""),
                snippet=src.get("snippet", ""),
                contribution=src.get("contribution", ""),
                published_date=src.get("published_date"),
                award_amount=src.get("award_amount"),
                award_id=src.get("award_id"),
                patent_number=src.get("patent_number"),
            ))
        return GroupedTechnology(
            id=d.get("id", ""),
            canonical_name=d.get("canonical_name", ""),
            alternate_names=d.get("alternate_names", []),
            technology_type=d.get("technology_type", ""),
            description=d.get("description", ""),
            capabilities=d.get("capabilities", []),
            mechanism=d.get("mechanism"),
            developers=developers,
            trl_estimate=d.get("trl_estimate"),
            trl_confidence=d.get("trl_confidence", 0.0),
            trl_evidence=d.get("trl_evidence", []),
            sources=sources,
            source_count=d.get("source_count", 0),
            grouping_confidence=d.get("grouping_confidence", 1.0),
            grouped_from=d.get("grouped_from", []),
        )