1220 lines
44 KiB
Python
1220 lines
44 KiB
Python
|
|
"""
|
||
|
|
Capability Matcher Pipeline for TechScout.
|
||
|
|
|
||
|
|
Orchestrates the full capability-to-technology matching process:
|
||
|
|
1. Parse capability need from natural language
|
||
|
|
2. Search sources for relevant content
|
||
|
|
3. Extract technologies from search results
|
||
|
|
4. Group duplicate technologies
|
||
|
|
5. Evaluate capability fit
|
||
|
|
6. Return ranked technologies
|
||
|
|
|
||
|
|
This is the main entry point for the new technology-centric approach.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import uuid
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from datetime import datetime
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import List, Dict, Any, Optional, Generator
|
||
|
|
|
||
|
|
from ..config import TechScoutConfig, config as default_config
|
||
|
|
from ..extraction.llm_client import OllamaClient
|
||
|
|
from ..extraction.org_extractor import OrganizationExtractor
|
||
|
|
from ..search.web import WebSearcher
|
||
|
|
from ..search.base import SearchResult
|
||
|
|
from ..sources.sbir import SBIRSearcher
|
||
|
|
from ..sources.patents import PatentSearcher
|
||
|
|
from ..sources.contracts import ContractSearcher
|
||
|
|
from ..capability.parser import CapabilityParser
|
||
|
|
from ..capability.types import ParsedCapability, CapabilityNeed, CapabilityCriterion
|
||
|
|
from ..technology.extractor import TechnologyExtractor
|
||
|
|
from ..technology.grouper import TechnologyGrouper
|
||
|
|
from ..technology.evaluator import CapabilityEvaluator
|
||
|
|
from ..technology.types import EvaluatedTechnology, GroupedTechnology
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class MatchSummary:
    """Aggregate fit counts for a completed capability match.

    Counts one bucket per technology according to its overall fit label,
    plus the name of the top-ranked technology when it is worth recommending.
    """
    total_technologies: int
    high_fit_count: int
    medium_fit_count: int
    low_fit_count: int
    uncertain_count: int
    top_recommendation: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (JSON-friendly), preserving field order."""
        field_names = (
            "total_technologies",
            "high_fit_count",
            "medium_fit_count",
            "low_fit_count",
            "uncertain_count",
            "top_recommendation",
        )
        return {name: getattr(self, name) for name in field_names}
|
||
|
|
|
||
|
|
|
||
|
|
# ============================================================================
|
||
|
|
# Step-Based Pipeline Results (for guided workflow)
|
||
|
|
# ============================================================================
|
||
|
|
|
||
|
|
@dataclass
class SearchResultItem:
    """One search hit shown to the user during the step-based workflow.

    Optional fields are only populated when the originating source provides
    them (e.g. award_amount for SBIR awards).
    """
    id: str
    title: str
    snippet: str
    url: str
    source_type: str
    source_name: str
    organization: Optional[str] = None
    published_date: Optional[str] = None
    award_amount: Optional[float] = None
    trl_estimate: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (JSON-friendly), preserving field order."""
        field_names = (
            "id",
            "title",
            "snippet",
            "url",
            "source_type",
            "source_name",
            "organization",
            "published_date",
            "award_amount",
            "trl_estimate",
        )
        return {name: getattr(self, name) for name in field_names}
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class StepSearchResult:
    """
    Result of Step 1: Search & Parse.

    Bundles the parsed capability need together with the raw search results
    so the user can review them before extraction proceeds.
    """
    id: str
    timestamp: str
    user_input: str

    # Parsed capability info
    capability_need: Optional[CapabilityNeed]
    capability_criteria: List[CapabilityCriterion]
    parsed_capability: Optional[ParsedCapability]

    # Search results
    search_results: List[SearchResultItem]
    source_counts: Dict[str, int]

    # Timing
    processing_time_seconds: float

    # Status
    success: bool = True
    error: Optional[str] = None

    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, recursing into nested objects."""
        need = self.capability_need.to_dict() if self.capability_need else None
        parsed = self.parsed_capability.to_dict() if self.parsed_capability else None
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "user_input": self.user_input,
            "capability_need": need,
            "capability_criteria": [c.to_dict() for c in self.capability_criteria],
            "parsed_capability": parsed,
            "search_results": [r.to_dict() for r in self.search_results],
            "source_counts": self.source_counts,
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class TechnologyItem:
    """A grouped technology presented for user selection (before evaluation)."""
    id: str
    canonical_name: str
    technology_type: str
    description: str
    capabilities: List[str]
    developers: List[Dict[str, Any]]
    trl_estimate: Optional[int]
    source_count: int
    sources: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (JSON-friendly), preserving field order."""
        field_names = (
            "id",
            "canonical_name",
            "technology_type",
            "description",
            "capabilities",
            "developers",
            "trl_estimate",
            "source_count",
            "sources",
        )
        return {name: getattr(self, name) for name in field_names}
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class StepExtractionResult:
    """
    Result of Step 2: Technology Extraction.

    Carries the extracted, deduplicated technologies plus before/after counts
    so the UI can show how much grouping collapsed the raw extractions.
    """
    id: str
    timestamp: str
    search_id: str  # Links to StepSearchResult

    # Technologies found
    technologies: List[TechnologyItem]

    # Stats
    raw_extractions: int
    after_grouping: int

    # Timing
    processing_time_seconds: float

    # Status
    success: bool = True
    error: Optional[str] = None

    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, recursing into the technology list."""
        payload = {
            "id": self.id,
            "timestamp": self.timestamp,
            "search_id": self.search_id,
            "technologies": [tech.to_dict() for tech in self.technologies],
            "raw_extractions": self.raw_extractions,
            "after_grouping": self.after_grouping,
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
        return payload
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class StepEvaluationResult:
    """
    Result of Step 3: Capability Evaluation.

    Holds the evaluated technologies (with fit scores) and a roll-up summary.
    """
    id: str
    timestamp: str
    search_id: str  # Links to original search

    # Evaluated technologies
    technologies: List[EvaluatedTechnology]

    # Summary
    summary: MatchSummary

    # Timing
    processing_time_seconds: float

    # Status
    success: bool = True
    error: Optional[str] = None

    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, recursing into nested objects."""
        payload = {
            "id": self.id,
            "timestamp": self.timestamp,
            "search_id": self.search_id,
            "technologies": [tech.to_dict() for tech in self.technologies],
            "summary": self.summary.to_dict(),
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
        return payload
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SearchMetadata:
    """Bookkeeping about a search run: volume, funnel counts, sources, timing."""
    total_documents_searched: int
    technologies_extracted: int
    technologies_after_grouping: int
    sources_used: List[str]
    processing_time_seconds: float

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (JSON-friendly), preserving field order."""
        field_names = (
            "total_documents_searched",
            "technologies_extracted",
            "technologies_after_grouping",
            "sources_used",
            "processing_time_seconds",
        )
        return {name: getattr(self, name) for name in field_names}
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class CapabilityMatchResult:
    """
    Complete result of capability-to-technology matching.

    This is the main output type for the new pipeline: the original user
    input, the parsed capability, the ranked technologies, a summary, and
    process metadata.
    """
    # Identity
    id: str
    timestamp: str

    # Input
    user_input: str
    capability_need: Optional[CapabilityNeed]
    capability_criteria: List[CapabilityCriterion]

    # Output
    technologies: List[EvaluatedTechnology]

    # Summary
    summary: MatchSummary

    # Metadata
    metadata: SearchMetadata

    # Status
    success: bool = True
    error: Optional[str] = None

    # Original parsing (for transparency)
    parsed_capability: Optional[ParsedCapability] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, recursing into nested objects."""
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "user_input": self.user_input,
            "capability_need": self.capability_need.to_dict() if self.capability_need else None,
            "capability_criteria": [c.to_dict() for c in self.capability_criteria],
            "technologies": [t.to_dict() for t in self.technologies],
            "summary": self.summary.to_dict(),
            "metadata": self.metadata.to_dict(),
            "success": self.success,
            "error": self.error,
            "parsed_capability": self.parsed_capability.to_dict() if self.parsed_capability else None,
        }

    def save(self, path: Path):
        """Save result to JSON file.

        Fix: write with an explicit UTF-8 encoding. The previous code relied
        on the platform default encoding, which can break on Windows when a
        result contains non-ASCII text (organization names, snippets).
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class StatusUpdate:
    """Status update during pipeline execution.

    Yielded by match_with_status() between pipeline stages so a caller
    (e.g. a UI) can render live progress.
    """
    # Stage identifier; values used in this module include "parsing",
    # "searching", "extracting", "grouping", "evaluating", "complete".
    stage: str
    # Human-readable progress message.
    message: str
    progress: Optional[int] = None  # 0-100
|
||
|
|
|
||
|
|
|
||
|
|
class CapabilityMatcherPipeline:
|
||
|
|
"""
|
||
|
|
Main pipeline for capability-to-technology matching.
|
||
|
|
|
||
|
|
Orchestrates:
|
||
|
|
1. Capability Parser (Stage 1)
|
||
|
|
2. Source Search (Stage 2)
|
||
|
|
3. Technology Extractor (Stage 3)
|
||
|
|
4. Technology Grouper (Stage 4)
|
||
|
|
5. Capability Evaluator (Stage 5)
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
config: Optional[TechScoutConfig] = None,
|
||
|
|
model: str = "mistral-nemo:12b"
|
||
|
|
):
|
||
|
|
self.config = config or default_config
|
||
|
|
self.model = model
|
||
|
|
|
||
|
|
# Initialize LLM client
|
||
|
|
self.llm_client = OllamaClient(
|
||
|
|
base_url=self.config.ollama.base_url,
|
||
|
|
default_model=model
|
||
|
|
)
|
||
|
|
|
||
|
|
# Initialize pipeline stages
|
||
|
|
self.capability_parser = CapabilityParser(self.llm_client, model)
|
||
|
|
self.technology_extractor = TechnologyExtractor(self.llm_client, model)
|
||
|
|
self.technology_grouper = TechnologyGrouper()
|
||
|
|
self.capability_evaluator = CapabilityEvaluator(self.llm_client, model)
|
||
|
|
self.org_extractor = OrganizationExtractor(self.llm_client, model)
|
||
|
|
|
||
|
|
# Initialize searchers
|
||
|
|
self.web_searcher = WebSearcher()
|
||
|
|
self.sbir_searcher = SBIRSearcher()
|
||
|
|
self.patent_searcher = PatentSearcher()
|
||
|
|
self.contract_searcher = ContractSearcher()
|
||
|
|
|
||
|
|
    def match(
        self,
        user_input: str,
        max_technologies: int = 15,
        min_fit_score: int = 25,
        sources: Optional[List[str]] = None
    ) -> CapabilityMatchResult:
        """
        Run the full capability-to-technology matching pipeline.

        Runs all five stages synchronously (parse, search, extract, group,
        evaluate), filters and truncates the ranked list, saves the result
        to disk, and returns it. On parse failure or zero search results a
        failed CapabilityMatchResult is returned instead of raising.

        Args:
            user_input: Natural language capability need description
            max_technologies: Maximum technologies to return
            min_fit_score: Minimum fit score to include
            sources: Which sources to search (default: all)

        Returns:
            CapabilityMatchResult with ranked technologies
        """
        start_time = datetime.now()
        # Short id (8 hex chars of a UUID) — also used in the saved filename.
        result_id = str(uuid.uuid4())[:8]
        sources = sources or ["sbir", "patents", "contracts", "web", "news"]

        logger.info(f"Starting capability match for: {user_input[:100]}...")

        # Stage 1: Parse capability need
        logger.info("Stage 1: Parsing capability need...")
        parsed = self.capability_parser.parse(user_input)

        if not parsed.success:
            # Parsing failed: return an error result rather than raising.
            return self._create_error_result(
                result_id, user_input, start_time,
                f"Capability parsing failed: {parsed.error}"
            )

        logger.info(f"Parsed capability need: {parsed.capability_need.functional_need[:100] if parsed.capability_need else 'N/A'}")
        logger.info(f"Generated {len(parsed.capability_criteria)} evaluation criteria")

        # Stage 2: Search sources
        logger.info("Stage 2: Searching sources...")
        search_results = self._search_all_sources(parsed, sources)
        logger.info(f"Found {len(search_results)} total search results")

        if not search_results:
            return self._create_error_result(
                result_id, user_input, start_time,
                "No search results found", parsed
            )

        # Extract organizations for results missing them (mutates in place)
        self._extract_organizations(search_results)

        # Stage 3: Extract technologies
        logger.info("Stage 3: Extracting technologies from results...")
        extraction_result = self.technology_extractor.extract_all(
            search_results,
            parsed.capability_need,
            parsed.technology_indicators
        )
        logger.info(f"Extracted {len(extraction_result.technologies)} technologies")

        if not extraction_result.technologies:
            # Nothing extracted is a valid (successful but empty) outcome.
            return self._create_empty_result(
                result_id, user_input, start_time, parsed,
                len(search_results), 0, 0
            )

        # Stage 4: Group technologies
        logger.info("Stage 4: Grouping duplicate technologies...")
        grouped_technologies = self.technology_grouper.group(extraction_result.technologies)
        logger.info(f"Grouped into {len(grouped_technologies)} unique technologies")

        # Stage 5: Evaluate capability fit
        logger.info("Stage 5: Evaluating capability fit...")
        evaluated_technologies = self.capability_evaluator.evaluate_all(
            grouped_technologies,
            parsed.capability_need,
            parsed.capability_criteria
        )

        # Filter by minimum fit score
        filtered_technologies = [
            t for t in evaluated_technologies
            if t.capability_match.fit_score >= min_fit_score
        ]

        # Limit to max_technologies.
        # NOTE(review): truncation assumes evaluate_all() returns technologies
        # already ranked best-first — confirm against CapabilityEvaluator.
        final_technologies = filtered_technologies[:max_technologies]

        # Build summary
        summary = self._build_summary(final_technologies)

        # Build metadata
        duration = (datetime.now() - start_time).total_seconds()
        metadata = SearchMetadata(
            total_documents_searched=len(search_results),
            technologies_extracted=len(extraction_result.technologies),
            technologies_after_grouping=len(grouped_technologies),
            sources_used=sources,
            processing_time_seconds=duration
        )

        # Build result
        result = CapabilityMatchResult(
            id=result_id,
            timestamp=datetime.now().isoformat(),
            user_input=user_input,
            capability_need=parsed.capability_need,
            capability_criteria=parsed.capability_criteria,
            technologies=final_technologies,
            summary=summary,
            metadata=metadata,
            success=True,
            parsed_capability=parsed
        )

        # Save result to the configured analyses directory
        save_path = self.config.analyses_dir / f"match_{result_id}.json"
        result.save(save_path)
        logger.info(f"Saved match result to {save_path}")

        return result
|
||
|
|
|
||
|
|
    def match_with_status(
        self,
        user_input: str,
        max_technologies: int = 15,
        min_fit_score: int = 25,
        sources: Optional[List[str]] = None
    ) -> Generator[StatusUpdate | CapabilityMatchResult, None, None]:
        """
        Run matching pipeline with status updates.

        Generator variant of match(): yields StatusUpdate objects during
        processing, then the final CapabilityMatchResult as the last item.
        On failure the error CapabilityMatchResult is yielded early and the
        generator stops.

        NOTE(review): this duplicates the stage logic of match(); keep the
        two methods in sync when changing either.
        """
        start_time = datetime.now()
        result_id = str(uuid.uuid4())[:8]
        sources = sources or ["sbir", "patents", "contracts", "web", "news"]

        # Stage 1: Parse
        yield StatusUpdate("parsing", "Parsing capability need...", 10)
        parsed = self.capability_parser.parse(user_input)

        if not parsed.success:
            # Yield the error result as the final item, then stop.
            yield self._create_error_result(
                result_id, user_input, start_time,
                f"Capability parsing failed: {parsed.error}"
            )
            return

        # Stage 2: Search
        yield StatusUpdate("searching", f"Searching {len(sources)} sources...", 20)
        search_results = self._search_all_sources(parsed, sources)
        yield StatusUpdate("searching", f"Found {len(search_results)} results", 40)

        if not search_results:
            yield self._create_error_result(
                result_id, user_input, start_time,
                "No search results found", parsed
            )
            return

        # Backfill missing organizations (mutates search_results in place)
        self._extract_organizations(search_results)

        # Stage 3: Extract
        yield StatusUpdate("extracting", "Extracting technologies from results...", 50)
        extraction_result = self.technology_extractor.extract_all(
            search_results,
            parsed.capability_need,
            parsed.technology_indicators
        )
        yield StatusUpdate("extracting", f"Extracted {len(extraction_result.technologies)} technologies", 65)

        if not extraction_result.technologies:
            # Empty extraction is a successful-but-empty terminal outcome.
            yield self._create_empty_result(
                result_id, user_input, start_time, parsed,
                len(search_results), 0, 0
            )
            return

        # Stage 4: Group
        yield StatusUpdate("grouping", "Grouping duplicate technologies...", 70)
        grouped_technologies = self.technology_grouper.group(extraction_result.technologies)
        yield StatusUpdate("grouping", f"Found {len(grouped_technologies)} unique technologies", 75)

        # Stage 5: Evaluate
        yield StatusUpdate("evaluating", "Evaluating capability fit...", 80)
        evaluated_technologies = self.capability_evaluator.evaluate_all(
            grouped_technologies,
            parsed.capability_need,
            parsed.capability_criteria
        )
        yield StatusUpdate("evaluating", "Evaluation complete", 95)

        # Filter and finalize.
        # NOTE(review): truncation assumes evaluate_all() returns a
        # best-first ranking — confirm against CapabilityEvaluator.
        filtered = [t for t in evaluated_technologies if t.capability_match.fit_score >= min_fit_score]
        final = filtered[:max_technologies]

        summary = self._build_summary(final)
        duration = (datetime.now() - start_time).total_seconds()
        metadata = SearchMetadata(
            total_documents_searched=len(search_results),
            technologies_extracted=len(extraction_result.technologies),
            technologies_after_grouping=len(grouped_technologies),
            sources_used=sources,
            processing_time_seconds=duration
        )

        result = CapabilityMatchResult(
            id=result_id,
            timestamp=datetime.now().isoformat(),
            user_input=user_input,
            capability_need=parsed.capability_need,
            capability_criteria=parsed.capability_criteria,
            technologies=final,
            summary=summary,
            metadata=metadata,
            success=True,
            parsed_capability=parsed
        )

        # Persist before announcing completion so the caller can reload by id.
        save_path = self.config.analyses_dir / f"match_{result_id}.json"
        result.save(save_path)

        yield StatusUpdate("complete", f"Found {len(final)} technologies", 100)
        yield result
|
||
|
|
|
||
|
|
def _search_all_sources(
|
||
|
|
self,
|
||
|
|
parsed: ParsedCapability,
|
||
|
|
sources: List[str]
|
||
|
|
) -> List[SearchResult]:
|
||
|
|
"""Search all configured sources."""
|
||
|
|
all_results: List[SearchResult] = []
|
||
|
|
|
||
|
|
# SBIR/STTR
|
||
|
|
if "sbir" in sources:
|
||
|
|
logger.info("Searching SBIR/STTR...")
|
||
|
|
for query in parsed.sbir_queries[:3]:
|
||
|
|
try:
|
||
|
|
results = self.sbir_searcher.search(query, max_results=15)
|
||
|
|
all_results.extend(results)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning(f"SBIR search failed: {e}")
|
||
|
|
|
||
|
|
# Patents
|
||
|
|
if "patents" in sources:
|
||
|
|
logger.info("Searching patents...")
|
||
|
|
for query in parsed.patent_queries[:3]:
|
||
|
|
try:
|
||
|
|
results = self.patent_searcher.search(query, max_results=15)
|
||
|
|
all_results.extend(results)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning(f"Patent search failed: {e}")
|
||
|
|
|
||
|
|
# Contracts
|
||
|
|
if "contracts" in sources:
|
||
|
|
logger.info("Searching federal contracts...")
|
||
|
|
for query in parsed.search_queries[:2]:
|
||
|
|
try:
|
||
|
|
results = self.contract_searcher.search_dod(query, max_results=10)
|
||
|
|
all_results.extend(results)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning(f"Contract search failed: {e}")
|
||
|
|
|
||
|
|
# Web
|
||
|
|
if "web" in sources:
|
||
|
|
logger.info("Searching web...")
|
||
|
|
for query in parsed.search_queries[:4]:
|
||
|
|
try:
|
||
|
|
results = self.web_searcher.search(query, max_results=10)
|
||
|
|
all_results.extend(results)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning(f"Web search failed: {e}")
|
||
|
|
|
||
|
|
# News
|
||
|
|
if "news" in sources:
|
||
|
|
logger.info("Searching news...")
|
||
|
|
for query in parsed.news_queries[:2]:
|
||
|
|
try:
|
||
|
|
results = self.web_searcher.search(query, max_results=10, news_only=True)
|
||
|
|
all_results.extend(results)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning(f"News search failed: {e}")
|
||
|
|
|
||
|
|
# Deduplicate by URL
|
||
|
|
seen_urls = set()
|
||
|
|
unique_results = []
|
||
|
|
for result in all_results:
|
||
|
|
if result.url and result.url not in seen_urls:
|
||
|
|
seen_urls.add(result.url)
|
||
|
|
unique_results.append(result)
|
||
|
|
|
||
|
|
return unique_results
|
||
|
|
|
||
|
|
def _extract_organizations(self, results: List[SearchResult]):
|
||
|
|
"""Extract organizations for results that don't have them."""
|
||
|
|
results_needing_org = [
|
||
|
|
(i, r) for i, r in enumerate(results)
|
||
|
|
if r.source_type in ("web", "news", "government", "academic") and not r.organization
|
||
|
|
]
|
||
|
|
|
||
|
|
if results_needing_org:
|
||
|
|
items_to_extract = [(r.title, r.snippet) for _, r in results_needing_org]
|
||
|
|
extractions = self.org_extractor.extract_batch(items_to_extract, use_llm_fallback=True)
|
||
|
|
|
||
|
|
for (idx, result), extraction in zip(results_needing_org, extractions):
|
||
|
|
if extraction.organization:
|
||
|
|
results[idx].organization = extraction.organization
|
||
|
|
|
||
|
|
def _build_summary(self, technologies: List[EvaluatedTechnology]) -> MatchSummary:
|
||
|
|
"""Build summary of match results."""
|
||
|
|
high_count = sum(1 for t in technologies if t.capability_match.overall_fit == "HIGH")
|
||
|
|
medium_count = sum(1 for t in technologies if t.capability_match.overall_fit == "MEDIUM")
|
||
|
|
low_count = sum(1 for t in technologies if t.capability_match.overall_fit == "LOW")
|
||
|
|
uncertain_count = sum(1 for t in technologies if t.capability_match.overall_fit == "UNCERTAIN")
|
||
|
|
|
||
|
|
top_rec = None
|
||
|
|
if technologies and technologies[0].capability_match.overall_fit in ("HIGH", "MEDIUM"):
|
||
|
|
top_rec = technologies[0].technology.canonical_name
|
||
|
|
|
||
|
|
return MatchSummary(
|
||
|
|
total_technologies=len(technologies),
|
||
|
|
high_fit_count=high_count,
|
||
|
|
medium_fit_count=medium_count,
|
||
|
|
low_fit_count=low_count,
|
||
|
|
uncertain_count=uncertain_count,
|
||
|
|
top_recommendation=top_rec
|
||
|
|
)
|
||
|
|
|
||
|
|
def _create_error_result(
|
||
|
|
self,
|
||
|
|
result_id: str,
|
||
|
|
user_input: str,
|
||
|
|
start_time: datetime,
|
||
|
|
error: str,
|
||
|
|
parsed: Optional[ParsedCapability] = None
|
||
|
|
) -> CapabilityMatchResult:
|
||
|
|
"""Create an error result."""
|
||
|
|
duration = (datetime.now() - start_time).total_seconds()
|
||
|
|
return CapabilityMatchResult(
|
||
|
|
id=result_id,
|
||
|
|
timestamp=datetime.now().isoformat(),
|
||
|
|
user_input=user_input,
|
||
|
|
capability_need=parsed.capability_need if parsed else None,
|
||
|
|
capability_criteria=parsed.capability_criteria if parsed else [],
|
||
|
|
technologies=[],
|
||
|
|
summary=MatchSummary(0, 0, 0, 0, 0, None),
|
||
|
|
metadata=SearchMetadata(0, 0, 0, [], duration),
|
||
|
|
success=False,
|
||
|
|
error=error,
|
||
|
|
parsed_capability=parsed
|
||
|
|
)
|
||
|
|
|
||
|
|
def _create_empty_result(
|
||
|
|
self,
|
||
|
|
result_id: str,
|
||
|
|
user_input: str,
|
||
|
|
start_time: datetime,
|
||
|
|
parsed: ParsedCapability,
|
||
|
|
docs_searched: int,
|
||
|
|
techs_extracted: int,
|
||
|
|
techs_grouped: int
|
||
|
|
) -> CapabilityMatchResult:
|
||
|
|
"""Create a result with no technologies found."""
|
||
|
|
duration = (datetime.now() - start_time).total_seconds()
|
||
|
|
return CapabilityMatchResult(
|
||
|
|
id=result_id,
|
||
|
|
timestamp=datetime.now().isoformat(),
|
||
|
|
user_input=user_input,
|
||
|
|
capability_need=parsed.capability_need,
|
||
|
|
capability_criteria=parsed.capability_criteria,
|
||
|
|
technologies=[],
|
||
|
|
summary=MatchSummary(0, 0, 0, 0, 0, None),
|
||
|
|
metadata=SearchMetadata(docs_searched, techs_extracted, techs_grouped, [], duration),
|
||
|
|
success=True,
|
||
|
|
error=None,
|
||
|
|
parsed_capability=parsed
|
||
|
|
)
|
||
|
|
|
||
|
|
# ========================================================================
|
||
|
|
# Step-Based Pipeline Methods (for guided workflow)
|
||
|
|
# ========================================================================
|
||
|
|
|
||
|
|
    def step_search(
        self,
        user_input: str,
        sources: Optional[List[str]] = None
    ) -> StepSearchResult:
        """
        Step 1: Parse capability need and search sources.

        This is the first step in the guided workflow. Returns search results
        for user review before proceeding to technology extraction. The raw
        results and parsed capability are also persisted to disk keyed by the
        result id so step_extract() can resume from them later.

        Args:
            user_input: Natural language capability need description
            sources: Which sources to search (default: all)

        Returns:
            StepSearchResult with parsed capability and search results
        """
        start_time = datetime.now()
        # Short id (8 hex chars of a UUID); also keys the persisted step file.
        result_id = str(uuid.uuid4())[:8]
        sources = sources or ["sbir", "patents", "contracts", "web", "news"]

        logger.info(f"Step 1: Search & Parse for: {user_input[:100]}...")

        # Parse capability need
        logger.info("Parsing capability need...")
        parsed = self.capability_parser.parse(user_input)

        if not parsed.success:
            # Parsing failed: return a failed step result with user guidance.
            duration = (datetime.now() - start_time).total_seconds()
            return StepSearchResult(
                id=result_id,
                timestamp=datetime.now().isoformat(),
                user_input=user_input,
                capability_need=None,
                capability_criteria=[],
                parsed_capability=None,
                search_results=[],
                source_counts={},
                processing_time_seconds=duration,
                success=False,
                error=f"Capability parsing failed: {parsed.error}",
                guidance_message="Failed to understand your capability need. Please try rephrasing your query."
            )

        # Search all sources
        logger.info(f"Searching {len(sources)} sources...")
        raw_results = self._search_all_sources(parsed, sources)

        # Extract organizations for results missing them (mutates in place)
        self._extract_organizations(raw_results)

        # Convert raw SearchResult objects to UI-facing SearchResultItem,
        # counting hits per source_type as we go.
        search_results = []
        source_counts: Dict[str, int] = {}

        for i, r in enumerate(raw_results):
            item = SearchResultItem(
                id=f"{result_id}-{i}",
                title=r.title,
                snippet=r.snippet,
                url=r.url,
                source_type=r.source_type,
                source_name=r.source,
                organization=r.organization,
                published_date=r.published_date,
                award_amount=r.award_amount,
                trl_estimate=r.trl_estimate,
            )
            search_results.append(item)
            source_counts[r.source_type] = source_counts.get(r.source_type, 0) + 1

        duration = (datetime.now() - start_time).total_seconds()

        # Generate guidance message (markdown; rendered by the UI layer).
        if not search_results:
            guidance = "No results found. Consider broadening your search terms or enabling more sources."
        elif len(search_results) < 10:
            guidance = f"Found only **{len(search_results)} results**. You may want to broaden your query. If these look relevant, proceed to **Extract Technologies**."
        else:
            # Summarize per-source hit counts, largest first.
            source_summary = ", ".join([f"{count} {src}" for src, count in sorted(source_counts.items(), key=lambda x: -x[1])])
            guidance = f"Found **{len(search_results)} results** ({source_summary}). Review the results below. If they look relevant to your capability need, proceed to **Extract Technologies**. Otherwise, refine your query."

        # Store the raw results and parsed capability for later steps.
        # We'll save this to disk so the extraction step can retrieve it.
        step_data = {
            "id": result_id,
            "user_input": user_input,
            "parsed_capability": parsed.to_dict(),
            "raw_results": [self._search_result_to_dict(r) for r in raw_results],
        }
        save_path = self.config.analyses_dir / f"step_search_{result_id}.json"
        with open(save_path, "w") as f:
            json.dump(step_data, f, indent=2)

        return StepSearchResult(
            id=result_id,
            timestamp=datetime.now().isoformat(),
            user_input=user_input,
            capability_need=parsed.capability_need,
            capability_criteria=parsed.capability_criteria,
            parsed_capability=parsed,
            search_results=search_results,
            source_counts=source_counts,
            processing_time_seconds=duration,
            success=True,
            guidance_message=guidance,
        )
|
||
|
|
|
||
|
|
def step_extract(self, search_id: str) -> StepExtractionResult:
|
||
|
|
"""
|
||
|
|
Step 2: Extract technologies from search results.
|
||
|
|
|
||
|
|
Takes the search results from Step 1 and extracts/groups technologies.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
search_id: ID from StepSearchResult
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
StepExtractionResult with extracted technologies
|
||
|
|
"""
|
||
|
|
start_time = datetime.now()
|
||
|
|
result_id = str(uuid.uuid4())[:8]
|
||
|
|
|
||
|
|
logger.info(f"Step 2: Extract Technologies for search {search_id}...")
|
||
|
|
|
||
|
|
# Load the saved search data
|
||
|
|
search_path = self.config.analyses_dir / f"step_search_{search_id}.json"
|
||
|
|
if not search_path.exists():
|
||
|
|
return StepExtractionResult(
|
||
|
|
id=result_id,
|
||
|
|
timestamp=datetime.now().isoformat(),
|
||
|
|
search_id=search_id,
|
||
|
|
technologies=[],
|
||
|
|
raw_extractions=0,
|
||
|
|
after_grouping=0,
|
||
|
|
processing_time_seconds=0,
|
||
|
|
success=False,
|
||
|
|
error=f"Search results not found. Please run search step first.",
|
||
|
|
guidance_message="Search results expired or not found. Please start a new search."
|
||
|
|
)
|
||
|
|
|
||
|
|
with open(search_path) as f:
|
||
|
|
search_data = json.load(f)
|
||
|
|
|
||
|
|
# Reconstruct parsed capability
|
||
|
|
parsed = self._dict_to_parsed_capability(search_data["parsed_capability"])
|
||
|
|
|
||
|
|
# Reconstruct search results
|
||
|
|
raw_results = [self._dict_to_search_result(r) for r in search_data["raw_results"]]
|
||
|
|
|
||
|
|
# Extract technologies
|
||
|
|
logger.info(f"Extracting technologies from {len(raw_results)} results...")
|
||
|
|
extraction_result = self.technology_extractor.extract_all(
|
||
|
|
raw_results,
|
||
|
|
parsed.capability_need,
|
||
|
|
parsed.technology_indicators
|
||
|
|
)
|
||
|
|
|
||
|
|
if not extraction_result.technologies:
|
||
|
|
duration = (datetime.now() - start_time).total_seconds()
|
||
|
|
return StepExtractionResult(
|
||
|
|
id=result_id,
|
||
|
|
timestamp=datetime.now().isoformat(),
|
||
|
|
search_id=search_id,
|
||
|
|
technologies=[],
|
||
|
|
raw_extractions=0,
|
||
|
|
after_grouping=0,
|
||
|
|
processing_time_seconds=duration,
|
||
|
|
success=True,
|
||
|
|
guidance_message="No specific technologies could be extracted from the search results. The results may be too general. Try refining your search with more specific technical terms."
|
||
|
|
)
|
||
|
|
|
||
|
|
# Group technologies
|
||
|
|
logger.info(f"Grouping {len(extraction_result.technologies)} extracted technologies...")
|
||
|
|
grouped = self.technology_grouper.group(extraction_result.technologies)
|
||
|
|
|
||
|
|
# Convert to TechnologyItem
|
||
|
|
technology_items = []
|
||
|
|
for tech in grouped:
|
||
|
|
item = TechnologyItem(
|
||
|
|
id=tech.id,
|
||
|
|
canonical_name=tech.canonical_name,
|
||
|
|
technology_type=tech.technology_type,
|
||
|
|
description=tech.description,
|
||
|
|
capabilities=tech.capabilities,
|
||
|
|
developers=[d.to_dict() for d in tech.developers],
|
||
|
|
trl_estimate=tech.trl_estimate,
|
||
|
|
source_count=tech.source_count,
|
||
|
|
sources=[s.to_dict() for s in tech.sources],
|
||
|
|
)
|
||
|
|
technology_items.append(item)
|
||
|
|
|
||
|
|
duration = (datetime.now() - start_time).total_seconds()
|
||
|
|
|
||
|
|
# Generate guidance
|
||
|
|
if len(technology_items) == 0:
|
||
|
|
guidance = "No technologies found. Try a different search."
|
||
|
|
elif len(technology_items) <= 3:
|
||
|
|
guidance = f"Found **{len(technology_items)} technologies**. Select the ones you want to evaluate against your capability criteria, then click **Evaluate Selected**."
|
||
|
|
else:
|
||
|
|
guidance = f"Found **{len(technology_items)} unique technologies** (grouped from {len(extraction_result.technologies)} extractions). Review the list and select promising candidates, then click **Evaluate Selected** to see how well they match your capability need."
|
||
|
|
|
||
|
|
# Save extraction data for evaluation step
|
||
|
|
step_data = {
|
||
|
|
"id": result_id,
|
||
|
|
"search_id": search_id,
|
||
|
|
"user_input": search_data["user_input"],
|
||
|
|
"parsed_capability": search_data["parsed_capability"],
|
||
|
|
"grouped_technologies": [self._grouped_tech_to_dict(t) for t in grouped],
|
||
|
|
}
|
||
|
|
save_path = self.config.analyses_dir / f"step_extract_{result_id}.json"
|
||
|
|
with open(save_path, "w") as f:
|
||
|
|
json.dump(step_data, f, indent=2)
|
||
|
|
|
||
|
|
return StepExtractionResult(
|
||
|
|
id=result_id,
|
||
|
|
timestamp=datetime.now().isoformat(),
|
||
|
|
search_id=search_id,
|
||
|
|
technologies=technology_items,
|
||
|
|
raw_extractions=len(extraction_result.technologies),
|
||
|
|
after_grouping=len(grouped),
|
||
|
|
processing_time_seconds=duration,
|
||
|
|
success=True,
|
||
|
|
guidance_message=guidance,
|
||
|
|
)
|
||
|
|
|
||
|
|
    def step_evaluate(
        self,
        extraction_id: str,
        technology_ids: List[str]
    ) -> StepEvaluationResult:
        """
        Step 3: Evaluate selected technologies against capability criteria.

        Loads the persisted Step 2 output (``step_extract_<id>.json``),
        filters it down to the user-selected technology ids, scores each
        against the parsed capability criteria, and returns the results
        sorted by fit score (best first).

        Args:
            extraction_id: ID from StepExtractionResult
            technology_ids: List of technology IDs to evaluate

        Returns:
            StepEvaluationResult with fit assessments. ``success`` is False
            when the extraction file is missing or no selected id matched.
        """
        start_time = datetime.now()
        result_id = str(uuid.uuid4())[:8]

        logger.info(f"Step 3: Evaluate {len(technology_ids)} technologies...")

        # Load extraction data saved by step_extract().
        extract_path = self.config.analyses_dir / f"step_extract_{extraction_id}.json"
        if not extract_path.exists():
            # Empty summary placeholder; search_id is unknown without the file.
            return StepEvaluationResult(
                id=result_id,
                timestamp=datetime.now().isoformat(),
                search_id="",
                technologies=[],
                summary=MatchSummary(0, 0, 0, 0, 0, None),
                processing_time_seconds=0,
                success=False,
                error="Extraction results not found. Please run extraction step first.",
                guidance_message="Extraction results expired. Please start over."
            )

        with open(extract_path) as f:
            extract_data = json.load(f)

        # Reconstruct parsed capability (need, criteria, indicators).
        parsed = self._dict_to_parsed_capability(extract_data["parsed_capability"])

        # Get selected technologies: rebuild all, then keep the chosen ids.
        all_grouped = [self._dict_to_grouped_technology(t) for t in extract_data["grouped_technologies"]]
        selected = [t for t in all_grouped if t.id in technology_ids]

        if not selected:
            # None of the requested ids matched the stored technologies.
            return StepEvaluationResult(
                id=result_id,
                timestamp=datetime.now().isoformat(),
                search_id=extract_data["search_id"],
                technologies=[],
                summary=MatchSummary(0, 0, 0, 0, 0, None),
                processing_time_seconds=0,
                success=False,
                error="No valid technologies selected.",
                guidance_message="Please select at least one technology to evaluate."
            )

        # Evaluate selected technologies against the capability criteria.
        logger.info(f"Evaluating {len(selected)} technologies against criteria...")
        evaluated = self.capability_evaluator.evaluate_all(
            selected,
            parsed.capability_need,
            parsed.capability_criteria
        )

        # Sort by fit score, best matches first.
        evaluated.sort(key=lambda t: t.capability_match.fit_score, reverse=True)

        duration = (datetime.now() - start_time).total_seconds()
        summary = self._build_summary(evaluated)

        # Generate guidance keyed off the strongest fit tier present.
        if summary.high_fit_count > 0:
            guidance = f"**{summary.high_fit_count} technologies** show HIGH fit for your capability need! Click on each to see detailed criteria assessment. Consider running a **Deep Dive** on promising candidates."
        elif summary.medium_fit_count > 0:
            guidance = f"Found **{summary.medium_fit_count} technologies** with MEDIUM fit. These partially address your need. Review the criteria breakdown to understand gaps."
        else:
            guidance = "No technologies show strong fit for your criteria. Consider broadening your search or adjusting your requirements."

        return StepEvaluationResult(
            id=result_id,
            timestamp=datetime.now().isoformat(),
            search_id=extract_data["search_id"],
            technologies=evaluated,
            summary=summary,
            processing_time_seconds=duration,
            success=True,
            guidance_message=guidance,
        )
|
||
|
|
|
||
|
|
# Helper methods for serialization/deserialization
|
||
|
|
|
||
|
|
def _search_result_to_dict(self, r) -> Dict[str, Any]:
|
||
|
|
"""Convert SearchResult to dict for JSON storage.
|
||
|
|
|
||
|
|
Handles both SearchResult from base.py and SearchResultItem from this module.
|
||
|
|
"""
|
||
|
|
# Generate an id if not present (SearchResult from base.py uses rank instead)
|
||
|
|
result_id = getattr(r, 'id', None) or f"{r.source_type}_{r.rank}" if hasattr(r, 'rank') else f"{r.source_type}_{hash(r.url) % 100000}"
|
||
|
|
|
||
|
|
return {
|
||
|
|
"id": result_id,
|
||
|
|
"title": r.title,
|
||
|
|
"snippet": r.snippet,
|
||
|
|
"url": r.url,
|
||
|
|
"source_type": r.source_type,
|
||
|
|
"source": getattr(r, 'source', getattr(r, 'source_name', '')),
|
||
|
|
"organization": r.organization,
|
||
|
|
"published_date": r.published_date,
|
||
|
|
"award_amount": r.award_amount,
|
||
|
|
"award_id": getattr(r, 'award_id', None),
|
||
|
|
"trl_estimate": r.trl_estimate,
|
||
|
|
"patent_number": getattr(r, 'patent_number', None),
|
||
|
|
"relevance_score": getattr(r, 'relevance_score', 0.0),
|
||
|
|
}
|
||
|
|
|
||
|
|
def _dict_to_search_result(self, d: Dict[str, Any]) -> SearchResult:
|
||
|
|
"""Reconstruct SearchResult from dict.
|
||
|
|
|
||
|
|
Note: SearchResult from base.py uses 'rank' not 'id', and 'source' not 'source_name'.
|
||
|
|
We store extra fields in raw_data for later use.
|
||
|
|
"""
|
||
|
|
return SearchResult(
|
||
|
|
title=d.get("title", ""),
|
||
|
|
snippet=d.get("snippet", ""),
|
||
|
|
url=d.get("url", ""),
|
||
|
|
source_type=d.get("source_type", ""),
|
||
|
|
source=d.get("source", ""),
|
||
|
|
rank=0, # Rank is lost during serialization, default to 0
|
||
|
|
organization=d.get("organization"),
|
||
|
|
published_date=d.get("published_date"),
|
||
|
|
award_amount=d.get("award_amount"),
|
||
|
|
award_id=d.get("award_id"),
|
||
|
|
trl_estimate=d.get("trl_estimate"),
|
||
|
|
patent_number=d.get("patent_number"),
|
||
|
|
raw_data={
|
||
|
|
"id": d.get("id", ""),
|
||
|
|
"relevance_score": d.get("relevance_score", 0.0),
|
||
|
|
},
|
||
|
|
)
|
||
|
|
|
||
|
|
def _dict_to_parsed_capability(self, d: Dict[str, Any]) -> ParsedCapability:
|
||
|
|
"""Reconstruct ParsedCapability from dict."""
|
||
|
|
cap_need = None
|
||
|
|
if d.get("capability_need"):
|
||
|
|
cn = d["capability_need"]
|
||
|
|
cap_need = CapabilityNeed(
|
||
|
|
functional_need=cn.get("functional_need", ""),
|
||
|
|
domain=cn.get("domain", ""),
|
||
|
|
implied_constraints=cn.get("implied_constraints", []),
|
||
|
|
technology_types_sought=cn.get("technology_types_sought", []),
|
||
|
|
)
|
||
|
|
|
||
|
|
criteria = []
|
||
|
|
for c in d.get("capability_criteria", []):
|
||
|
|
criteria.append(CapabilityCriterion(
|
||
|
|
criterion=c.get("criterion", ""),
|
||
|
|
weight=c.get("weight", "should_have"),
|
||
|
|
keywords=c.get("keywords", []),
|
||
|
|
))
|
||
|
|
|
||
|
|
from ..capability.types import TechnologyIndicators
|
||
|
|
tech_ind = None
|
||
|
|
if d.get("technology_indicators"):
|
||
|
|
ti = d["technology_indicators"]
|
||
|
|
tech_ind = TechnologyIndicators(
|
||
|
|
positive=ti.get("positive", []),
|
||
|
|
negative=ti.get("negative", []),
|
||
|
|
)
|
||
|
|
|
||
|
|
return ParsedCapability(
|
||
|
|
original_query=d.get("original_query", ""),
|
||
|
|
understanding=d.get("understanding", ""),
|
||
|
|
technical_domains=d.get("technical_domains", []),
|
||
|
|
search_queries=d.get("search_queries", []),
|
||
|
|
sbir_queries=d.get("sbir_queries", []),
|
||
|
|
patent_queries=d.get("patent_queries", []),
|
||
|
|
news_queries=d.get("news_queries", []),
|
||
|
|
keywords=d.get("keywords", []),
|
||
|
|
exclusions=d.get("exclusions", []),
|
||
|
|
target_trl_range=tuple(d.get("target_trl_range", [4, 7])),
|
||
|
|
capability_need=cap_need,
|
||
|
|
capability_criteria=criteria,
|
||
|
|
technology_indicators=tech_ind,
|
||
|
|
success=d.get("success", True),
|
||
|
|
error=d.get("error"),
|
||
|
|
)
|
||
|
|
|
||
|
|
def _grouped_tech_to_dict(self, t: GroupedTechnology) -> Dict[str, Any]:
|
||
|
|
"""Convert GroupedTechnology to dict for JSON storage."""
|
||
|
|
return t.to_dict()
|
||
|
|
|
||
|
|
def _dict_to_grouped_technology(self, d: Dict[str, Any]) -> GroupedTechnology:
|
||
|
|
"""Reconstruct GroupedTechnology from dict."""
|
||
|
|
from ..technology.types import Developer, SourceEvidence
|
||
|
|
|
||
|
|
developers = []
|
||
|
|
for dev in d.get("developers", []):
|
||
|
|
developers.append(Developer(
|
||
|
|
name=dev.get("name", ""),
|
||
|
|
type=dev.get("type", "unknown"),
|
||
|
|
location=dev.get("location"),
|
||
|
|
))
|
||
|
|
|
||
|
|
sources = []
|
||
|
|
for src in d.get("sources", []):
|
||
|
|
sources.append(SourceEvidence(
|
||
|
|
source_type=src.get("source_type", ""),
|
||
|
|
source_name=src.get("source_name", ""),
|
||
|
|
title=src.get("title", ""),
|
||
|
|
url=src.get("url", ""),
|
||
|
|
snippet=src.get("snippet", ""),
|
||
|
|
contribution=src.get("contribution", ""),
|
||
|
|
published_date=src.get("published_date"),
|
||
|
|
award_amount=src.get("award_amount"),
|
||
|
|
award_id=src.get("award_id"),
|
||
|
|
patent_number=src.get("patent_number"),
|
||
|
|
))
|
||
|
|
|
||
|
|
return GroupedTechnology(
|
||
|
|
id=d.get("id", ""),
|
||
|
|
canonical_name=d.get("canonical_name", ""),
|
||
|
|
alternate_names=d.get("alternate_names", []),
|
||
|
|
technology_type=d.get("technology_type", ""),
|
||
|
|
description=d.get("description", ""),
|
||
|
|
capabilities=d.get("capabilities", []),
|
||
|
|
mechanism=d.get("mechanism"),
|
||
|
|
developers=developers,
|
||
|
|
trl_estimate=d.get("trl_estimate"),
|
||
|
|
trl_confidence=d.get("trl_confidence", 0.0),
|
||
|
|
trl_evidence=d.get("trl_evidence", []),
|
||
|
|
sources=sources,
|
||
|
|
source_count=d.get("source_count", 0),
|
||
|
|
grouping_confidence=d.get("grouping_confidence", 1.0),
|
||
|
|
grouped_from=d.get("grouped_from", []),
|
||
|
|
)
|