#!/usr/bin/env python3
"""Plan C — local-model triage of GIG Gulf reviews on qwen3:8b ($0/call).

The smallest money-touching test of the local-model thesis: classify 1,227 real
GIG reviews with a local 8B model, then MEASURE (accuracy spot-check + cost
saved vs a paid API).

Run ON THE STUDIO (Ollama serving qwen3:8b):
    python3 triage_reviews.py              # full run over the CSV
    python3 triage_reviews.py --limit 30   # quick smoke run
    python3 triage_reviews.py --mock       # no model — validates the pipeline end-to-end

Read-only on the source CSV. Writes only to ./out/. Rollback: rm -rf ./out
"""
import argparse
import csv
import json
import os
import sys
import time
import urllib.request
from collections import Counter
from pathlib import Path

HERE = Path(__file__).resolve().parent
SRC = HERE.parent / "data" / "reviews-2026-06-02.csv"
OUT = HERE / "out"
OLLAMA = os.environ.get("OLLAMA_HOST_URL", "http://localhost:11434")
MODEL = os.environ.get("TRIAGE_MODEL", "qwen3:8b")

# Fixed taxonomy — the model must choose from THESE only (keeps output comparable + auditable).
THEMES = [
    "renewal_process",
    "representative_service",
    "pricing_value",
    "claims_handling",
    "digital_app_website",
    "speed_efficiency",
    "communication_clarity",
    "coverage_product",
    "other",
]
SENTIMENTS = ["positive", "neutral", "negative"]

# Columns appended to every source row in the labelled CSV.
LABEL_COLUMNS = ["ai_theme", "ai_sentiment", "ai_actionable", "ai_reason"]

# NOTE: only the third fragment is an f-string, so only it needs "{{" to emit a
# literal brace; the fragment that closes the schema is a plain string and must
# use a single "}" or the prompt ends up with unbalanced braces.
SYSTEM = (
    "You are a precise insurance-review classifier for GIG Gulf. "
    "Return ONLY compact JSON, no prose, no markdown. "
    f'Schema: {{"theme": one of {THEMES}, "sentiment": one of {SENTIMENTS}, '
    '"actionable": true|false, "reason": "<=12 words"}. '
    "actionable=true means the review names a concrete problem GIG could fix."
)


def normalize_label(obj):
    """Coerce a decoded model reply onto the fixed taxonomy.

    Args:
        obj: The value decoded from the model's JSON reply.

    Returns:
        A new dict holding every key of ``obj`` plus guaranteed ``theme``
        (a member of THEMES), ``sentiment`` (a member of SENTIMENTS),
        ``actionable`` (bool) and ``reason`` (str) entries.

    Raises:
        ValueError: If ``obj`` is not a JSON object (dict).
    """
    if not isinstance(obj, dict):
        raise ValueError(
            f"model returned {type(obj).__name__}, expected a JSON object"
        )

    def pick(value, allowed, fallback):
        # Tolerate case/whitespace drift ("Positive ") instead of silently
        # demoting an otherwise valid label to the fallback.
        if isinstance(value, str):
            value = value.strip().lower()
            if value in allowed:
                return value
        return fallback

    label = dict(obj)
    # Guard the model to the taxonomy — fall back to 'other'/'neutral' rather than drift.
    label["theme"] = pick(obj.get("theme"), THEMES, "other")
    label["sentiment"] = pick(obj.get("sentiment"), SENTIMENTS, "neutral")

    flag = obj.get("actionable")
    if isinstance(flag, str):
        # bool("false") is True, so string replies must be parsed, not cast.
        flag = flag.strip().lower() in ("true", "yes", "1")
    label["actionable"] = bool(flag)

    # Always present, so downstream writers can rely on the key.
    reason = obj.get("reason")
    label["reason"] = "" if reason is None else str(reason)
    return label


def classify(review_text, rating, mock=False):
    """Classify one review via the Ollama chat endpoint (or a stub).

    Args:
        review_text: The review body sent to the model.
        rating: The star rating; interpolated into the prompt as ``{rating}/5``.
        mock: When true, skip the model and derive the sentiment from ``rating``.

    Returns:
        A dict with ``theme``, ``sentiment``, ``actionable`` and ``reason``
        keys (plus ``_mock`` in mock mode, plus any extra keys the model sent).

    Raises:
        Exception: Network, HTTP and JSON decoding errors propagate unchanged;
            ValueError is raised when the reply is not a JSON object.
    """
    if mock:
        # Deterministic stand-in so the whole pipeline can be proven without a model.
        stars = str(rating)
        if stars in ("4", "5"):
            sentiment = "positive"
        elif stars in ("1", "2"):
            sentiment = "negative"
        else:
            sentiment = "neutral"
        return {
            "theme": "representative_service",
            "sentiment": sentiment,
            "actionable": sentiment == "negative",
            "reason": "mock classification",
            "_mock": True,
        }

    payload = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM},
            {"role": "user", "content": f'Rating: {rating}/5. Review: "{review_text}"'},
        ],
        "stream": False,
        "format": "json",
        "options": {"temperature": 0},
    }
    req = urllib.request.Request(
        f"{OLLAMA}/api/chat",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=120) as r:
        content = json.load(r)["message"]["content"]
    return normalize_label(json.loads(content))


def main():
    """Label every review in SRC and write the CSV, summary and spot-check files to OUT."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--limit", type=int, default=0, help="only first N reviews")
    ap.add_argument("--mock", action="store_true", help="no model; deterministic stub")
    ap.add_argument("--sample", type=int, default=30, help="rows to dump for human spot-check")
    args = ap.parse_args()

    if not SRC.exists():
        sys.exit(f"✗ source not found: {SRC}")
    OUT.mkdir(exist_ok=True)

    # newline="" is what the csv module expects; the with-block closes the handle.
    with SRC.open(newline="", encoding="utf-8") as src:
        reader = csv.DictReader(src)
        rows = list(reader)
        # Take the header from the reader so a header-only CSV still works.
        source_fields = list(reader.fieldnames or [])
    if args.limit > 0:  # 0 (default) or a negative value means "no limit"
        rows = rows[: args.limit]

    labelled, errors = [], 0
    t0 = time.perf_counter()
    for i, row in enumerate(rows, 1):
        text = (row.get("review") or "").strip()
        rating = row.get("review_rating", "")
        if not text:
            res = {"theme": "other", "sentiment": "neutral",
                   "actionable": False, "reason": "empty review"}
        else:
            try:
                res = classify(text, rating, mock=args.mock)
            except Exception as e:
                # Deliberate best-effort: one bad row must not abort the run;
                # the failure is counted and recorded in the row's reason.
                errors += 1
                res = {"theme": "other", "sentiment": "neutral",
                       "actionable": False, "reason": f"ERROR: {e}"}
        labelled.append({**row, **{f"ai_{k}": v for k, v in res.items()}})
        if i % 50 == 0:
            # Guard the divisor: mock runs can finish within clock resolution.
            rate = i / max(time.perf_counter() - t0, 1e-9)
            print(f"  …{i}/{len(rows)} ({rate:.1f}/s)", flush=True)
    elapsed = time.perf_counter() - t0

    # write labelled CSV (extra keys such as ai__mock are dropped by extrasaction)
    fields = source_fields + LABEL_COLUMNS
    with (OUT / "reviews-labelled.csv").open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        w.writerows(labelled)

    # rollups
    theme_c = Counter(r["ai_theme"] for r in labelled)
    sent_c = Counter(r["ai_sentiment"] for r in labelled)
    actionable = [r for r in labelled if r["ai_actionable"] in (True, "True", "true")]
    summary = {
        "reviews": len(labelled),
        "errors": errors,
        "elapsed_sec": round(elapsed, 1),
        "throughput_per_sec": round(len(labelled) / elapsed, 2) if elapsed else None,
        "model": "MOCK" if args.mock else MODEL,
        "by_theme": dict(theme_c.most_common()),
        "by_sentiment": dict(sent_c),
        "actionable_count": len(actionable),
        "cost_local_usd": 0.0,
        "cost_if_paid_usd_est": estimate_paid_cost(labelled),
    }
    (OUT / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")

    # human spot-check sample (the measurement that matters)
    sample = labelled[: max(args.sample, 0)]
    with (OUT / "spot-check-sample.md").open("w", encoding="utf-8") as f:
        f.write("# Spot-check — does the local 8B agree with a human?\n\n")
        f.write("Mark each ✓/✗. Accuracy = ✓ / total. Target ≥ 85% to trust the local model for this job.\n\n")
        for r in sample:
            # .get mirrors the lookups in the labelling loop, so a CSV without
            # these columns degrades instead of raising KeyError.
            f.write(f"- [{r.get('review_rating', '')}★] {(r.get('review') or '')[:160]}\n")
            f.write(f"  → theme=`{r['ai_theme']}` sentiment=`{r['ai_sentiment']}` "
                    f"actionable=`{r['ai_actionable']}` — {r.get('ai_reason', '')}   ☐✓ ☐✗\n\n")

    print("\n✓ done")
    print(json.dumps(summary, indent=2))
    print(f"\n  labelled : {OUT/'reviews-labelled.csv'}")
    print(f"  summary  : {OUT/'summary.json'}")
    print(f"  spotcheck: {OUT/'spot-check-sample.md'}  ← score this by hand")


def estimate_paid_cost(labelled, *, in_tok_per_review=250, out_tok_per_review=40,
                       usd_per_mtok_in=3, usd_per_mtok_out=15):
    """Rough $ if this had run on a paid frontier API instead of $0 local.

    Defaults assume ~250 input + ~40 output tokens per review, priced at
    $3/Mtok input and $15/Mtok output (about $4.66/Mtok blended at that mix).

    Args:
        labelled: The labelled rows; only their count is used.
        in_tok_per_review: Assumed prompt tokens per review.
        out_tok_per_review: Assumed completion tokens per review.
        usd_per_mtok_in: Price in USD per million input tokens.
        usd_per_mtok_out: Price in USD per million output tokens.

    Returns:
        The estimated cost in USD, rounded to 4 decimal places.
    """
    n = len(labelled)
    in_tok = n * in_tok_per_review
    out_tok = n * out_tok_per_review
    return round((in_tok * usd_per_mtok_in + out_tok * usd_per_mtok_out) / 1_000_000, 4)


if __name__ == "__main__":
    main()