Source code for sim_panel.schema.validate

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple
from textwrap import shorten

from pydantic import ValidationError

from sim_panel.schema.registry import get_schema


@dataclass(frozen=True)
class RowError:
    """
    One validation failure, tied to the row that produced it.

    Instances are immutable (frozen dataclass), so they compare by value and
    can be hashed.
    """

    # Position of the offending row in the iterable that was validated.
    index: int
    # Description of what went wrong for that row.
    message: str
@dataclass(frozen=True)
class ValidationReport:
    """
    Immutable outcome of validating a batch of rows.

    The report carries row counts, the recorded per-row errors and any
    warnings. ``errors`` may hold fewer entries than ``n_invalid`` when the
    producer capped how many errors it recorded.
    """

    # Label of the schema version the rows were validated against.
    schema_version: str
    # Total number of rows that were examined.
    n_rows: int
    # Number of rows that passed validation.
    n_valid: int
    # Number of rows that failed validation.
    n_invalid: int
    # Recorded per-row failures (objects exposing ``index`` and ``message``).
    errors: List["RowError"]
    # Free-form, non-fatal notes about the validation run.
    warnings: List[str]

    @property
    def ok(self) -> bool:
        """Return True when no row was counted as invalid."""
        return self.n_invalid == 0

    def summary(self, *, max_errors: int = 10, max_message_chars: int = 300) -> str:
        """
        Human-readable summary for logs/CLI/errors.

        Args:
            max_errors: Maximum number of recorded errors to list. Values of
                zero or less list none and only report how many were omitted.
            max_message_chars: Maximum width of each error message; longer
                messages are truncated with an ellipsis. Values smaller than
                the ellipsis itself are raised to its width.

        Returns:
            A multi-line string: a header line with the counts, followed by
            an "Errors:" section and a "Warnings:" section when present.
        """
        placeholder = "…"
        head = (
            f"ValidationReport(schema_version={self.schema_version!r}, "
            f"n_rows={self.n_rows}, n_valid={self.n_valid}, n_invalid={self.n_invalid})"
        )
        parts: List[str] = [head]

        if self.errors:
            # Clamp at zero: a negative limit would otherwise slice from the
            # end of the list and inflate the "+N more" count.
            n_show = max(0, min(max_errors, len(self.errors)))
            # textwrap.shorten raises ValueError when the width cannot even
            # hold the placeholder, so never go below its length.
            width = max(max_message_chars, len(placeholder))
            # shorten() collapses all whitespace (newlines included) itself.
            lines = [
                f"row {e.index}: {shorten(e.message, width=width, placeholder=placeholder)}"
                for e in self.errors[:n_show]
            ]
            hidden = len(self.errors) - n_show
            more = f" (+{hidden} more)" if hidden else ""
            if lines:
                parts.append("Errors:\n- " + "\n- ".join(lines) + more)
            else:
                # Nothing listed: avoid emitting a dangling empty bullet.
                parts.append("Errors:" + more)

        if self.warnings:
            parts.append("Warnings:\n- " + "\n- ".join(self.warnings))

        return "\n".join(parts)
def validate_rows(
    rows: Iterable[Dict[str, Any]],
    schema_version: Optional[str] = None,
    *,
    max_errors: int = 50,
) -> ValidationReport:
    """
    Validate rows against a specific schema version.

    If schema_version is None (or empty), attempt to read it from each row's
    "schema_version". In that mode, rows with missing/unknown schema_version
    are marked invalid.

    If schema_version is pinned, every row is validated against that version.
    Rows that carry a different "schema_version" value are still validated
    against the pinned version, and a single aggregated warning reports them.

    Args:
        rows: Rows to validate; consumed exactly once, so generators are fine.
        schema_version: Schema version to validate every row against, or
            None to use each row's own "schema_version".
        max_errors: Maximum number of RowError entries recorded. Invalid rows
            beyond this cap are still counted in ``n_invalid``.

    Returns:
        A ValidationReport with row counts, recorded errors and warnings. Its
        ``schema_version`` is the pinned version, or "per-row" otherwise.
    """
    max_mismatch_rows_listed = 10

    errors: List[RowError] = []
    warnings: List[str] = []
    n_rows = 0
    n_valid = 0

    # An empty string counts as "not pinned", matching the ``or`` fallback
    # used for the report label below.
    pinned = schema_version or None
    n_mismatched = 0
    mismatch_rows: List[int] = []

    def record(index: int, message: str) -> None:
        # Keep the error list bounded; the counters still cover every row.
        if len(errors) < max_errors:
            errors.append(RowError(index=index, message=message))

    for i, row in enumerate(rows):
        n_rows += 1

        # A row without ``.get`` has no readable version; treat it as missing
        # rather than aborting the whole run.
        try:
            declared = row.get("schema_version")
        except AttributeError:
            declared = None

        if pinned is not None:
            row_version = pinned
            # Only rows that actually carry a value can disagree.
            if declared is not None and declared != pinned:
                n_mismatched += 1
                if len(mismatch_rows) < max_mismatch_rows_listed:
                    mismatch_rows.append(i)
        else:
            row_version = declared

        if not isinstance(row_version, str) or not row_version:
            record(i, "Missing/invalid 'schema_version' in row.")
            continue

        try:
            spec = get_schema(row_version)
        except ValueError as e:
            record(i, str(e))
            continue

        try:
            spec.model.model_validate(row)
        except ValidationError as e:
            record(i, str(e))
        else:
            n_valid += 1

    if n_mismatched:
        listed = ", ".join(str(ix) for ix in mismatch_rows)
        if n_mismatched > len(mismatch_rows):
            listed += ", ..."
        warnings.append(
            f"{n_mismatched} row(s) carry a 'schema_version' different from the pinned "
            f"{pinned!r} and were validated against the pinned version (rows: {listed})."
        )

    return ValidationReport(
        schema_version=pinned or "per-row",
        n_rows=n_rows,
        n_valid=n_valid,
        n_invalid=n_rows - n_valid,
        errors=errors,
        warnings=warnings,
    )
def validate_unique_event_id(rows: Iterable[Dict[str, Any]]) -> Tuple[bool, Optional[str]]:
    """
    Check that no two rows share the same "event_id".

    Rows are scanned in order and the scan stops at the first repeat. A row
    without an "event_id" key contributes the value None, so two such rows
    count as duplicates of each other.

    Returns:
        ``(True, None)`` when every event_id is distinct, otherwise
        ``(False, message)`` where the message names the row index and the
        repeated value.
    """
    observed: set = set()
    for position, record in enumerate(rows):
        event_id = record.get("event_id")
        if event_id not in observed:
            observed.add(event_id)
            continue
        return False, f"Duplicate event_id at row {position}: {event_id!r}"
    return True, None