from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
from sim_panel.policies.base import Policy
from sim_panel.policies.types import ExposureDecision
def _normalize_probs(p: Dict[str, float]) -> Dict[str, float]:
    """
    Normalize a product -> weight mapping into a probability distribution.

    Entries whose weight is exactly zero are dropped; every remaining weight is
    divided by the sum of the remaining weights.

    Args:
        p: Mapping of product_id to a non-negative, finite weight.

    Returns:
        A new dict holding only the strictly positive entries, rescaled so the
        values sum to 1 (up to floating-point rounding).

    Raises:
        ValueError: If a weight is negative, if a weight is NaN or infinite,
            or if no positive weight remains (including an empty mapping).
    """
    positive: Dict[str, float] = {}
    for product_id, raw in p.items():
        if raw < 0:
            raise ValueError(f"Negative probability for product_id={product_id}: {raw}")
        weight = float(raw)
        # NaN compares False against both `< 0` and `== 0`, and inf / inf is NaN,
        # so either value would silently poison every normalized probability.
        if not np.isfinite(weight):
            raise ValueError(f"Non-finite probability for product_id={product_id}: {raw}")
        if weight == 0:
            continue
        positive[product_id] = weight

    total = sum(positive.values())
    if total <= 0:
        raise ValueError("All probabilities are zero (or empty).")
    return {product_id: weight / total for product_id, weight in positive.items()}
def _sample_without_replacement(
    rng: np.random.Generator,
    items: Sequence[str],
    k: int,
) -> List[str]:
    """
    Draw up to ``k`` distinct elements of ``items`` using ``rng``.

    Args:
        rng: NumPy random generator used for the draw.
        items: Candidate elements.
        k: Requested sample size.

    Returns:
        An empty list when ``k`` is not positive; a copy of ``items`` in its
        original order when ``k`` is at least ``len(items)`` (no randomness is
        consumed in either case); otherwise ``k`` elements chosen without
        replacement, in the order the generator produced them.
    """
    if k <= 0:
        return []

    population = len(items)
    if population <= k:
        # Nothing to choose between: hand back everything, unshuffled.
        return list(items)

    positions = rng.choice(population, size=k, replace=False).tolist()
    picked: List[str] = []
    for position in positions:
        picked.append(items[position])
    return picked
@dataclass
class _BalancedScheduler:
    """
    Internal helper to assign products to panelists in a balanced-quota way per period.

    Panelists are served one after another. Each one receives the products that
    have been handed out the fewest times so far in this call (ties broken at
    random), so per-product totals never differ by more than one, and a panelist
    never receives the same product twice unless ``evals_per_period`` exceeds
    the number of products.
    """

    rng: np.random.Generator
    product_ids: List[str]
    evals_per_period: int
    # Per-period storage keyed by t. It is always reset to an empty dict on
    # construction and is not consulted by the allocation logic below; the field
    # is kept so the constructor signature stays unchanged.
    pools: Dict[int, List[str]] = None  # type: ignore[assignment]

    def __post_init__(self) -> None:
        self.pools = {}

    def assignments_for(self, *, t: int, n_panelists: int) -> List[List[str]]:
        """
        Build one allocation per panelist for a single period.

        Args:
            t: Period index. Accepted for interface symmetry; the allocation
                itself does not depend on it.
            n_panelists: Number of allocations to produce.

        Returns:
            ``n_panelists`` lists, each holding ``evals_per_period`` product ids
            in random order. Every list is empty when there are no products or
            ``evals_per_period`` is not positive.
        """
        n_products = len(self.product_ids)
        if n_products == 0 or self.evals_per_period <= 0:
            return [[] for _ in range(n_panelists)]

        # If a panelist needs more evaluations than there are products, repeats
        # are unavoidable: hand out whole cycles first, then a distinct remainder.
        full_cycles, extra = divmod(self.evals_per_period, n_products)
        usage = np.zeros(n_products, dtype=np.int64)

        out: List[List[str]] = []
        for _ in range(n_panelists):
            alloc = list(self.product_ids) * full_cycles
            if extra:
                alloc.extend(self._pick_least_used(usage, extra))
            # Randomize the order within the allocation so it does not reveal
            # which products were least used.
            self.rng.shuffle(alloc)
            out.append(alloc)
        return out

    def _pick_least_used(self, usage: np.ndarray, k: int) -> List[str]:
        """Pick the ``k`` least-used distinct products (random ties) and bump their usage in place."""
        # A random permutation followed by a stable sort on usage gives
        # "lowest usage first, ties in random order".
        order = self.rng.permutation(usage.size)
        ranked = order[np.argsort(usage[order], kind="stable")]
        picked = ranked[:k]
        usage[picked] += 1
        return [self.product_ids[i] for i in picked.tolist()]
class RandomAssignmentPolicy(Policy):
    """
    Randomized exposure, with RCT-like balanced allocation by default.

    Modes (selected by ``cfg.random_mode``):
    - balanced_quota: equal/near-equal panelist counts per product (per period)
    - iid_probs: per-panelist independent draws using a product probability distribution
    """

    def __init__(self, cfg) -> None:
        super().__init__(cfg)
        self._balanced_scheduler: Optional[_BalancedScheduler] = None
        # (id(rng), product ids, evals_per_period) the cached scheduler was built for.
        self._balanced_cache_key: Optional[Tuple[int, Tuple[str, ...], int]] = None

    def prepare_for_period(
        self,
        *,
        rng: np.random.Generator,
        product_ids: Sequence[str],
    ) -> None:
        """
        Optional hook: generator may call this once per run (or per period) to prime schedulers.
        (Kept optional so Policy API stays minimal.)

        Does nothing unless the mode is ``balanced_quota``. Otherwise the
        scheduler is (re)built whenever the rng object, the product ids, or
        ``cfg.evals_per_period`` differ from those of the cached scheduler.
        """
        if self.cfg.random_mode != "balanced_quota":
            return

        # Key on the actual product ids, not just how many there are: a different
        # product set of the same size must not reuse a scheduler that still
        # holds the old ids. The scheduler keeps a reference to its rng, so
        # id(rng) cannot be recycled by another object while the entry is cached.
        key = (id(rng), tuple(product_ids), self.cfg.evals_per_period)
        if self._balanced_scheduler is None or self._balanced_cache_key != key:
            self._balanced_scheduler = _BalancedScheduler(
                rng=rng,
                product_ids=list(product_ids),
                evals_per_period=self.cfg.evals_per_period,
            )
            self._balanced_cache_key = key

    def decide_batch(
        self,
        *,
        rng: np.random.Generator,
        panelist_ids: Sequence[str],
        t: int,
        product_ids: Sequence[str],
    ) -> List[ExposureDecision]:
        """
        Batch decision method for balanced_quota (preferred) to guarantee balance.
        Generator can call this for each t to get all decisions in one shot.

        In any other mode this falls back to one ``decide`` call per panelist.

        Returns:
            One ExposureDecision per entry of ``panelist_ids``, in the same order.
        """
        if self.cfg.random_mode != "balanced_quota":
            # Fallback to per-panelist decide
            return [
                self.decide(rng=rng, panelist_id=pid, t=t, product_ids=product_ids)
                for pid in panelist_ids
            ]

        # Always go through the cache check so a scheduler built for a different
        # rng or product set is replaced instead of silently reused.
        self.prepare_for_period(rng=rng, product_ids=product_ids)
        scheduler = self._balanced_scheduler
        assert scheduler is not None  # set by prepare_for_period in this mode

        allocs = scheduler.assignments_for(t=t, n_panelists=len(panelist_ids))
        return [
            self._build_decision(
                panelist_id=pid,
                t=t,
                chosen=list(chosen),
                meta={"random_mode": "balanced_quota"},
            )
            for pid, chosen in zip(panelist_ids, allocs)
        ]

    def decide(
        self,
        *,
        rng: np.random.Generator,
        panelist_id: str,
        t: int,
        product_ids: Sequence[str],
    ) -> ExposureDecision:
        """
        Choose the products a single panelist evaluates in period ``t``.

        - balanced_quota: plain sample without replacement (no cross-panelist
          balancing; use ``decide_batch`` for that).
        - iid_probs: draw without replacement, uniformly when
          ``cfg.product_probs`` is None, otherwise weighted by it. The number of
          draws is capped at the number of products that can actually be drawn.

        Raises:
            ValueError: For an unknown ``cfg.random_mode``, for invalid
                ``cfg.product_probs``, or when ``cfg.product_probs`` puts no
                probability mass on any of ``product_ids``.
        """
        mode = self.cfg.random_mode

        if mode == "balanced_quota":
            # Note: balanced_quota is best used via decide_batch; this is a reasonable fallback.
            chosen = _sample_without_replacement(rng, product_ids, self.cfg.evals_per_period)
            return self._build_decision(
                panelist_id=panelist_id,
                t=t,
                chosen=chosen,
                meta={"random_mode": "balanced_quota", "note": "fallback_single_decide"},
            )

        if mode == "iid_probs":
            ids = list(product_ids)
            probs = self.cfg.product_probs

            if probs is None:
                # Default to uniform. Cap the sample size like the weighted
                # branch does, and draw indices so the result holds plain `str`
                # values rather than NumPy string scalars.
                size = min(self.cfg.evals_per_period, len(ids))
                chosen_idx = rng.choice(len(ids), size=size, replace=False)
                return self._build_decision(
                    panelist_id=panelist_id,
                    t=t,
                    chosen=[ids[i] for i in chosen_idx.tolist()],
                    meta={"random_mode": "iid_probs", "probs": "uniform"},
                )

            pnorm = _normalize_probs(probs)
            # Align probabilities to product_ids; missing products get prob 0.
            pvec = np.array([pnorm.get(pid, 0.0) for pid in ids], dtype=float)
            mass = pvec.sum()
            if mass <= 0:
                raise ValueError("product_probs assigns zero probability mass to all available products.")
            pvec = pvec / mass

            # Sampling without replacement cannot return more items than have
            # non-zero probability, so cap the request there (this also caps it
            # at len(ids)).
            size = min(self.cfg.evals_per_period, int(np.count_nonzero(pvec)))
            chosen_idx = rng.choice(len(ids), size=size, replace=False, p=pvec)
            return self._build_decision(
                panelist_id=panelist_id,
                t=t,
                chosen=[ids[i] for i in chosen_idx.tolist()],
                meta={"random_mode": "iid_probs", "probs": "custom"},
            )

        raise ValueError(f"Unknown random_mode: {self.cfg.random_mode}")

    def _build_decision(
        self,
        *,
        panelist_id: str,
        t: int,
        chosen: List[str],
        meta: Dict[str, str],
    ) -> ExposureDecision:
        """Wrap the chosen product ids in an ExposureDecision tagged with this policy's name."""
        return ExposureDecision(
            panelist_id=panelist_id,
            t=t,
            policy=self.cfg.name,
            evaluate_product_ids=chosen,
            selection=None,
            meta=meta,
        )