Source code for sim_panel.benchmarks.subset

from __future__ import annotations

import json
import random
from collections import Counter
from dataclasses import asdict
from pathlib import Path
from typing import Any, Dict, Iterator, Mapping, Optional

from sim_panel.benchmarks.config import BenchmarkSubsetConfig
from sim_panel.utils.hashing import sha256_json
from sim_panel.utils.progress import tqdm_wrap


BENCHMARK_CONTRACT_VERSION = "0.1.0"
BENCHMARK_BUILDER_VERSION = "0.1.0"


[docs] def build_benchmark_subset(config: BenchmarkSubsetConfig) -> Dict[str, Any]: """ Build a frozen benchmark subset directory from imported real-data artifacts. Writes the following files into ``config.output_dir``: - events.jsonl - products.jsonl - metadata.json - stats.json Design ------ This is a streaming two-pass builder over events.jsonl: 1. pass 1 counts rating-bearing events per product 2. pass 2 writes events for the selected products This avoids loading the full events table into memory. """ import_dir = Path(config.import_dir) output_dir = Path(config.output_dir) output_dir.mkdir(parents=True, exist_ok=True) events_path = import_dir / "events.jsonl" products_path = import_dir / "products.jsonl" if not events_path.exists(): raise FileNotFoundError(f"Missing import events file: {events_path}") if config.require_product_record and not products_path.exists(): raise FileNotFoundError( f"Benchmark subset requires products.jsonl, but it was not found: {products_path}" ) known_product_ids = ( _load_product_ids(products_path) if products_path.exists() else None ) review_counts = _count_reviews_by_product( events_path=events_path, known_product_ids=known_product_ids if config.require_product_record else None, ) selected_product_ids = _select_product_ids( review_counts=review_counts, known_product_ids=known_product_ids, config=config, ) selected_product_id_set = set(selected_product_ids) exported_products = _write_selected_products( products_path=products_path, output_path=output_dir / "products.jsonl", selected_product_ids=selected_product_id_set, ) export_stats = _write_selected_events( events_path=events_path, output_path=output_dir / "events.jsonl", selected_product_ids=selected_product_id_set, ) stats = { "builder_version": BENCHMARK_BUILDER_VERSION, "benchmark_contract_version": BENCHMARK_CONTRACT_VERSION, "n_selected_products": len(selected_product_ids), "n_selected_products_written": exported_products, "n_selected_events": export_stats["n_events"], "n_unique_panelists": export_stats["n_unique_panelists"], "rating_histogram": export_stats["rating_histogram"], "selected_product_review_counts": { product_id: review_counts[product_id] for product_id in selected_product_ids }, "min_reviews_per_product": config.min_reviews_per_product, "max_products": config.max_products, } metadata = { "artifact_type": "benchmark_subset", "benchmark_contract_version": BENCHMARK_CONTRACT_VERSION, "builder_version": BENCHMARK_BUILDER_VERSION, "source_import_dir": str(import_dir.resolve()), "source_events_path": str(events_path.resolve()), "source_products_path": str(products_path.resolve()) if products_path.exists() else None, "selection_unit": "product", "selection_basis": "rating_event_count", "seed": config.seed, "config": asdict(config), "config_hash_sha256": sha256_json(asdict(config)), "files": { "events": "events.jsonl", "products": "products.jsonl", "metadata": "metadata.json", "stats": "stats.json", }, } _write_json(output_dir / "metadata.json", metadata) _write_json(output_dir / "stats.json", stats) return { "metadata": metadata, "stats": stats, "selected_product_ids": selected_product_ids, }
def _count_reviews_by_product( *, events_path: Path, known_product_ids: Optional[set[str]], ) -> Dict[str, int]: """ First pass over events.jsonl. Count rating-bearing events per product, optionally restricting to products known to exist in products.jsonl. """ counts: Counter[str] = Counter() for row in tqdm_wrap( _iter_jsonl(events_path), desc="Count reviews by product", enabled=True, ): product_id = _extract_product_id(row) rating = _extract_rating(row) if product_id is None or rating is None: continue if known_product_ids is not None and product_id not in known_product_ids: continue counts[product_id] += 1 return dict(counts) def _select_product_ids( *, review_counts: Mapping[str, int], known_product_ids: Optional[set[str]], config: BenchmarkSubsetConfig, ) -> list[str]: """ Select eligible product IDs reproducibly. Eligibility: - has at least min_reviews_per_product rating-bearing events - if require_product_record=True, must also exist in products.jsonl """ eligible = [ product_id for product_id, count in review_counts.items() if count >= config.min_reviews_per_product and ( not config.require_product_record or known_product_ids is None or product_id in known_product_ids ) ] eligible.sort() rng = random.Random(config.seed) rng.shuffle(eligible) if config.max_products is None: return eligible return eligible[: config.max_products] def _write_selected_events( *, events_path: Path, output_path: Path, selected_product_ids: set[str], ) -> Dict[str, Any]: """ Second pass over events.jsonl. Stream matching rows directly to events.jsonl and accumulate a few lightweight benchmark stats. """ n_events = 0 panelist_ids: set[str] = set() rating_histogram: Counter[str] = Counter() with output_path.open("w", encoding="utf-8") as fp: for row in tqdm_wrap( _iter_jsonl(events_path), desc="Write subset events", enabled=True, ): product_id = _extract_product_id(row) rating = _extract_rating(row) if product_id is None or rating is None: continue if product_id not in selected_product_ids: continue fp.write(json.dumps(row, ensure_ascii=False) + "\n") n_events += 1 panelist_id = row.get("panelist_id") or row.get("user_id") if panelist_id is not None: panelist_ids.add(str(panelist_id)) rating_histogram[_rating_bucket(rating)] += 1 return { "n_events": n_events, "n_unique_panelists": len(panelist_ids), "rating_histogram": dict( sorted(rating_histogram.items(), key=lambda kv: float(kv[0])) ), } def _write_selected_products( *, products_path: Path, output_path: Path, selected_product_ids: set[str], ) -> int: """ Stream products.jsonl and keep only products referenced by the selected subset. """ if not products_path.exists(): output_path.write_text("", encoding="utf-8") return 0 n_products = 0 with output_path.open("w", encoding="utf-8") as fp: for row in tqdm_wrap( _iter_jsonl(products_path), desc="Write subset products", enabled=True, ): product_id = _extract_product_id(row) if product_id is None or product_id not in selected_product_ids: continue fp.write(json.dumps(row, ensure_ascii=False) + "\n") n_products += 1 return n_products def _load_product_ids(products_path: Path) -> set[str]: """ Load only product IDs from products.jsonl. This is much cheaper than loading all products as full records, and is enough for benchmark subset eligibility filtering. """ product_ids: set[str] = set() for row in tqdm_wrap( _iter_jsonl(products_path), desc="Load product ids", enabled=True, ): product_id = _extract_product_id(row) if product_id is not None: product_ids.add(product_id) return product_ids def _extract_product_id(row: Mapping[str, Any]) -> Optional[str]: """ Extract product ID from either imported real-data rows or imported product rows. For Amazon-derived imports this should usually be product_id, with parent_asin kept as a fallback for robustness. """ product_id = row.get("product_id") or row.get("parent_asin") if product_id is None: return None return str(product_id) def _extract_rating(row: Mapping[str, Any]) -> Optional[float]: """ Extract rating from either: - top-level rating - outcomes.rating """ rating: Any outcomes = row.get("outcomes") if isinstance(outcomes, Mapping) and "rating" in outcomes: rating = outcomes.get("rating") else: rating = row.get("rating") if rating is None: return None try: return float(rating) except (TypeError, ValueError): return None def _iter_jsonl(path: Path) -> Iterator[Dict[str, Any]]: with path.open("r", encoding="utf-8") as fp: for line in fp: line = line.strip() if not line: continue yield json.loads(line) def _rating_bucket(value: float) -> str: if value.is_integer(): return str(int(value)) return f"{value:g}" def _write_json(path: Path, payload: Mapping[str, Any]) -> None: with path.open("w", encoding="utf-8") as fp: json.dump(payload, fp, ensure_ascii=False, indent=2, sort_keys=True) fp.write("\n")