""" Capability Matcher Pipeline for TechScout. Orchestrates the full capability-to-technology matching process: 1. Parse capability need from natural language 2. Search sources for relevant content 3. Extract technologies from search results 4. Group duplicate technologies 5. Evaluate capability fit 6. Return ranked technologies This is the main entry point for the new technology-centric approach. """ import json import logging import uuid from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Optional, Generator from ..config import TechScoutConfig, config as default_config from ..extraction.llm_client import OllamaClient from ..extraction.org_extractor import OrganizationExtractor from ..search.web import WebSearcher from ..search.base import SearchResult from ..sources.sbir import SBIRSearcher from ..sources.patents import PatentSearcher from ..sources.contracts import ContractSearcher from ..capability.parser import CapabilityParser from ..capability.types import ParsedCapability, CapabilityNeed, CapabilityCriterion from ..technology.extractor import TechnologyExtractor from ..technology.grouper import TechnologyGrouper from ..technology.evaluator import CapabilityEvaluator from ..technology.types import EvaluatedTechnology, GroupedTechnology logger = logging.getLogger(__name__) @dataclass class MatchSummary: """Summary of match results.""" total_technologies: int high_fit_count: int medium_fit_count: int low_fit_count: int uncertain_count: int top_recommendation: Optional[str] def to_dict(self) -> Dict[str, Any]: return { "total_technologies": self.total_technologies, "high_fit_count": self.high_fit_count, "medium_fit_count": self.medium_fit_count, "low_fit_count": self.low_fit_count, "uncertain_count": self.uncertain_count, "top_recommendation": self.top_recommendation, } # ============================================================================ # Step-Based Pipeline Results (for 
@dataclass
class SearchResultItem:
    """One raw search hit, shaped for user review in the step-based workflow."""

    id: str
    title: str
    snippet: str
    url: str
    source_type: str
    source_name: str
    organization: Optional[str] = None
    published_date: Optional[str] = None
    award_amount: Optional[float] = None
    trl_estimate: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict for JSON output."""
        keys = (
            "id", "title", "snippet", "url", "source_type", "source_name",
            "organization", "published_date", "award_amount", "trl_estimate",
        )
        return {key: getattr(self, key) for key in keys}


@dataclass
class StepSearchResult:
    """
    Result of Step 1: Search & Parse.

    Contains parsed capability and raw search results for user review.
    """

    id: str
    timestamp: str
    user_input: str

    # Parsed capability info
    capability_need: Optional[CapabilityNeed]
    capability_criteria: List[CapabilityCriterion]
    parsed_capability: Optional[ParsedCapability]

    # Search results
    search_results: List[SearchResultItem]
    source_counts: Dict[str, int]

    # Timing
    processing_time_seconds: float

    # Status
    success: bool = True
    error: Optional[str] = None

    # Guidance
    guidance_message: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict for JSON output (nested objects included)."""
        need = self.capability_need
        parsed = self.parsed_capability
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "user_input": self.user_input,
            "capability_need": need.to_dict() if need else None,
            "capability_criteria": [crit.to_dict() for crit in self.capability_criteria],
            "parsed_capability": parsed.to_dict() if parsed else None,
            "search_results": [item.to_dict() for item in self.search_results],
            "source_counts": self.source_counts,
            "processing_time_seconds": self.processing_time_seconds,
            "success": self.success,
            "error": self.error,
            "guidance_message": self.guidance_message,
        }
workflow (pre-evaluation).""" id: str canonical_name: str technology_type: str description: str capabilities: List[str] developers: List[Dict[str, Any]] trl_estimate: Optional[int] source_count: int sources: List[Dict[str, Any]] def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "canonical_name": self.canonical_name, "technology_type": self.technology_type, "description": self.description, "capabilities": self.capabilities, "developers": self.developers, "trl_estimate": self.trl_estimate, "source_count": self.source_count, "sources": self.sources, } @dataclass class StepExtractionResult: """ Result of Step 2: Technology Extraction. Contains extracted and grouped technologies for user selection. """ id: str timestamp: str search_id: str # Links to StepSearchResult # Technologies found technologies: List[TechnologyItem] # Stats raw_extractions: int after_grouping: int # Timing processing_time_seconds: float # Status success: bool = True error: Optional[str] = None # Guidance guidance_message: str = "" def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "timestamp": self.timestamp, "search_id": self.search_id, "technologies": [t.to_dict() for t in self.technologies], "raw_extractions": self.raw_extractions, "after_grouping": self.after_grouping, "processing_time_seconds": self.processing_time_seconds, "success": self.success, "error": self.error, "guidance_message": self.guidance_message, } @dataclass class StepEvaluationResult: """ Result of Step 3: Capability Evaluation. Contains evaluated technologies with fit scores. 
""" id: str timestamp: str search_id: str # Links to original search # Evaluated technologies technologies: List[EvaluatedTechnology] # Summary summary: MatchSummary # Timing processing_time_seconds: float # Status success: bool = True error: Optional[str] = None # Guidance guidance_message: str = "" def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "timestamp": self.timestamp, "search_id": self.search_id, "technologies": [t.to_dict() for t in self.technologies], "summary": self.summary.to_dict(), "processing_time_seconds": self.processing_time_seconds, "success": self.success, "error": self.error, "guidance_message": self.guidance_message, } @dataclass class SearchMetadata: """Metadata about the search process.""" total_documents_searched: int technologies_extracted: int technologies_after_grouping: int sources_used: List[str] processing_time_seconds: float def to_dict(self) -> Dict[str, Any]: return { "total_documents_searched": self.total_documents_searched, "technologies_extracted": self.technologies_extracted, "technologies_after_grouping": self.technologies_after_grouping, "sources_used": self.sources_used, "processing_time_seconds": self.processing_time_seconds, } @dataclass class CapabilityMatchResult: """ Complete result of capability-to-technology matching. This is the main output type for the new pipeline. 
""" # Identity id: str timestamp: str # Input user_input: str capability_need: Optional[CapabilityNeed] capability_criteria: List[CapabilityCriterion] # Output technologies: List[EvaluatedTechnology] # Summary summary: MatchSummary # Metadata metadata: SearchMetadata # Status success: bool = True error: Optional[str] = None # Original parsing (for transparency) parsed_capability: Optional[ParsedCapability] = None def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "timestamp": self.timestamp, "user_input": self.user_input, "capability_need": self.capability_need.to_dict() if self.capability_need else None, "capability_criteria": [c.to_dict() for c in self.capability_criteria], "technologies": [t.to_dict() for t in self.technologies], "summary": self.summary.to_dict(), "metadata": self.metadata.to_dict(), "success": self.success, "error": self.error, "parsed_capability": self.parsed_capability.to_dict() if self.parsed_capability else None, } def save(self, path: Path): """Save result to JSON file.""" with open(path, "w") as f: json.dump(self.to_dict(), f, indent=2) @dataclass class StatusUpdate: """Status update during pipeline execution.""" stage: str message: str progress: Optional[int] = None # 0-100 class CapabilityMatcherPipeline: """ Main pipeline for capability-to-technology matching. Orchestrates: 1. Capability Parser (Stage 1) 2. Source Search (Stage 2) 3. Technology Extractor (Stage 3) 4. Technology Grouper (Stage 4) 5. 
Capability Evaluator (Stage 5) """ def __init__( self, config: Optional[TechScoutConfig] = None, model: str = "mistral-nemo:12b" ): self.config = config or default_config self.model = model # Initialize LLM client self.llm_client = OllamaClient( base_url=self.config.ollama.base_url, default_model=model ) # Initialize pipeline stages self.capability_parser = CapabilityParser(self.llm_client, model) self.technology_extractor = TechnologyExtractor(self.llm_client, model) self.technology_grouper = TechnologyGrouper() self.capability_evaluator = CapabilityEvaluator(self.llm_client, model) self.org_extractor = OrganizationExtractor(self.llm_client, model) # Initialize searchers self.web_searcher = WebSearcher() self.sbir_searcher = SBIRSearcher() self.patent_searcher = PatentSearcher() self.contract_searcher = ContractSearcher() def match( self, user_input: str, max_technologies: int = 15, min_fit_score: int = 25, sources: Optional[List[str]] = None ) -> CapabilityMatchResult: """ Run the full capability-to-technology matching pipeline. 
Args: user_input: Natural language capability need description max_technologies: Maximum technologies to return min_fit_score: Minimum fit score to include sources: Which sources to search (default: all) Returns: CapabilityMatchResult with ranked technologies """ start_time = datetime.now() result_id = str(uuid.uuid4())[:8] sources = sources or ["sbir", "patents", "contracts", "web", "news"] logger.info(f"Starting capability match for: {user_input[:100]}...") # Stage 1: Parse capability need logger.info("Stage 1: Parsing capability need...") parsed = self.capability_parser.parse(user_input) if not parsed.success: return self._create_error_result( result_id, user_input, start_time, f"Capability parsing failed: {parsed.error}" ) logger.info(f"Parsed capability need: {parsed.capability_need.functional_need[:100] if parsed.capability_need else 'N/A'}") logger.info(f"Generated {len(parsed.capability_criteria)} evaluation criteria") # Stage 2: Search sources logger.info("Stage 2: Searching sources...") search_results = self._search_all_sources(parsed, sources) logger.info(f"Found {len(search_results)} total search results") if not search_results: return self._create_error_result( result_id, user_input, start_time, "No search results found", parsed ) # Extract organizations for results missing them self._extract_organizations(search_results) # Stage 3: Extract technologies logger.info("Stage 3: Extracting technologies from results...") extraction_result = self.technology_extractor.extract_all( search_results, parsed.capability_need, parsed.technology_indicators ) logger.info(f"Extracted {len(extraction_result.technologies)} technologies") if not extraction_result.technologies: return self._create_empty_result( result_id, user_input, start_time, parsed, len(search_results), 0, 0 ) # Stage 4: Group technologies logger.info("Stage 4: Grouping duplicate technologies...") grouped_technologies = self.technology_grouper.group(extraction_result.technologies) logger.info(f"Grouped 
into {len(grouped_technologies)} unique technologies") # Stage 5: Evaluate capability fit logger.info("Stage 5: Evaluating capability fit...") evaluated_technologies = self.capability_evaluator.evaluate_all( grouped_technologies, parsed.capability_need, parsed.capability_criteria ) # Filter by minimum fit score filtered_technologies = [ t for t in evaluated_technologies if t.capability_match.fit_score >= min_fit_score ] # Limit to max_technologies final_technologies = filtered_technologies[:max_technologies] # Build summary summary = self._build_summary(final_technologies) # Build metadata duration = (datetime.now() - start_time).total_seconds() metadata = SearchMetadata( total_documents_searched=len(search_results), technologies_extracted=len(extraction_result.technologies), technologies_after_grouping=len(grouped_technologies), sources_used=sources, processing_time_seconds=duration ) # Build result result = CapabilityMatchResult( id=result_id, timestamp=datetime.now().isoformat(), user_input=user_input, capability_need=parsed.capability_need, capability_criteria=parsed.capability_criteria, technologies=final_technologies, summary=summary, metadata=metadata, success=True, parsed_capability=parsed ) # Save result save_path = self.config.analyses_dir / f"match_{result_id}.json" result.save(save_path) logger.info(f"Saved match result to {save_path}") return result def match_with_status( self, user_input: str, max_technologies: int = 15, min_fit_score: int = 25, sources: Optional[List[str]] = None ) -> Generator[StatusUpdate | CapabilityMatchResult, None, None]: """ Run matching pipeline with status updates. Yields StatusUpdate objects during processing, then final CapabilityMatchResult. 
""" start_time = datetime.now() result_id = str(uuid.uuid4())[:8] sources = sources or ["sbir", "patents", "contracts", "web", "news"] # Stage 1: Parse yield StatusUpdate("parsing", "Parsing capability need...", 10) parsed = self.capability_parser.parse(user_input) if not parsed.success: yield self._create_error_result( result_id, user_input, start_time, f"Capability parsing failed: {parsed.error}" ) return # Stage 2: Search yield StatusUpdate("searching", f"Searching {len(sources)} sources...", 20) search_results = self._search_all_sources(parsed, sources) yield StatusUpdate("searching", f"Found {len(search_results)} results", 40) if not search_results: yield self._create_error_result( result_id, user_input, start_time, "No search results found", parsed ) return self._extract_organizations(search_results) # Stage 3: Extract yield StatusUpdate("extracting", "Extracting technologies from results...", 50) extraction_result = self.technology_extractor.extract_all( search_results, parsed.capability_need, parsed.technology_indicators ) yield StatusUpdate("extracting", f"Extracted {len(extraction_result.technologies)} technologies", 65) if not extraction_result.technologies: yield self._create_empty_result( result_id, user_input, start_time, parsed, len(search_results), 0, 0 ) return # Stage 4: Group yield StatusUpdate("grouping", "Grouping duplicate technologies...", 70) grouped_technologies = self.technology_grouper.group(extraction_result.technologies) yield StatusUpdate("grouping", f"Found {len(grouped_technologies)} unique technologies", 75) # Stage 5: Evaluate yield StatusUpdate("evaluating", "Evaluating capability fit...", 80) evaluated_technologies = self.capability_evaluator.evaluate_all( grouped_technologies, parsed.capability_need, parsed.capability_criteria ) yield StatusUpdate("evaluating", "Evaluation complete", 95) # Filter and finalize filtered = [t for t in evaluated_technologies if t.capability_match.fit_score >= min_fit_score] final = 
filtered[:max_technologies] summary = self._build_summary(final) duration = (datetime.now() - start_time).total_seconds() metadata = SearchMetadata( total_documents_searched=len(search_results), technologies_extracted=len(extraction_result.technologies), technologies_after_grouping=len(grouped_technologies), sources_used=sources, processing_time_seconds=duration ) result = CapabilityMatchResult( id=result_id, timestamp=datetime.now().isoformat(), user_input=user_input, capability_need=parsed.capability_need, capability_criteria=parsed.capability_criteria, technologies=final, summary=summary, metadata=metadata, success=True, parsed_capability=parsed ) save_path = self.config.analyses_dir / f"match_{result_id}.json" result.save(save_path) yield StatusUpdate("complete", f"Found {len(final)} technologies", 100) yield result def _search_all_sources( self, parsed: ParsedCapability, sources: List[str] ) -> List[SearchResult]: """Search all configured sources.""" all_results: List[SearchResult] = [] # SBIR/STTR if "sbir" in sources: logger.info("Searching SBIR/STTR...") for query in parsed.sbir_queries[:3]: try: results = self.sbir_searcher.search(query, max_results=15) all_results.extend(results) except Exception as e: logger.warning(f"SBIR search failed: {e}") # Patents if "patents" in sources: logger.info("Searching patents...") for query in parsed.patent_queries[:3]: try: results = self.patent_searcher.search(query, max_results=15) all_results.extend(results) except Exception as e: logger.warning(f"Patent search failed: {e}") # Contracts if "contracts" in sources: logger.info("Searching federal contracts...") for query in parsed.search_queries[:2]: try: results = self.contract_searcher.search_dod(query, max_results=10) all_results.extend(results) except Exception as e: logger.warning(f"Contract search failed: {e}") # Web if "web" in sources: logger.info("Searching web...") for query in parsed.search_queries[:4]: try: results = self.web_searcher.search(query, 
max_results=10) all_results.extend(results) except Exception as e: logger.warning(f"Web search failed: {e}") # News if "news" in sources: logger.info("Searching news...") for query in parsed.news_queries[:2]: try: results = self.web_searcher.search(query, max_results=10, news_only=True) all_results.extend(results) except Exception as e: logger.warning(f"News search failed: {e}") # Deduplicate by URL seen_urls = set() unique_results = [] for result in all_results: if result.url and result.url not in seen_urls: seen_urls.add(result.url) unique_results.append(result) return unique_results def _extract_organizations(self, results: List[SearchResult]): """Extract organizations for results that don't have them.""" results_needing_org = [ (i, r) for i, r in enumerate(results) if r.source_type in ("web", "news", "government", "academic") and not r.organization ] if results_needing_org: items_to_extract = [(r.title, r.snippet) for _, r in results_needing_org] extractions = self.org_extractor.extract_batch(items_to_extract, use_llm_fallback=True) for (idx, result), extraction in zip(results_needing_org, extractions): if extraction.organization: results[idx].organization = extraction.organization def _build_summary(self, technologies: List[EvaluatedTechnology]) -> MatchSummary: """Build summary of match results.""" high_count = sum(1 for t in technologies if t.capability_match.overall_fit == "HIGH") medium_count = sum(1 for t in technologies if t.capability_match.overall_fit == "MEDIUM") low_count = sum(1 for t in technologies if t.capability_match.overall_fit == "LOW") uncertain_count = sum(1 for t in technologies if t.capability_match.overall_fit == "UNCERTAIN") top_rec = None if technologies and technologies[0].capability_match.overall_fit in ("HIGH", "MEDIUM"): top_rec = technologies[0].technology.canonical_name return MatchSummary( total_technologies=len(technologies), high_fit_count=high_count, medium_fit_count=medium_count, low_fit_count=low_count, 
uncertain_count=uncertain_count, top_recommendation=top_rec ) def _create_error_result( self, result_id: str, user_input: str, start_time: datetime, error: str, parsed: Optional[ParsedCapability] = None ) -> CapabilityMatchResult: """Create an error result.""" duration = (datetime.now() - start_time).total_seconds() return CapabilityMatchResult( id=result_id, timestamp=datetime.now().isoformat(), user_input=user_input, capability_need=parsed.capability_need if parsed else None, capability_criteria=parsed.capability_criteria if parsed else [], technologies=[], summary=MatchSummary(0, 0, 0, 0, 0, None), metadata=SearchMetadata(0, 0, 0, [], duration), success=False, error=error, parsed_capability=parsed ) def _create_empty_result( self, result_id: str, user_input: str, start_time: datetime, parsed: ParsedCapability, docs_searched: int, techs_extracted: int, techs_grouped: int ) -> CapabilityMatchResult: """Create a result with no technologies found.""" duration = (datetime.now() - start_time).total_seconds() return CapabilityMatchResult( id=result_id, timestamp=datetime.now().isoformat(), user_input=user_input, capability_need=parsed.capability_need, capability_criteria=parsed.capability_criteria, technologies=[], summary=MatchSummary(0, 0, 0, 0, 0, None), metadata=SearchMetadata(docs_searched, techs_extracted, techs_grouped, [], duration), success=True, error=None, parsed_capability=parsed ) # ======================================================================== # Step-Based Pipeline Methods (for guided workflow) # ======================================================================== def step_search( self, user_input: str, sources: Optional[List[str]] = None ) -> StepSearchResult: """ Step 1: Parse capability need and search sources. This is the first step in the guided workflow. Returns search results for user review before proceeding to technology extraction. 
Args: user_input: Natural language capability need description sources: Which sources to search (default: all) Returns: StepSearchResult with parsed capability and search results """ start_time = datetime.now() result_id = str(uuid.uuid4())[:8] sources = sources or ["sbir", "patents", "contracts", "web", "news"] logger.info(f"Step 1: Search & Parse for: {user_input[:100]}...") # Parse capability need logger.info("Parsing capability need...") parsed = self.capability_parser.parse(user_input) if not parsed.success: duration = (datetime.now() - start_time).total_seconds() return StepSearchResult( id=result_id, timestamp=datetime.now().isoformat(), user_input=user_input, capability_need=None, capability_criteria=[], parsed_capability=None, search_results=[], source_counts={}, processing_time_seconds=duration, success=False, error=f"Capability parsing failed: {parsed.error}", guidance_message="Failed to understand your capability need. Please try rephrasing your query." ) # Search all sources logger.info(f"Searching {len(sources)} sources...") raw_results = self._search_all_sources(parsed, sources) # Extract organizations for results missing them self._extract_organizations(raw_results) # Convert to SearchResultItem search_results = [] source_counts: Dict[str, int] = {} for i, r in enumerate(raw_results): item = SearchResultItem( id=f"{result_id}-{i}", title=r.title, snippet=r.snippet, url=r.url, source_type=r.source_type, source_name=r.source, organization=r.organization, published_date=r.published_date, award_amount=r.award_amount, trl_estimate=r.trl_estimate, ) search_results.append(item) source_counts[r.source_type] = source_counts.get(r.source_type, 0) + 1 duration = (datetime.now() - start_time).total_seconds() # Generate guidance message if not search_results: guidance = "No results found. Consider broadening your search terms or enabling more sources." elif len(search_results) < 10: guidance = f"Found only **{len(search_results)} results**. 
You may want to broaden your query. If these look relevant, proceed to **Extract Technologies**." else: source_summary = ", ".join([f"{count} {src}" for src, count in sorted(source_counts.items(), key=lambda x: -x[1])]) guidance = f"Found **{len(search_results)} results** ({source_summary}). Review the results below. If they look relevant to your capability need, proceed to **Extract Technologies**. Otherwise, refine your query." # Store the raw results and parsed capability for later steps # We'll save this to disk so the extraction step can retrieve it step_data = { "id": result_id, "user_input": user_input, "parsed_capability": parsed.to_dict(), "raw_results": [self._search_result_to_dict(r) for r in raw_results], } save_path = self.config.analyses_dir / f"step_search_{result_id}.json" with open(save_path, "w") as f: json.dump(step_data, f, indent=2) return StepSearchResult( id=result_id, timestamp=datetime.now().isoformat(), user_input=user_input, capability_need=parsed.capability_need, capability_criteria=parsed.capability_criteria, parsed_capability=parsed, search_results=search_results, source_counts=source_counts, processing_time_seconds=duration, success=True, guidance_message=guidance, ) def step_extract(self, search_id: str) -> StepExtractionResult: """ Step 2: Extract technologies from search results. Takes the search results from Step 1 and extracts/groups technologies. 
Args: search_id: ID from StepSearchResult Returns: StepExtractionResult with extracted technologies """ start_time = datetime.now() result_id = str(uuid.uuid4())[:8] logger.info(f"Step 2: Extract Technologies for search {search_id}...") # Load the saved search data search_path = self.config.analyses_dir / f"step_search_{search_id}.json" if not search_path.exists(): return StepExtractionResult( id=result_id, timestamp=datetime.now().isoformat(), search_id=search_id, technologies=[], raw_extractions=0, after_grouping=0, processing_time_seconds=0, success=False, error=f"Search results not found. Please run search step first.", guidance_message="Search results expired or not found. Please start a new search." ) with open(search_path) as f: search_data = json.load(f) # Reconstruct parsed capability parsed = self._dict_to_parsed_capability(search_data["parsed_capability"]) # Reconstruct search results raw_results = [self._dict_to_search_result(r) for r in search_data["raw_results"]] # Extract technologies logger.info(f"Extracting technologies from {len(raw_results)} results...") extraction_result = self.technology_extractor.extract_all( raw_results, parsed.capability_need, parsed.technology_indicators ) if not extraction_result.technologies: duration = (datetime.now() - start_time).total_seconds() return StepExtractionResult( id=result_id, timestamp=datetime.now().isoformat(), search_id=search_id, technologies=[], raw_extractions=0, after_grouping=0, processing_time_seconds=duration, success=True, guidance_message="No specific technologies could be extracted from the search results. The results may be too general. Try refining your search with more specific technical terms." 
) # Group technologies logger.info(f"Grouping {len(extraction_result.technologies)} extracted technologies...") grouped = self.technology_grouper.group(extraction_result.technologies) # Convert to TechnologyItem technology_items = [] for tech in grouped: item = TechnologyItem( id=tech.id, canonical_name=tech.canonical_name, technology_type=tech.technology_type, description=tech.description, capabilities=tech.capabilities, developers=[d.to_dict() for d in tech.developers], trl_estimate=tech.trl_estimate, source_count=tech.source_count, sources=[s.to_dict() for s in tech.sources], ) technology_items.append(item) duration = (datetime.now() - start_time).total_seconds() # Generate guidance if len(technology_items) == 0: guidance = "No technologies found. Try a different search." elif len(technology_items) <= 3: guidance = f"Found **{len(technology_items)} technologies**. Select the ones you want to evaluate against your capability criteria, then click **Evaluate Selected**." else: guidance = f"Found **{len(technology_items)} unique technologies** (grouped from {len(extraction_result.technologies)} extractions). Review the list and select promising candidates, then click **Evaluate Selected** to see how well they match your capability need." 
# Save extraction data for evaluation step step_data = { "id": result_id, "search_id": search_id, "user_input": search_data["user_input"], "parsed_capability": search_data["parsed_capability"], "grouped_technologies": [self._grouped_tech_to_dict(t) for t in grouped], } save_path = self.config.analyses_dir / f"step_extract_{result_id}.json" with open(save_path, "w") as f: json.dump(step_data, f, indent=2) return StepExtractionResult( id=result_id, timestamp=datetime.now().isoformat(), search_id=search_id, technologies=technology_items, raw_extractions=len(extraction_result.technologies), after_grouping=len(grouped), processing_time_seconds=duration, success=True, guidance_message=guidance, ) def step_evaluate( self, extraction_id: str, technology_ids: List[str] ) -> StepEvaluationResult: """ Step 3: Evaluate selected technologies against capability criteria. Args: extraction_id: ID from StepExtractionResult technology_ids: List of technology IDs to evaluate Returns: StepEvaluationResult with fit assessments """ start_time = datetime.now() result_id = str(uuid.uuid4())[:8] logger.info(f"Step 3: Evaluate {len(technology_ids)} technologies...") # Load extraction data extract_path = self.config.analyses_dir / f"step_extract_{extraction_id}.json" if not extract_path.exists(): return StepEvaluationResult( id=result_id, timestamp=datetime.now().isoformat(), search_id="", technologies=[], summary=MatchSummary(0, 0, 0, 0, 0, None), processing_time_seconds=0, success=False, error="Extraction results not found. Please run extraction step first.", guidance_message="Extraction results expired. Please start over." 
) with open(extract_path) as f: extract_data = json.load(f) # Reconstruct parsed capability parsed = self._dict_to_parsed_capability(extract_data["parsed_capability"]) # Get selected technologies all_grouped = [self._dict_to_grouped_technology(t) for t in extract_data["grouped_technologies"]] selected = [t for t in all_grouped if t.id in technology_ids] if not selected: return StepEvaluationResult( id=result_id, timestamp=datetime.now().isoformat(), search_id=extract_data["search_id"], technologies=[], summary=MatchSummary(0, 0, 0, 0, 0, None), processing_time_seconds=0, success=False, error="No valid technologies selected.", guidance_message="Please select at least one technology to evaluate." ) # Evaluate selected technologies logger.info(f"Evaluating {len(selected)} technologies against criteria...") evaluated = self.capability_evaluator.evaluate_all( selected, parsed.capability_need, parsed.capability_criteria ) # Sort by fit score evaluated.sort(key=lambda t: t.capability_match.fit_score, reverse=True) duration = (datetime.now() - start_time).total_seconds() summary = self._build_summary(evaluated) # Generate guidance if summary.high_fit_count > 0: guidance = f"**{summary.high_fit_count} technologies** show HIGH fit for your capability need! Click on each to see detailed criteria assessment. Consider running a **Deep Dive** on promising candidates." elif summary.medium_fit_count > 0: guidance = f"Found **{summary.medium_fit_count} technologies** with MEDIUM fit. These partially address your need. Review the criteria breakdown to understand gaps." else: guidance = "No technologies show strong fit for your criteria. Consider broadening your search or adjusting your requirements." 
return StepEvaluationResult( id=result_id, timestamp=datetime.now().isoformat(), search_id=extract_data["search_id"], technologies=evaluated, summary=summary, processing_time_seconds=duration, success=True, guidance_message=guidance, ) # Helper methods for serialization/deserialization def _search_result_to_dict(self, r) -> Dict[str, Any]: """Convert SearchResult to dict for JSON storage. Handles both SearchResult from base.py and SearchResultItem from this module. """ # Generate an id if not present (SearchResult from base.py uses rank instead) result_id = getattr(r, 'id', None) or f"{r.source_type}_{r.rank}" if hasattr(r, 'rank') else f"{r.source_type}_{hash(r.url) % 100000}" return { "id": result_id, "title": r.title, "snippet": r.snippet, "url": r.url, "source_type": r.source_type, "source": getattr(r, 'source', getattr(r, 'source_name', '')), "organization": r.organization, "published_date": r.published_date, "award_amount": r.award_amount, "award_id": getattr(r, 'award_id', None), "trl_estimate": r.trl_estimate, "patent_number": getattr(r, 'patent_number', None), "relevance_score": getattr(r, 'relevance_score', 0.0), } def _dict_to_search_result(self, d: Dict[str, Any]) -> SearchResult: """Reconstruct SearchResult from dict. Note: SearchResult from base.py uses 'rank' not 'id', and 'source' not 'source_name'. We store extra fields in raw_data for later use. 
""" return SearchResult( title=d.get("title", ""), snippet=d.get("snippet", ""), url=d.get("url", ""), source_type=d.get("source_type", ""), source=d.get("source", ""), rank=0, # Rank is lost during serialization, default to 0 organization=d.get("organization"), published_date=d.get("published_date"), award_amount=d.get("award_amount"), award_id=d.get("award_id"), trl_estimate=d.get("trl_estimate"), patent_number=d.get("patent_number"), raw_data={ "id": d.get("id", ""), "relevance_score": d.get("relevance_score", 0.0), }, ) def _dict_to_parsed_capability(self, d: Dict[str, Any]) -> ParsedCapability: """Reconstruct ParsedCapability from dict.""" cap_need = None if d.get("capability_need"): cn = d["capability_need"] cap_need = CapabilityNeed( functional_need=cn.get("functional_need", ""), domain=cn.get("domain", ""), implied_constraints=cn.get("implied_constraints", []), technology_types_sought=cn.get("technology_types_sought", []), ) criteria = [] for c in d.get("capability_criteria", []): criteria.append(CapabilityCriterion( criterion=c.get("criterion", ""), weight=c.get("weight", "should_have"), keywords=c.get("keywords", []), )) from ..capability.types import TechnologyIndicators tech_ind = None if d.get("technology_indicators"): ti = d["technology_indicators"] tech_ind = TechnologyIndicators( positive=ti.get("positive", []), negative=ti.get("negative", []), ) return ParsedCapability( original_query=d.get("original_query", ""), understanding=d.get("understanding", ""), technical_domains=d.get("technical_domains", []), search_queries=d.get("search_queries", []), sbir_queries=d.get("sbir_queries", []), patent_queries=d.get("patent_queries", []), news_queries=d.get("news_queries", []), keywords=d.get("keywords", []), exclusions=d.get("exclusions", []), target_trl_range=tuple(d.get("target_trl_range", [4, 7])), capability_need=cap_need, capability_criteria=criteria, technology_indicators=tech_ind, success=d.get("success", True), error=d.get("error"), ) def 
_grouped_tech_to_dict(self, t: GroupedTechnology) -> Dict[str, Any]: """Convert GroupedTechnology to dict for JSON storage.""" return t.to_dict() def _dict_to_grouped_technology(self, d: Dict[str, Any]) -> GroupedTechnology: """Reconstruct GroupedTechnology from dict.""" from ..technology.types import Developer, SourceEvidence developers = [] for dev in d.get("developers", []): developers.append(Developer( name=dev.get("name", ""), type=dev.get("type", "unknown"), location=dev.get("location"), )) sources = [] for src in d.get("sources", []): sources.append(SourceEvidence( source_type=src.get("source_type", ""), source_name=src.get("source_name", ""), title=src.get("title", ""), url=src.get("url", ""), snippet=src.get("snippet", ""), contribution=src.get("contribution", ""), published_date=src.get("published_date"), award_amount=src.get("award_amount"), award_id=src.get("award_id"), patent_number=src.get("patent_number"), )) return GroupedTechnology( id=d.get("id", ""), canonical_name=d.get("canonical_name", ""), alternate_names=d.get("alternate_names", []), technology_type=d.get("technology_type", ""), description=d.get("description", ""), capabilities=d.get("capabilities", []), mechanism=d.get("mechanism"), developers=developers, trl_estimate=d.get("trl_estimate"), trl_confidence=d.get("trl_confidence", 0.0), trl_evidence=d.get("trl_evidence", []), sources=sources, source_count=d.get("source_count", 0), grouping_confidence=d.get("grouping_confidence", 1.0), grouped_from=d.get("grouped_from", []), )