Sources

class sim_panel.sources.base.BaseSource(config)

Bases: ABC

Abstract base class for external data sources.

A source is responsible for:

  1. validating its configuration,
  2. loading raw source artifacts,
  3. transforming them into canonical internal records,
  4. exporting materialized bundles when applicable, and
  5. optionally supporting a streaming import/export path.

Parameters:

config (SourceConfig)

name: str
abstractmethod validate_config()

Validate the source configuration and raise a helpful error if invalid.

Return type:

None

abstractmethod load_raw()

Load raw source artifacts from disk or other local inputs.

Sources that only support streaming may raise a RuntimeError here.

Return type:

SourceRawBundle

abstractmethod transform(raw)

Convert raw source artifacts into canonical internal records.

Sources that only support streaming may raise a RuntimeError here.

Return type:

SourceExportBundle

Parameters:

raw (SourceRawBundle)

abstractmethod export(bundle, output_dir=None)

Write a materialized source export bundle to disk.

Return type:

None

Parameters:
  • bundle (SourceExportBundle)

  • output_dir (Path | None)

run()

End-to-end in-memory source import: validate, load, transform.

Return type:

SourceExportBundle
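
As a rough illustration of the flow that run() describes, the sketch below chains validate, load, and transform in that order. Everything in it is a stand-in: SketchSource, RawBundle, ExportBundle, ToySource, and the panelist_id key are hypothetical names, not part of sim_panel, and the real run() may do more than this.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class RawBundle:
    """Hypothetical stand-in for SourceRawBundle."""
    reviews: List[Dict[str, Any]] = field(default_factory=list)


@dataclass
class ExportBundle:
    """Hypothetical stand-in for SourceExportBundle."""
    events: List[Dict[str, Any]] = field(default_factory=list)


class SketchSource(ABC):
    """Hypothetical stand-in for BaseSource; only the run() flow is shown."""

    @abstractmethod
    def validate_config(self) -> None: ...

    @abstractmethod
    def load_raw(self) -> RawBundle: ...

    @abstractmethod
    def transform(self, raw: RawBundle) -> ExportBundle: ...

    def run(self) -> ExportBundle:
        # Validate first so a bad configuration fails before any loading happens.
        self.validate_config()
        raw = self.load_raw()
        return self.transform(raw)


class ToySource(SketchSource):
    """Toy subclass that reads rows from memory instead of disk."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self.rows = rows

    def validate_config(self) -> None:
        if not isinstance(self.rows, list):
            raise ValueError("rows must be a list of dicts")

    def load_raw(self) -> RawBundle:
        return RawBundle(reviews=list(self.rows))

    def transform(self, raw: RawBundle) -> ExportBundle:
        # 'panelist_id' is an illustrative key, not a documented schema field.
        return ExportBundle(events=[{"panelist_id": r["user_id"]} for r in raw.reviews])


bundle = ToySource([{"user_id": "u1"}, {"user_id": "u2"}]).run()
```

Keeping run() on the base class means a subclass only has to supply the three abstract steps, while the ordering stays in one place.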

export_streaming(output_dir=None)

Streaming source import/export path.

Implementations may write artifacts incrementally to disk and return a lightweight bundle carrying metadata and stats only.

Return type:

SourceExportBundle

Parameters:

output_dir (Path | None)

class sim_panel.sources.types.SourceConfig(name, output_dir=None, seed=0, params=<factory>)

Bases: object

Generic source-layer configuration.

Source-specific config classes may subclass this and add extra fields.

Parameters:
  • name (str)

  • output_dir (Path | None)

  • seed (int)

  • params (Dict[str, Any])

name: str
output_dir: Path | None
seed: int
params: Dict[str, Any]
class sim_panel.sources.types.SourceStats(n_raw_reviews=0, n_raw_products=0, n_events=0, n_products=0, n_personas=0, n_reviews_missing_product_metadata=0, extra=<factory>)

Bases: object

Lightweight summary statistics for a source import run.

Parameters:
  • n_raw_reviews (int)

  • n_raw_products (int)

  • n_events (int)

  • n_products (int)

  • n_personas (int)

  • n_reviews_missing_product_metadata (int)

  • extra (Dict[str, Any])

n_raw_reviews: int
n_raw_products: int
n_events: int
n_products: int
n_personas: int
n_reviews_missing_product_metadata: int
extra: Dict[str, Any]
as_dict()
Return type:

Dict[str, Any]

class sim_panel.sources.types.SourceRawBundle(reviews=<factory>, products=<factory>, aux=<factory>)

Bases: object

Raw source artifacts loaded from external files, before canonical projection.

Parameters:
  • reviews (Sequence[Mapping[str, Any]])

  • products (Sequence[Mapping[str, Any]])

  • aux (Dict[str, Any])

reviews: Sequence[Mapping[str, Any]]
products: Sequence[Mapping[str, Any]]
aux: Dict[str, Any]
class sim_panel.sources.types.SourceExportBundle(events=<factory>, products=<factory>, personas=<factory>, metadata=<factory>, data_dictionary=<factory>, stats=<factory>)

Bases: object

Canonical export payload produced by a source importer.

Events remain schema-valid row dicts. Products and personas are typed on-disk records.

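Parameters:
  • events (List[Dict[str, Any]])

  • products (List[ProductRecord])

  • personas (List[PersonaRecord])

  • metadata (Dict[str, Any])

  • data_dictionary (Dict[str, Any])

  • stats (SourceStats)
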
events: List[Dict[str, Any]]
products: List[ProductRecord]
personas: List[PersonaRecord]
metadata: Dict[str, Any]
data_dictionary: Dict[str, Any]
stats: SourceStats
is_empty()
Return type:

bool

as_dict()
Return type:

Dict[str, Any]

class sim_panel.sources.registry.SourceRegistry(_registry=<factory>)

Bases: object

Name-to-source registry.

Parameters:

_registry (Dict[str, Type[BaseSource]])

register(name, cls)
Return type:

None

Parameters:
  • name (str)

  • cls (Type[BaseSource])

get(name)
Return type:

Type[BaseSource]

Parameters:

name (str)

create(config)
Return type:

BaseSource

Parameters:

config (SourceConfig)

names()
Return type:

list[str]
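
To make the name-to-source lookup concrete, here is a minimal sketch of a registry with the same four method names. SketchRegistry, SketchSource, and SketchConfig are hypothetical stand-ins; the duplicate-name check, the error message, and the sorted output of names() are assumptions, since the reference above does not say how the real SourceRegistry handles those cases.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Type


@dataclass
class SketchConfig:
    """Hypothetical stand-in for SourceConfig; only 'name' is needed here."""
    name: str


class SketchSource:
    """Hypothetical stand-in for BaseSource."""

    def __init__(self, config: SketchConfig) -> None:
        self.config = config


@dataclass
class SketchRegistry:
    _registry: Dict[str, Type[SketchSource]] = field(default_factory=dict)

    def register(self, name: str, cls: Type[SketchSource]) -> None:
        # Assumption: duplicates are rejected; the real registry may overwrite instead.
        if name in self._registry:
            raise ValueError(f"source {name!r} is already registered")
        self._registry[name] = cls

    def get(self, name: str) -> Type[SketchSource]:
        try:
            return self._registry[name]
        except KeyError:
            raise KeyError(f"unknown source {name!r}; known: {self.names()}") from None

    def create(self, config: SketchConfig) -> SketchSource:
        # Look up the class by config.name and instantiate it with the config.
        return self.get(config.name)(config)

    def names(self) -> List[str]:
        # Assumption: names are returned sorted for stable output.
        return sorted(self._registry)


registry = SketchRegistry()
registry.register("toy", SketchSource)
source = registry.create(SketchConfig(name="toy"))
```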

sim_panel.sources.registry.get_registry()
Return type:

SourceRegistry

sim_panel.sources.registry.register_source(name, cls)
Return type:

None

Parameters:
  • name (str)

  • cls (Type[BaseSource])

sim_panel.sources.registry.build_source(config)
Return type:

BaseSource

Parameters:

config (SourceConfig)

sim_panel.sources.registry.list_sources()
Return type:

list[str]

sim_panel.sources.build.build_source_from_yaml_dict(d)

Build a source instance from a parsed YAML dictionary.

Expected shape:

    source:
      name: amazon_reviews_2023
      …

Parameters:

d (Dict[str, Any])
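
The only part of the expected shape stated above is a top-level source mapping with a name key. The sketch below shows one way such a dictionary could be unpacked before a registry lookup. split_source_section is a hypothetical helper, not a sim_panel function, and passing seed alongside name is an assumption made only to have a second key in the example.

```python
from typing import Any, Dict, Tuple


def split_source_section(d: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper: return (source name, remaining keys) from a parsed YAML dict."""
    section = d.get("source")
    if not isinstance(section, dict):
        raise ValueError("expected a top-level 'source' mapping")
    name = section.get("name")
    if not isinstance(name, str) or not name:
        raise ValueError("'source.name' must be a non-empty string")
    # Everything except the name is passed through untouched.
    rest = {k: v for k, v in section.items() if k != "name"}
    return name, rest


# Assumption: 'seed' sits next to 'name'; the real layout beyond 'name' is not documented here.
parsed = {"source": {"name": "amazon_reviews_2023", "seed": 7}}
name, rest = split_source_section(parsed)
```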

class sim_panel.sources.amazon_reviews_2023.config.AmazonReviews2023Config(name, output_dir=None, seed=0, params=<factory>, reviews_path=PosixPath('.'), metadata_path=PosixPath('.'), category=None, import_mode='in_memory', require_metadata_match_for_events=False, trace_field_map=<factory>, time_index_mode='panelist_sequence', product_description_fallback_to_features=True, include_raw_product_meta=True, include_raw_review_meta=True, min_reviews_per_persona=1, max_reviews=None, max_metadata_rows=None)

Bases: SourceConfig

Configuration for the Amazon Reviews’23 source importer.

Design choices for v0

  • products.jsonl is built from the item metadata file

  • product_id is the parent/family identifier: parent_asin

  • events.jsonl may still retain child asin as source provenance

  • all rows in the provided metadata file are exported as products

  • textual review content is mapped into the single event-level traces dict

  • t is derived from timestamps, defaulting to within-panelist sequence order

Time index modes

  • panelist_sequence: default and recommended; assigns t = 0, 1, … within each panelist after chronological sorting of that panelist’s reviews

  • raw_timestamp: uses the source timestamp directly

  • global_sequence: assigns a corpus-wide chronological sequence; supported in in-memory mode but not yet in streaming mode
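
The three time index modes can be sketched as follows. This is an illustration under stated assumptions, not the importer's code: assign_time_index is a hypothetical function, the review rows are assumed to carry user_id and timestamp keys, and tie-breaking between equal timestamps is not specified above (the sketch simply inherits Python's stable sort).

```python
from collections import defaultdict
from typing import Any, Dict, List


def assign_time_index(
    reviews: List[Dict[str, Any]], mode: str = "panelist_sequence"
) -> List[Dict[str, Any]]:
    """Return copies of the review rows with a 't' key added (hypothetical helper)."""
    if mode == "raw_timestamp":
        # Use the source timestamp directly.
        return [{**r, "t": r["timestamp"]} for r in reviews]
    if mode == "global_sequence":
        # One chronological sequence across the whole corpus.
        ordered = sorted(reviews, key=lambda r: r["timestamp"])
        return [{**r, "t": i} for i, r in enumerate(ordered)]
    if mode == "panelist_sequence":
        # Sort each panelist's reviews chronologically, then number them 0, 1, ...
        by_user: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for r in reviews:
            by_user[r["user_id"]].append(r)
        out: List[Dict[str, Any]] = []
        for rows in by_user.values():
            for i, r in enumerate(sorted(rows, key=lambda r: r["timestamp"])):
                out.append({**r, "t": i})
        return out
    raise ValueError(f"unknown time_index_mode: {mode!r}")


reviews = [
    {"user_id": "a", "timestamp": 30},
    {"user_id": "b", "timestamp": 10},
    {"user_id": "a", "timestamp": 20},
]
per_panelist = assign_time_index(reviews, "panelist_sequence")
global_seq = assign_time_index(reviews, "global_sequence")
```

Note that global_sequence needs the full corpus sorted at once, which is consistent with it being unavailable in streaming mode, whereas panelist_sequence only needs one panelist's reviews at a time.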

reviews_path: Path
metadata_path: Path
category: str | None
import_mode: Literal['in_memory', 'streaming']
require_metadata_match_for_events: bool
trace_field_map: Dict[str, str]
time_index_mode: Literal['panelist_sequence', 'global_sequence', 'raw_timestamp']
product_description_fallback_to_features: bool
include_raw_product_meta: bool
include_raw_review_meta: bool
min_reviews_per_persona: int
max_reviews: int | None
max_metadata_rows: int | None
property product_id_field: str
classmethod from_dict(data)
Return type:

AmazonReviews2023Config

Parameters:

data (Dict[str, Any])

Parameters:
  • name (str)

  • output_dir (Path | None)

  • seed (int)

  • params (Dict[str, Any])

  • reviews_path (Path)

  • metadata_path (Path)

  • category (str | None)

  • import_mode (Literal['in_memory', 'streaming'])

  • require_metadata_match_for_events (bool)

  • trace_field_map (Dict[str, str])

  • time_index_mode (Literal['panelist_sequence', 'global_sequence', 'raw_timestamp'])

  • product_description_fallback_to_features (bool)

  • include_raw_product_meta (bool)

  • include_raw_review_meta (bool)

  • min_reviews_per_persona (int)

  • max_reviews (int | None)

  • max_metadata_rows (int | None)

class sim_panel.sources.amazon_reviews_2023.source.AmazonReviews2023Source(config)

Bases: BaseSource

Source importer for Amazon Reviews’23.

Parameters:

config (AmazonReviews2023Config)

name: str = 'amazon_reviews_2023'
config: AmazonReviews2023Config
validate_config()

Validate the source configuration and raise a helpful error if invalid.

Return type:

None

load_raw()

Load raw source artifacts from disk or other local inputs.

Sources that only support streaming may raise a RuntimeError here.

Return type:

SourceRawBundle

transform(raw)

Convert raw source artifacts into canonical internal records.

Sources that only support streaming may raise a RuntimeError here.

Return type:

SourceExportBundle

Parameters:

raw (SourceRawBundle)

export(bundle, output_dir=None)

Write a materialized source export bundle to disk.

Return type:

None

Parameters:
  • bundle (SourceExportBundle)

  • output_dir (Path | None)

export_streaming(output_dir=None)

Streaming source import/export path.

Implementations may write artifacts incrementally to disk and return a lightweight bundle carrying metadata and stats only.

Return type:

SourceExportBundle

Parameters:

output_dir (Path | None)

sim_panel.sources.amazon_reviews_2023.transform.transform_amazon_reviews_2023(*, raw, config)

Transform raw Amazon Reviews’23 rows into canonical internal records.

Return type:

SourceExportBundle

Parameters:
  • raw (SourceRawBundle)

  • config (AmazonReviews2023Config)

v0 contract

  • products are built from the metadata file and keyed by parent_asin

  • product display_name defaults to metadata title

  • product display_text defaults to metadata description, with optional fallback to features

  • products are emitted as ProductRecord objects

  • personas are derived from user review histories and emitted as PersonaRecord objects

  • events are built from review rows and linked to parent_asin

  • event product_display defaults to product display_text or display_name

  • event product_features mirror ProductRecord.attributes

  • event panelist_features mirror PersonaRecord.attributes

  • t is derived from source timestamps according to config.time_index_mode

  • child asin is retained under traces['source']

  • review text fields are mapped into the single event-level traces dict according to config.trace_field_map
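
Two items of the contract, linking events to parent_asin and folding review text into a single traces dict, can be sketched for one review row as below. build_event is a hypothetical helper. The direction of trace_field_map (source field to trace key), the nested shape under traces['source'], and the trace key names are all assumptions, since the contract above does not spell them out.

```python
from typing import Any, Dict, Mapping


def build_event(review: Mapping[str, Any], trace_field_map: Mapping[str, str]) -> Dict[str, Any]:
    """Hypothetical sketch: map one raw review row to a partial event dict."""
    traces: Dict[str, Any] = {}
    # Assumption: keys are raw review fields, values are the trace keys they map to.
    for source_field, trace_key in trace_field_map.items():
        value = review.get(source_field)
        if value:  # skip missing or empty text fields
            traces[trace_key] = value
    # The contract keeps the child asin under traces['source']; this nested shape is a guess.
    traces["source"] = {"asin": review.get("asin")}
    # Events are linked to the parent/family identifier rather than the child asin.
    return {"product_id": review["parent_asin"], "traces": traces}


review = {"asin": "B0CHILD", "parent_asin": "B0PARENT", "title": "Great", "text": "Works well."}
event = build_event(review, {"title": "review_title", "text": "review_text"})
```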

sim_panel.sources.amazon_reviews_2023.streaming.export_amazon_reviews_2023_streaming(*, config, output_dir)

Streaming importer for Amazon Reviews’23.

Return type:

SourceExportBundle

Parameters:

Design notes

  • products are streamed from metadata and written incrementally

  • reviews are first sharded to disk by user_id

  • each shard is processed independently to derive personas and events

  • events are written incrementally; no full in-memory event list is built
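
The shard-then-process pattern in the design notes can be sketched as below. This is a simplified illustration, not the importer's code: the function names, the MD5-based shard assignment, the shard file naming, the shard count, and the panelist_id key are assumptions, and persona derivation is omitted.

```python
import hashlib
import json
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, IO, Iterable, List


def shard_index(user_id: str, n_shards: int) -> int:
    # Stable hash so a given user always lands in the same shard.
    digest = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_shards


def shard_reviews(reviews: Iterable[Dict[str, Any]], shard_dir: Path, n_shards: int) -> List[Path]:
    """First pass: stream reviews to per-shard JSONL files keyed by user_id."""
    handles = [open(shard_dir / f"shard_{i:04d}.jsonl", "w", encoding="utf-8") for i in range(n_shards)]
    try:
        for r in reviews:
            handles[shard_index(r["user_id"], n_shards)].write(json.dumps(r) + "\n")
    finally:
        for h in handles:
            h.close()
    return sorted(shard_dir.glob("shard_*.jsonl"))


def process_shard(path: Path, events_out: IO[str]) -> int:
    """Second pass: only one shard is held in memory; events are appended as they are built."""
    by_user: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    with open(path, encoding="utf-8") as f:
        for line in f:
            r = json.loads(line)
            by_user[r["user_id"]].append(r)
    n_written = 0
    for user_id, rows in by_user.items():
        rows.sort(key=lambda r: r["timestamp"])
        for t, r in enumerate(rows):
            event = {"panelist_id": user_id, "t": t, "product_id": r["parent_asin"]}
            events_out.write(json.dumps(event) + "\n")
            n_written += 1
    return n_written


reviews = [
    {"user_id": "u1", "timestamp": 5, "parent_asin": "P1"},
    {"user_id": "u2", "timestamp": 1, "parent_asin": "P2"},
    {"user_id": "u1", "timestamp": 2, "parent_asin": "P3"},
]
with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    shard_dir = tmp_path / "shards"
    shard_dir.mkdir()
    shards = shard_reviews(iter(reviews), shard_dir, n_shards=4)
    with open(tmp_path / "events.jsonl", "w", encoding="utf-8") as out:
        n_events = sum(process_shard(p, out) for p in shards)
    written = [json.loads(s) for s in (tmp_path / "events.jsonl").read_text(encoding="utf-8").splitlines()]
```

Because all of a user's reviews land in the same shard, a within-panelist ordering can be computed shard by shard, while a corpus-wide ordering cannot.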