"""
|
|
Capability Evaluator for TechScout.
|
|
|
|
Evaluates how well each technology matches the user's capability need.
|
|
Produces structured capability-fit assessments.
|
|
|
|
This is Stage 5 of the Capability-Technology Matching pipeline.
|
|
"""

import json
import logging
from typing import List, Optional

from ..extraction.llm_client import OllamaClient
from ..capability.types import CapabilityNeed, CapabilityCriterion
from .types import (
    GroupedTechnology,
    CapabilityMatch,
    CriterionResult,
    EvaluatedTechnology,
)

logger = logging.getLogger(__name__)


class CapabilityEvaluator:
    """
    Evaluates technologies against capability needs.

    For each technology, produces:
    - Per-criterion assessments (SUPPORTS/PARTIAL/DOES_NOT_SUPPORT/UNKNOWN)
    - Overall fit score (0-100)
    - Narrative explanation
    - Strengths, limitations, unknowns
    - Investigation recommendation
    """

    # Maximum points per criterion weight (awarded in full for SUPPORTS).
    _MAX_POINTS = {"must_have": 30, "should_have": 15, "nice_to_have": 5}
    # Points per criterion weight for a PARTIAL assessment.
    _PARTIAL_POINTS = {"must_have": 15, "should_have": 8, "nice_to_have": 3}
    # Flat points granted for UNKNOWN assessments (benefit of the doubt).
    _UNKNOWN_POINTS = 5

    EVALUATION_PROMPT = """Evaluate whether this technology addresses the user's capability need.

USER'S CAPABILITY NEED:
{functional_need}

TECHNOLOGY:
Name: {tech_name}
Type: {tech_type}
Description: {description}
Capabilities: {capabilities}
Developer(s): {developers}
Maturity: TRL {trl_estimate} (Evidence: {trl_evidence})

EVALUATION CRITERIA:
{criteria_text}

INSTRUCTIONS:
For each criterion, assess:
- SUPPORTS: Technology clearly provides this capability
- PARTIAL: Technology partially addresses this, with limitations
- DOES_NOT_SUPPORT: Technology does not address this
- UNKNOWN: Insufficient information to determine

Respond with JSON:
{{
  "criteria_results": [
    {{
      "criterion": "criterion text",
      "weight": "must_have|should_have|nice_to_have",
      "assessment": "SUPPORTS|PARTIAL|DOES_NOT_SUPPORT|UNKNOWN",
      "evidence": "Explanation with evidence from technology description"
    }}
  ],
  "how_it_addresses_need": "2-3 sentences explaining how this technology could address the capability need",
  "key_strengths": ["strength 1", "strength 2"],
  "key_limitations": ["limitation 1", "limitation 2"],
  "key_unknowns": ["unknown 1", "unknown 2"],
  "investigation_worthy": true/false,
  "investigation_rationale": "Why this technology is/isn't worth investigating further"
}}"""

    def __init__(
        self,
        ollama_client: "Optional[OllamaClient]" = None,
        model: str = "mistral-nemo:12b"
    ):
        """
        Args:
            ollama_client: Client used for LLM calls; a default OllamaClient
                is created when omitted.
            model: Ollama model identifier used for evaluation prompts.
        """
        self.client = ollama_client or OllamaClient()
        self.model = model

    def evaluate_all(
        self,
        technologies: "List[GroupedTechnology]",
        capability_need: "CapabilityNeed",
        capability_criteria: "List[CapabilityCriterion]"
    ) -> "List[EvaluatedTechnology]":
        """
        Evaluate all technologies against capability criteria.

        Args:
            technologies: List of grouped technologies to evaluate
            capability_need: Structured capability need
            capability_criteria: List of evaluation criteria

        Returns:
            List of evaluated technologies with fit assessments,
            sorted by fit score (highest first).
        """
        logger.info(
            "Evaluating %d technologies against capability criteria...",
            len(technologies),
        )

        evaluated = []
        for tech in technologies:
            match = self._evaluate_technology(tech, capability_need, capability_criteria)
            evaluated.append(EvaluatedTechnology(
                technology=tech,
                capability_match=match
            ))

        # Best-fitting technologies first.
        evaluated.sort(key=lambda e: e.capability_match.fit_score, reverse=True)

        top_score = evaluated[0].capability_match.fit_score if evaluated else 0
        logger.info("Evaluation complete. Top score: %s", top_score)

        return evaluated

    def _evaluate_technology(
        self,
        tech: "GroupedTechnology",
        capability_need: "CapabilityNeed",
        capability_criteria: "List[CapabilityCriterion]"
    ) -> "CapabilityMatch":
        """Evaluate a single technology against capability criteria.

        Falls back to keyword heuristics when the LLM call fails or
        returns unparseable output.
        """
        # Number the criteria and surface a few keywords so the LLM can
        # anchor each assessment to the right criterion.
        criteria_lines = []
        for i, criterion in enumerate(capability_criteria, 1):
            criteria_lines.append(f"{i}. [{criterion.weight.upper()}] {criterion.criterion}\n")
            criteria_lines.append(f"   Keywords: {', '.join(criterion.keywords[:5])}\n")
        criteria_text = "".join(criteria_lines)

        developers_text = ", ".join(d.name for d in tech.developers) if tech.developers else "Unknown"

        # Truncate free-text fields to keep the prompt bounded.
        prompt = self.EVALUATION_PROMPT.format(
            functional_need=capability_need.functional_need,
            tech_name=tech.canonical_name,
            tech_type=tech.technology_type,
            description=tech.description[:500],
            capabilities="; ".join(tech.capabilities[:5]),
            developers=developers_text,
            trl_estimate=tech.trl_estimate or "Unknown",
            trl_evidence="; ".join(tech.trl_evidence[:3]) if tech.trl_evidence else "None",
            criteria_text=criteria_text
        )

        response = self.client.generate(
            prompt=prompt,
            model=self.model,
            temperature=0.1,  # low temperature for consistent judgments
            format="json"
        )

        if not response.success:
            logger.warning("LLM evaluation failed for: %s", tech.canonical_name)
            return self._create_fallback_match(tech, capability_criteria)

        try:
            data = json.loads(response.content)
        except json.JSONDecodeError:
            # The model sometimes wraps JSON in prose; try to salvage it.
            data = self.client.extract_json_from_text(response.content)
            if not data:
                return self._create_fallback_match(tech, capability_criteria)

        return self._build_capability_match(data, capability_criteria)

    def _build_capability_match(
        self,
        data: dict,
        capability_criteria: "List[CapabilityCriterion]"
    ) -> "CapabilityMatch":
        """Build a CapabilityMatch from a parsed LLM response dict.

        Invalid assessment/weight values coming back from the model are
        coerced to safe defaults before scoring.

        Note: capability_criteria is accepted for interface symmetry with
        the other builders but is not consulted here.
        """
        criteria_results = []
        for result in data.get("criteria_results", []):
            assessment = result.get("assessment", "UNKNOWN")
            if assessment not in ("SUPPORTS", "PARTIAL", "DOES_NOT_SUPPORT", "UNKNOWN"):
                assessment = "UNKNOWN"

            weight = result.get("weight", "should_have")
            if weight not in ("must_have", "should_have", "nice_to_have"):
                weight = "should_have"

            criteria_results.append(CriterionResult(
                criterion=result.get("criterion", ""),
                weight=weight,
                assessment=assessment,
                evidence=result.get("evidence", ""),
                source=result.get("source")
            ))

        fit_score = self._calculate_fit_score(criteria_results)
        overall_fit = self._determine_overall_fit(fit_score, criteria_results)

        return CapabilityMatch(
            overall_fit=overall_fit,
            fit_score=fit_score,
            criteria_results=criteria_results,
            how_it_addresses_need=data.get("how_it_addresses_need", ""),
            key_strengths=data.get("key_strengths", []),
            key_limitations=data.get("key_limitations", []),
            key_unknowns=data.get("key_unknowns", []),
            # If the model omitted a verdict, recommend investigation for
            # anything scoring at least 50.
            investigation_worthy=data.get("investigation_worthy", fit_score >= 50),
            investigation_rationale=data.get("investigation_rationale", "")
        )

    def _calculate_fit_score(self, criteria_results: "List[CriterionResult]") -> int:
        """
        Calculate fit score based on criteria results.

        Scoring (see _MAX_POINTS / _PARTIAL_POINTS / _UNKNOWN_POINTS):
        - SUPPORTS earns the weight's full points (30/15/5)
        - PARTIAL earns the weight's partial points (15/8/3)
        - UNKNOWN earns a flat 5 points (benefit of the doubt)
        - DOES_NOT_SUPPORT earns 0

        Normalized to 0-100 against the maximum achievable points.
        Unrecognized weights are scored as nice_to_have.
        """
        if not criteria_results:
            return 50  # Neutral score when there is nothing to evaluate

        total_points = 0
        max_points = 0

        for result in criteria_results:
            ceiling = self._MAX_POINTS.get(result.weight, self._MAX_POINTS["nice_to_have"])
            max_points += ceiling

            if result.assessment == "SUPPORTS":
                total_points += ceiling
            elif result.assessment == "PARTIAL":
                total_points += self._PARTIAL_POINTS.get(
                    result.weight, self._PARTIAL_POINTS["nice_to_have"]
                )
            elif result.assessment == "UNKNOWN":
                total_points += self._UNKNOWN_POINTS
            # DOES_NOT_SUPPORT contributes nothing.

        if max_points <= 0:
            return 50

        score = int((total_points / max_points) * 100)
        return min(100, max(0, score))

    def _determine_overall_fit(
        self,
        score: int,
        criteria_results: "List[CriterionResult]"
    ) -> str:
        """Determine overall fit category based on score and criteria results.

        Returns one of "HIGH", "MEDIUM", "LOW", "UNCERTAIN".
        """
        must_have_failures = sum(
            1 for r in criteria_results
            if r.weight == "must_have" and r.assessment == "DOES_NOT_SUPPORT"
        )

        unknown_count = sum(1 for r in criteria_results if r.assessment == "UNKNOWN")
        unknown_ratio = unknown_count / len(criteria_results) if criteria_results else 0

        # If most criteria are unknown, we cannot rate the fit at all.
        if unknown_ratio > 0.5:
            return "UNCERTAIN"

        # Two or more must-have failures disqualify the technology.
        if must_have_failures >= 2:
            return "LOW"

        # A single must-have failure caps the rating at MEDIUM.
        if must_have_failures == 1:
            return "MEDIUM" if score >= 50 else "LOW"

        # No must-have failures: purely score-driven.
        if score >= 75:
            return "HIGH"
        if score >= 50:
            return "MEDIUM"
        return "LOW"

    def _create_fallback_match(
        self,
        tech: "GroupedTechnology",
        capability_criteria: "List[CapabilityCriterion]"
    ) -> "CapabilityMatch":
        """Create a fallback capability match using keyword heuristics.

        Used when the LLM is unavailable or returns unusable output.
        """
        # Flatten all descriptive text for case-insensitive keyword lookup.
        tech_text = f"{tech.canonical_name} {tech.description} {' '.join(tech.capabilities)}".lower()

        criteria_results = []
        for criterion in capability_criteria:
            keyword_matches = sum(1 for kw in criterion.keywords if kw.lower() in tech_text)

            # 2+ keyword hits -> SUPPORTS, 1 -> PARTIAL, 0 -> UNKNOWN.
            if keyword_matches >= 2:
                assessment = "SUPPORTS"
            elif keyword_matches == 1:
                assessment = "PARTIAL"
            else:
                assessment = "UNKNOWN"

            criteria_results.append(CriterionResult(
                criterion=criterion.criterion,
                weight=criterion.weight,
                assessment=assessment,
                evidence=f"Heuristic: {keyword_matches} keyword matches",
                source=None
            ))

        fit_score = self._calculate_fit_score(criteria_results)
        overall_fit = self._determine_overall_fit(fit_score, criteria_results)

        return CapabilityMatch(
            overall_fit=overall_fit,
            fit_score=fit_score,
            criteria_results=criteria_results,
            how_it_addresses_need="Assessment based on keyword matching (LLM unavailable)",
            key_strengths=[],
            key_limitations=["Full LLM assessment unavailable"],
            key_unknowns=["Detailed capability analysis not performed"],
            investigation_worthy=fit_score >= 50,
            investigation_rationale="Based on keyword matching only"
        )