TechScout/api_server.py

"""
TechScout API Server
Simple HTTP server that the dashboard calls for discovery/deep-dive/match operations.
"""
import json
import logging
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
from pathlib import Path

from techscout.pipeline.discovery import DiscoveryPipeline
from techscout.pipeline.deep_dive import DeepDivePipeline
from techscout.pipeline.capability_matcher import CapabilityMatcherPipeline

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Global pipelines (initialized lazily on first use; plain HTTPServer serves
# requests one at a time, so no locking is needed around the lazy init)
discovery_pipeline = None
deepdive_pipeline = None
matcher_pipeline = None


def get_discovery_pipeline():
    global discovery_pipeline
    if discovery_pipeline is None:
        logger.info("Initializing discovery pipeline...")
        discovery_pipeline = DiscoveryPipeline(model='mistral-nemo:12b')
    return discovery_pipeline


def get_deepdive_pipeline():
    global deepdive_pipeline
    if deepdive_pipeline is None:
        logger.info("Initializing deep dive pipeline...")
        deepdive_pipeline = DeepDivePipeline(model='mistral-nemo:12b')
    return deepdive_pipeline


def get_matcher_pipeline():
    global matcher_pipeline
    if matcher_pipeline is None:
        logger.info("Initializing capability matcher pipeline...")
        try:
            matcher_pipeline = CapabilityMatcherPipeline(model='mistral-nemo:12b')
            logger.info("Capability matcher pipeline initialized successfully")
        except Exception:
            logger.exception("Failed to initialize matcher pipeline")
            raise
    return matcher_pipeline


class TechScoutHandler(BaseHTTPRequestHandler):
    def _set_cors_headers(self):
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def _send_json(self, status, payload):
        """Send a JSON response with CORS headers (shared by every endpoint)."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self._set_cors_headers()
        self.end_headers()
        self.wfile.write(json.dumps(payload).encode())

    def _read_json_body(self):
        """Read and parse the JSON request body."""
        # .get() avoids a KeyError when Content-Length is absent
        content_length = int(self.headers.get('Content-Length', 0))
        return json.loads(self.rfile.read(content_length).decode())

    def do_OPTIONS(self):
        self.send_response(200)
        self._set_cors_headers()
        self.end_headers()

    def do_GET(self):
        parsed = urlparse(self.path)
        if parsed.path == '/api/health':
            self._send_json(200, {"status": "ok"})
        elif parsed.path == '/api/discoveries':
            self._list_discoveries()
        elif parsed.path.startswith('/api/discoveries/'):
            discovery_id = parsed.path.split('/')[-1]
            self._get_discovery(discovery_id)
        else:
            self.send_response(404)
            self.end_headers()

    def do_POST(self):
        parsed = urlparse(self.path)
        if parsed.path == '/api/discover':
            self._run_discovery()
        elif parsed.path == '/api/deepdive':
            self._run_deepdive()
        elif parsed.path == '/api/match':
            self._run_match()
        elif parsed.path == '/api/match/search':
            self._run_match_search()
        elif parsed.path == '/api/match/extract':
            self._run_match_extract()
        elif parsed.path == '/api/match/evaluate':
            self._run_match_evaluate()
        else:
            self.send_response(404)
            self.end_headers()

    def _run_discovery(self):
        """Run the legacy discovery pipeline against a capability-gap query."""
        try:
            data = self._read_json_body()
            query = data.get('query', '')
            if not query:
                self._send_json(400, {"error": "Query is required"})
                return
            logger.info(f"Starting discovery for: {query[:50]}...")
            pipeline = get_discovery_pipeline()
            result = pipeline.discover(
                capability_gap=query,
                max_results=50,
                use_llm_scoring=True
            )
            self._send_json(200, {"result": result.to_dict()})
            logger.info(f"Discovery complete: {len(result.candidates)} candidates")
        except Exception as e:
            logger.exception("Discovery failed")
            self._send_json(500, {"error": str(e)})

    def _run_deepdive(self):
        """Run a deep-dive analysis on a single organization."""
        try:
            data = self._read_json_body()
            organization = data.get('organization', '')
            technology = data.get('technology', '')
            gap = data.get('gap', '')
            if not organization:
                self._send_json(400, {"error": "Organization is required"})
                return
            # Reject "Unknown" and similar placeholder names up front
            if organization.lower() in ('unknown', 'n/a', 'none', ''):
                self._send_json(400, {
                    "error": "Cannot perform deep dive on an unknown organization. "
                             "Please select a candidate with a known company name."
                })
                return
            logger.info(f"Starting deep dive for: {organization}")
            pipeline = get_deepdive_pipeline()
            result = pipeline.deep_dive(
                organization=organization,
                technology_context=technology,
                capability_gap=gap
            )
            self._send_json(200, {"result": result.to_dict()})
            logger.info(f"Deep dive complete for: {organization}")
        except Exception as e:
            logger.exception("Deep dive failed")
            self._send_json(500, {"error": str(e)})

    def _run_match(self):
        """Run the full capability-to-technology matching pipeline in one call."""
        try:
            data = self._read_json_body()
            query = data.get('query', '')
            if not query:
                self._send_json(400, {"error": "Query is required"})
                return
            # Optional tuning parameters with sensible defaults
            max_technologies = data.get('max_technologies', 15)
            min_fit_score = data.get('min_fit_score', 25)
            sources = data.get('sources', None)
            logger.info(f"Starting capability match for: {query[:50]}...")
            pipeline = get_matcher_pipeline()
            result = pipeline.match(
                user_input=query,
                max_technologies=max_technologies,
                min_fit_score=min_fit_score,
                sources=sources
            )
            self._send_json(200, {"result": result.to_dict()})
            logger.info(f"Match complete: {len(result.technologies)} technologies found")
        except Exception as e:
            logger.exception("Match failed")
            self._send_json(500, {"error": str(e)})

    def _run_match_search(self):
        """Step 1: Parse capability and search sources."""
        try:
            data = self._read_json_body()
            query = data.get('query', '')
            if not query:
                self._send_json(400, {"error": "Query is required"})
                return
            sources = data.get('sources', None)
            logger.info(f"Step 1 - Search: {query[:50]}...")
            pipeline = get_matcher_pipeline()
            result = pipeline.step_search(
                user_input=query,
                sources=sources
            )
            logger.info(f"step_search returned: success={result.success}")
            self._send_json(200, {"result": result.to_dict()})
            logger.info(f"Step 1 complete: {len(result.search_results)} results found")
        except Exception as e:
            logger.exception("Match search failed")
            self._send_json(500, {"error": str(e)})

    def _run_match_extract(self):
        """Step 2: Extract technologies from search results."""
        try:
            data = self._read_json_body()
            search_id = data.get('search_id', '')
            if not search_id:
                self._send_json(400, {"error": "search_id is required"})
                return
            logger.info(f"Step 2 - Extract: search_id={search_id}")
            pipeline = get_matcher_pipeline()
            result = pipeline.step_extract(search_id=search_id)
            self._send_json(200, {"result": result.to_dict()})
            logger.info(f"Step 2 complete: {len(result.technologies)} technologies extracted")
        except Exception as e:
            logger.exception("Match extract failed")
            self._send_json(500, {"error": str(e)})

    def _run_match_evaluate(self):
        """Step 3: Evaluate selected technologies."""
        try:
            data = self._read_json_body()
            extraction_id = data.get('extraction_id', '')
            technology_ids = data.get('technology_ids', [])
            if not extraction_id:
                self._send_json(400, {"error": "extraction_id is required"})
                return
            if not technology_ids:
                self._send_json(400, {"error": "technology_ids is required"})
                return
            logger.info(f"Step 3 - Evaluate: {len(technology_ids)} technologies")
            pipeline = get_matcher_pipeline()
            result = pipeline.step_evaluate(
                extraction_id=extraction_id,
                technology_ids=technology_ids
            )
            self._send_json(200, {"result": result.to_dict()})
            logger.info(f"Step 3 complete: {len(result.technologies)} technologies evaluated")
        except Exception as e:
            logger.exception("Match evaluate failed")
            self._send_json(500, {"error": str(e)})

    def _list_discoveries(self):
        """List saved discovery runs from the analyses directory."""
        analyses_dir = Path(__file__).parent / "analyses"
        try:
            discoveries = []
            if analyses_dir.exists():
                for f in sorted(analyses_dir.glob("discovery_*.json"), reverse=True):
                    with open(f) as fp:
                        data = json.load(fp)
                    discoveries.append({
                        "id": data.get("id"),
                        "capability_gap": data.get("capability_gap"),
                        "timestamp": data.get("timestamp"),
                        "candidate_count": len(data.get("candidates", [])),
                        "top_score": data["candidates"][0]["score"] if data.get("candidates") else 0
                    })
            self._send_json(200, {"discoveries": discoveries})
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def _get_discovery(self, discovery_id):
        """Return one saved discovery analysis by id."""
        analyses_dir = Path(__file__).parent / "analyses"
        file_path = analyses_dir / f"discovery_{discovery_id}.json"
        try:
            if not file_path.exists():
                self._send_json(404, {"error": "Not found"})
                return
            with open(file_path) as f:
                data = json.load(f)
            self._send_json(200, data)
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def log_message(self, format, *args):
        # Route BaseHTTPRequestHandler's access log through our logger
        logger.info("%s - %s", self.client_address[0], format % args)


def run_server(port=8000):
    """Start the API server and serve requests until interrupted."""
    server = HTTPServer(('localhost', port), TechScoutHandler)
    logger.info(f"TechScout API Server running on http://localhost:{port}")
    logger.info("Endpoints:")
    logger.info("  POST /api/discover          - Run technology discovery (legacy)")
    logger.info("  POST /api/deepdive          - Run deep dive analysis")
    logger.info("")
    logger.info("  Step-based Capability Matching (Guided Workflow):")
    logger.info("  POST /api/match/search      - Step 1: Parse & Search")
    logger.info("  POST /api/match/extract     - Step 2: Extract Technologies")
    logger.info("  POST /api/match/evaluate    - Step 3: Evaluate Selected")
    logger.info("")
    logger.info("  POST /api/match             - Run full matching pipeline (may time out)")
    logger.info("  GET  /api/discoveries       - List saved discoveries")
    logger.info("  GET  /api/discoveries/<id>  - Get specific discovery")
    server.serve_forever()


if __name__ == '__main__':
    run_server(port=8000)
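
# Quick smoke test against a running server (a minimal sketch using only the
# standard library; assumes the server is already up on localhost:8000):
#
#   import json, urllib.request
#   with urllib.request.urlopen("http://localhost:8000/api/health") as resp:
#       print(json.load(resp))  # expected: {"status": "ok"}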