Skip to main content

powerio_matrix/io/
gridfm.rs

1//! GridFM interchange: a parsed case as the gridfm-datakit Parquet schema.
2//!
3//! [`gridfm-datakit`](https://github.com/gridfm) writes per-scenario Parquet
4//! tables that [`gridfm-graphkit`](https://github.com/gridfm)'s
5//! `HeteroGridDatasetDisk` trains a GNN on. This module emits the same four
6//! tables — `bus_data`, `gen_data`, `branch_data`, `y_bus_data` — from one
7//! parsed [`Network`], so graphkit can train on powerio output directly and the
8//! scenario-batch path (issue #14) has its on-disk format.
9//!
10//! The reverse — [`read_gridfm_dataset`] / [`read_gridfm_scenarios`] /
11//! [`gridfm_base_case`], with the pure [`read_gridfm_network`] over in-memory
12//! batches — rebuilds a [`Network`] from such a dataset (lossy but
13//! power flow complete; see [`GridfmRead`]), the ML→classical return leg
14//! (issue #60). One reader plus the existing writers means gridfm → any classical
15//! format. `y_bus_data` is ignored on read; branches carry raw `r/x/b`.
16//!
17//! # Snapshots and scenarios
18//!
19//! powerio has no power flow solver. One parsed case is one snapshot
20//! (`scenario = 0`): voltages and generator dispatch are the case's stored
21//! values, and branch flows `pf/qf/pt/qt` are computed from those voltages and
22//! the branch admittances (`branch_flows`). For a solved MATPOWER case the
23//! stored voltages are the converged operating point, so the flows match what a
24//! solver would report to float tolerance; for an unsolved/flat start case they
25//! are the flows at the stored voltages, not a re-solved dispatch.
26//!
27//! A scenario batch ([`write_gridfm_batch`] / [`gridfm_record_batches_batch`])
28//! row-stacks many snapshots into the four tables, keyed by the `scenario`
29//! column. The snapshots share a base element set — the same bus/branch/gen
30//! counts and bus-id ordering, so the dense bus index means the same bus across
31//! scenarios — enforced by the shape check ([`Error::ScenarioShapeMismatch`]).
32//! Within that, load, dispatch, voltages, branch status, bus type, and costs may
33//! all differ per snapshot. This matches datakit, whose topology variants (N-K,
34//! random component drop) toggle `BR_STATUS`/`GEN_STATUS` on a fixed element set,
35//! and graphkit's `HeteroGridDatasetDisk`, which groups by `scenario` and
36//! rebuilds the graph independently for each one. powerio doesn't generate the
37//! perturbations; a caller (e.g. a scenario generator) supplies the snapshots.
38//!
39//! # Units
40//!
41//! - `Pd, Qd, Pg, Qg, p_mw, q_mvar` are MW/MVAr, passed through from the case
42//!   (loads and generator setpoints are already MW/MVAr). The branch flows
43//!   `pf, qf, pt, qt` are MW/MVAr too, computed in per-unit and scaled by
44//!   `base_mva`.
45//! - `Vm` per-unit, `Va` degrees; `r, x, b` and the `Y**` admittances per-unit.
46//! - `GS, BS` are the MATPOWER shunt values (MW/MVAr at V = 1) divided by
47//!   `base_mva`, matching datakit's normalization.
48//! - Costs are the raw MATPOWER coefficients: `cp2 = c2`, `cp1 = c1`,
49//!   `cp0 = c0`. A cost row gridfm can't represent (piecewise, missing,
50//!   malformed, or cubic and higher) emits zeros — graphkit ignores the cost
51//!   columns — and is counted in the manifest. The `_eur` suffixes are
52//!   datakit's column names, not a unit powerio converts to.
53//! - `bus`, `from_bus`, `to_bus` are dense `[0, n)` indices; `idx` is the
54//!   0-based generator/branch row. An out-of-service branch keeps its physical
55//!   `Y**` admittances but carries zero flows (its `br_status` is 0).
56
57// Bus/branch indices and Y_bus nnz counts cast to `i64` for the Arrow columns;
58// they are bounded far below `i64::MAX`, so the wrap clippy warns about can't
59// happen.
60#![allow(clippy::cast_possible_wrap)]
61
62use std::path::{Path, PathBuf};
63use std::sync::Arc;
64
65use arrow::array::{Array, ArrayRef, Float64Array, Int64Array};
66use arrow::datatypes::{Field, Schema};
67use arrow::record_batch::RecordBatch;
68use num_complex::Complex64;
69use parquet::arrow::ArrowWriter;
70use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
71use parquet::basic::Compression;
72use parquet::file::properties::WriterProperties;
73use serde::Serialize;
74
75use crate::indexed::IndexedNetwork;
76use crate::matrix::{BuildOptions, YbusFlags, branch_admittance, branch_flows, build_ybus};
77use crate::network::{Branch, Bus, BusId, BusType, Generator, Load, Shunt, SourceFormat};
78use crate::{
79    ElementCounts, Error, GenCost, GenCostPatch, MissingGenCostPolicy, Network, Result,
80    ScenarioMismatch,
81};
82
83/// Options for the gridfm export — the batch-wide knobs. The scenario id is a
84/// per-snapshot property (set via [`GridfmSnapshot::new`] / [`numbered_snapshots`],
85/// or the explicit argument to the single-case [`write_gridfm_dataset`] /
86/// [`gridfm_record_batches`]), not an option here.
87#[derive(Debug, Clone)]
88pub struct GridfmOptions {
89    /// Also write `y_bus_data.parquet`. graphkit reconstructs admittances from
90    /// the branch table and ignores it, but datakit emits it, so the default is
91    /// `true` for parity.
92    pub include_y_bus: bool,
93    /// Apply transformer tap ratios to the admittances. Default `true` (the
94    /// physical admittances datakit stores).
95    pub include_taps: bool,
96    /// Apply phase shifts to the admittances. Default `true`.
97    pub include_shifts: bool,
98    /// How missing generator cost rows are handled before export.
99    pub missing_gen_cost: MissingGenCostPolicy,
100    /// Explicit generator cost patches applied to each snapshot before the missing
101    /// cost policy.
102    pub gen_cost_patches: Vec<GenCostPatch>,
103}
104
105impl Default for GridfmOptions {
106    fn default() -> Self {
107        Self {
108            include_y_bus: true,
109            include_taps: true,
110            include_shifts: true,
111            missing_gen_cost: MissingGenCostPolicy::Preserve,
112            gen_cost_patches: Vec::new(),
113        }
114    }
115}
116
117impl GridfmOptions {
118    /// The Y_bus build flags these options select (only taps and shifts matter
119    /// for the gridfm admittances; the B'/B'' scheme and zero-impedance policy
120    /// don't apply).
121    fn build_options(&self) -> BuildOptions {
122        BuildOptions {
123            include_taps: self.include_taps,
124            include_shifts: self.include_shifts,
125            ..Default::default()
126        }
127    }
128
129    fn cost_options_default(&self) -> bool {
130        self.missing_gen_cost.is_preserve() && self.gen_cost_patches.is_empty()
131    }
132}
133
134/// One snapshot in a gridfm scenario batch: a parsed [`Network`] and the scenario
135/// id stamped into its rows.
136///
137/// powerio has no solver, so each snapshot is an operating point a caller (e.g. a
138/// scenario generator) has already produced. Snapshots in one batch share a base
139/// element set — the same bus/branch/gen counts and bus-id ordering — so the
140/// dense bus index means the same bus across snapshots and the tables stay
141/// schema-consistent. The builders enforce that and otherwise return
142/// [`Error::ScenarioShapeMismatch`]. Within that, load, dispatch, voltages,
143/// branch status, bus type, and costs may all vary per snapshot — this mirrors
144/// gridfm-datakit, whose topology variants (N-K, random component drop) toggle
145/// `BR_STATUS`/`GEN_STATUS` on a fixed element set, and gridfm-graphkit, which
146/// rebuilds the graph independently for every scenario.
147#[derive(Debug, Clone, Copy)]
148pub struct GridfmSnapshot<'a> {
149    /// The parsed case for this scenario.
150    net: &'a Network,
151    /// The scenario id stamped into the `scenario`/`load_scenario_idx` columns.
152    scenario: i64,
153}
154
155impl<'a> GridfmSnapshot<'a> {
156    /// A snapshot pairing a network with the scenario id stamped into its rows.
157    /// For the common "k-th input is `base + k`" numbering, prefer
158    /// [`numbered_snapshots`], which assigns ids with checked arithmetic.
159    #[must_use]
160    pub fn new(net: &'a Network, scenario: i64) -> Self {
161        Self { net, scenario }
162    }
163}
164
165/// The gridfm-datakit tables as Arrow record batches. The Parquet writer builds
166/// from these; a deferred gridfm-schema Arrow C Data Interface export (issue #38)
167/// would reuse them. (The raw network Arrow export that ships in powerio-capi is
168/// a different, lighter schema.)
169///
170/// For a scenario batch the tables are row-stacked: each table holds the rows of
171/// every snapshot back-to-back, keyed by the `scenario` column (0-based dense bus
172/// indices and generator/branch `idx` reset per scenario).
173#[derive(Debug, Clone)]
174#[non_exhaustive]
175pub struct GridfmTables {
176    pub bus: RecordBatch,
177    pub generator: RecordBatch,
178    pub branch: RecordBatch,
179    /// `None` when [`GridfmOptions::include_y_bus`] is off — the table isn't
180    /// built (graphkit reconstructs admittances from the branch table anyway).
181    pub y_bus: Option<RecordBatch>,
182}
183
184/// What [`write_gridfm_dataset`] wrote, plus the counts of columns it had to zero
185/// (see the manifest) so a caller can surface them.
186#[derive(Debug, Clone)]
187pub struct GridfmOutputs {
188    pub dir: PathBuf,
189    pub files: Vec<PathBuf>,
190    /// Branches with `r² + x² = 0`, whose admittance/flow columns were zeroed.
191    pub dropped_zero_impedance: usize,
192    /// Generators whose cost row gridfm couldn't represent, whose `cp*` columns
193    /// were zeroed.
194    pub degenerate_cost_gens: usize,
195    /// Generators with no cost after applying the export cost policy.
196    pub missing_cost_gens: usize,
197    /// Generators with a present cost row gridfm cannot represent.
198    pub unsupported_cost_gens: usize,
199    /// Generators whose missing cost was filled by the export policy.
200    pub synthesized_gen_costs: usize,
201    /// Generator costs replaced by explicit user patches.
202    pub patched_gen_costs: usize,
203}
204
205#[derive(Serialize)]
206struct GridfmMeta {
207    case_name: String,
208    base_mva: f64,
209    /// The first snapshot's scenario id (the base for a batch).
210    scenario: i64,
211    /// Number of stacked scenarios (1 for a single case).
212    n_scenarios: usize,
213    schema: &'static str,
214    /// Shared base element set (equal across all snapshots by the shape check).
215    n_buses: usize,
216    n_branches: usize,
217    /// In-service branch count of the **first** snapshot; branch status may
218    /// differ per scenario, so this describes scenario 0, not the whole batch.
219    n_branches_in_service: usize,
220    n_gens: usize,
221    /// Reference (slack) bus of the **first** snapshot. Each snapshot resolves
222    /// its own reference and carries it in the bus `REF` / gen `is_slack_gen`
223    /// columns; this records scenario 0's.
224    reference_bus: usize,
225    /// Branches with `r² + x² = 0` (admittance/flow columns zeroed), summed over
226    /// every snapshot in the batch.
227    dropped_zero_impedance: usize,
228    /// Generators whose cost row gridfm can't represent (piecewise, missing,
229    /// malformed, or cubic and higher; `cp*` columns zeroed), summed over every
230    /// snapshot in the batch.
231    degenerate_cost_gens: usize,
232    /// Missing cost rows after applying the export cost policy.
233    missing_cost_gens: usize,
234    /// Present cost rows that gridfm cannot represent.
235    unsupported_cost_gens: usize,
236    /// Same as `degenerate_cost_gens`; named for the physical `cp*` columns.
237    zeroed_cost_gens: usize,
238    cost_policy: MissingGenCostPolicy,
239    synthesized_gen_costs: usize,
240    patched_gen_costs: usize,
241    files: Vec<String>,
242    powerio_version: String,
243}
244
245/// Build the four gridfm tables for one network, stamping `scenario` into the id
246/// columns. Pure (no I/O). A thin wrapper over [`gridfm_record_batches_batch`]
247/// for one snapshot.
248///
249/// # Errors
250/// [`Error::ReferenceBusCount`] unless the case has exactly one reference bus
251/// (graphkit needs a slack), [`Error::NormalizedGridfmSnapshot`] for a normalized
252/// input, [`Error::NonFiniteGridfmValue`] for a NaN/Inf physical quantity (or a
253/// NaN limit — `±Inf` limits are the valid unbounded sentinel) that would reach
254/// Parquet, [`Error::NonFiniteSusceptance`] if a finite branch impedance still
255/// yields a non-finite admittance, and [`Error::UnknownBus`] if a generator or
256/// branch references a bus the network doesn't define.
257pub fn gridfm_record_batches(
258    net: &Network,
259    scenario: i64,
260    opts: &GridfmOptions,
261) -> Result<GridfmTables> {
262    let snap = GridfmSnapshot::new(net, scenario);
263    gridfm_record_batches_batch(std::slice::from_ref(&snap), opts)
264}
265
266/// Build the four gridfm tables for a batch of scenarios, row-stacked and keyed
267/// by the `scenario` column. Pure (no I/O). Each snapshot carries its own
268/// scenario id; the `include_y_bus`/taps/shifts flags apply to every snapshot.
269///
270/// # Errors
271/// [`Error::EmptyScenarioBatch`] for an empty batch,
272/// [`Error::ScenarioShapeMismatch`] if the snapshots don't share one base element
273/// set (counts + bus-id order), plus everything [`gridfm_record_batches`] can
274/// return.
275pub fn gridfm_record_batches_batch(
276    snapshots: &[GridfmSnapshot],
277    opts: &GridfmOptions,
278) -> Result<GridfmTables> {
279    if opts.cost_options_default() {
280        let views = snapshot_views(snapshots)?;
281        return tables_from_views(&views, opts);
282    }
283
284    let (nets, _) = policy_adjusted_snapshots(snapshots, opts)?;
285    let adjusted: Vec<_> = nets
286        .iter()
287        .zip(snapshots)
288        .map(|(net, snap)| GridfmSnapshot::new(net, snap.scenario))
289        .collect();
290    let views = snapshot_views(&adjusted)?;
291    tables_from_views(&views, opts)
292}
293
294/// The four tables from already-built, shape-checked snapshot views. The Y_bus
295/// table is built only when [`GridfmOptions::include_y_bus`] is set — otherwise
296/// it's `None` and the per-snapshot `build_ybus` is skipped entirely.
297fn tables_from_views(views: &[SnapshotView], opts: &GridfmOptions) -> Result<GridfmTables> {
298    Ok(GridfmTables {
299        bus: bus_batch(views)?,
300        generator: gen_batch(views)?,
301        branch: branch_batch(views, opts)?,
302        y_bus: if opts.include_y_bus {
303            Some(y_bus_batch(views, opts)?)
304        } else {
305            None
306        },
307    })
308}
309
310/// A resolved snapshot: its indexed view, scenario id, and reference bus.
311struct SnapshotView<'a> {
312    view: IndexedNetwork<'a>,
313    scenario: i64,
314    ref_bus: usize,
315}
316
317/// Build and shape-check the views for a scenario batch. Every snapshot must
318/// resolve to exactly one reference bus and share the first snapshot's base
319/// element set (bus / branch / generator counts and bus-id ordering), so the
320/// row-stacked tables stay schema-consistent.
321fn snapshot_views<'a>(snapshots: &'a [GridfmSnapshot<'a>]) -> Result<Vec<SnapshotView<'a>>> {
322    let first = snapshots.first().ok_or(Error::EmptyScenarioBatch)?;
323    let expected = shape_of(first.net);
324    let expected_ids: Vec<BusId> = first.net.buses.iter().map(|b| b.id).collect();
325
326    let mut views = Vec::with_capacity(snapshots.len());
327    for (k, snap) in snapshots.iter().enumerate() {
328        let got = shape_of(snap.net);
329        if got != expected {
330            return Err(Error::ScenarioShapeMismatch {
331                index: k,
332                reason: ScenarioMismatch::Counts { expected, got },
333            });
334        }
335        let ids_match = snap
336            .net
337            .buses
338            .iter()
339            .map(|b| b.id)
340            .eq(expected_ids.iter().copied());
341        if !ids_match {
342            return Err(Error::ScenarioShapeMismatch {
343                index: k,
344                reason: ScenarioMismatch::BusOrder,
345            });
346        }
347        validate_snapshot_inputs(snap.net, snap.scenario)?;
348        let view = IndexedNetwork::new(snap.net);
349        let ref_bus = view.reference_bus_index()?;
350        views.push(SnapshotView {
351            view,
352            scenario: snap.scenario,
353            ref_bus,
354        });
355    }
356    Ok(views)
357}
358
359fn validate_snapshot_inputs(net: &Network, scenario: i64) -> Result<()> {
360    if net.is_normalized() {
361        return Err(Error::NormalizedGridfmSnapshot { scenario });
362    }
363    net.check_base_mva()?;
364
365    // Two tiers: physical quantities must be finite, while limit fields admit
366    // `±Inf` as the unbounded sentinel (the PowerModels reader defaults absent
367    // gen limits to `±Inf`, MATPOWER files carry literal `Inf` tokens, and
368    // Parquet stores `±Inf` natively) — only `NaN` is corrupt there.
369    for (row, b) in net.buses.iter().enumerate() {
370        finite(scenario, "bus", row, "vm", b.vm)?;
371        finite(scenario, "bus", row, "va", b.va)?;
372        finite(scenario, "bus", row, "base_kv", b.base_kv)?;
373        not_nan(scenario, "bus", row, "vmax", b.vmax)?;
374        not_nan(scenario, "bus", row, "vmin", b.vmin)?;
375    }
376    for (row, l) in net.loads.iter().enumerate() {
377        finite(scenario, "load", row, "p", l.p)?;
378        finite(scenario, "load", row, "q", l.q)?;
379    }
380    for (row, s) in net.shunts.iter().enumerate() {
381        finite(scenario, "shunt", row, "g", s.g)?;
382        finite(scenario, "shunt", row, "b", s.b)?;
383    }
384    for (row, br) in net.branches.iter().enumerate() {
385        finite(scenario, "branch", row, "r", br.r)?;
386        finite(scenario, "branch", row, "x", br.x)?;
387        finite(scenario, "branch", row, "b", br.legacy_total_charging_b())?;
388        finite(scenario, "branch", row, "tap", br.tap)?;
389        finite(scenario, "branch", row, "shift", br.shift)?;
390        not_nan(scenario, "branch", row, "angmin", br.angmin)?;
391        not_nan(scenario, "branch", row, "angmax", br.angmax)?;
392        not_nan(scenario, "branch", row, "rate_a", br.rate_a)?;
393    }
394    for (row, g) in net.generators.iter().enumerate() {
395        finite(scenario, "generator", row, "pg", g.pg)?;
396        finite(scenario, "generator", row, "qg", g.qg)?;
397        not_nan(scenario, "generator", row, "pmax", g.pmax)?;
398        not_nan(scenario, "generator", row, "pmin", g.pmin)?;
399        not_nan(scenario, "generator", row, "qmax", g.qmax)?;
400        not_nan(scenario, "generator", row, "qmin", g.qmin)?;
401        // Validate exactly what lands in the `cp0/cp1/cp2` columns, not the
402        // raw coefficient list (non-representable rows export as zeros).
403        let (cp0, cp1, cp2) = gridfm_cost(g.cost.as_ref());
404        finite(scenario, "gencost", row, "cp0", cp0)?;
405        finite(scenario, "gencost", row, "cp1", cp1)?;
406        finite(scenario, "gencost", row, "cp2", cp2)?;
407    }
408    Ok(())
409}
410
411fn finite(
412    scenario: i64,
413    element: &'static str,
414    row: usize,
415    field: &'static str,
416    value: f64,
417) -> Result<()> {
418    if value.is_finite() {
419        Ok(())
420    } else {
421        Err(Error::NonFiniteGridfmValue {
422            scenario,
423            element,
424            row,
425            field,
426            value,
427        })
428    }
429}
430
431/// Limit fields: `±Inf` is the valid unbounded sentinel, only `NaN` is corrupt.
432fn not_nan(
433    scenario: i64,
434    element: &'static str,
435    row: usize,
436    field: &'static str,
437    value: f64,
438) -> Result<()> {
439    if value.is_nan() {
440        Err(Error::NonFiniteGridfmValue {
441            scenario,
442            element,
443            row,
444            field,
445            value,
446        })
447    } else {
448        Ok(())
449    }
450}
451
452/// The base element shape a scenario batch shares.
453fn shape_of(net: &Network) -> ElementCounts {
454    ElementCounts {
455        buses: net.buses.len(),
456        branches: net.branches.len(),
457        gens: net.generators.len(),
458    }
459}
460
461#[derive(Debug, Clone, Copy, Default)]
462struct CostPolicyBatchReport {
463    synthesized: usize,
464    patched: usize,
465}
466
467fn policy_adjusted_snapshots(
468    snapshots: &[GridfmSnapshot],
469    opts: &GridfmOptions,
470) -> Result<(Vec<Network>, CostPolicyBatchReport)> {
471    let mut report = CostPolicyBatchReport::default();
472    let mut nets = Vec::with_capacity(snapshots.len());
473    for snap in snapshots {
474        let mut net = snap.net.clone();
475        let r = net.apply_gen_cost_policy(&opts.gen_cost_patches, opts.missing_gen_cost)?;
476        report.synthesized += r.synthesized;
477        report.patched += r.patched;
478        nets.push(net);
479    }
480    Ok((nets, report))
481}
482
483/// Number a list of networks into snapshots, stamping the k-th `base + k` — the
484/// one place the "k-th input is scenario `base + k`" rule lives, so the CLI and
485/// the Python binding can't drift. Checked: returns [`Error::ScenarioIdOverflow`]
486/// rather than wrapping or panicking if a scenario id exceeds `i64`.
487pub fn numbered_snapshots<'a>(nets: &[&'a Network], base: i64) -> Result<Vec<GridfmSnapshot<'a>>> {
488    nets.iter()
489        .enumerate()
490        .map(|(k, &net)| {
491            let scenario = i64::try_from(k)
492                .ok()
493                .and_then(|offset| base.checked_add(offset))
494                .ok_or(Error::ScenarioIdOverflow { base, index: k })?;
495            Ok(GridfmSnapshot::new(net, scenario))
496        })
497        .collect()
498}
499
500/// Write the gridfm-datakit Parquet dataset for one case under
501/// `out_dir/<network_name>/raw/`, matching datakit's directory layout. Stamps
502/// `scenario` into the id columns. Writes `bus_data.parquet`, `gen_data.parquet`,
503/// `branch_data.parquet`, optionally `y_bus_data.parquet`, and a
504/// `gridfm_meta.json` manifest.
505///
506/// Expects a raw snapshot (powers in MW, angles in degrees); pass the parsed
507/// `Network`, not a [`to_normalized`](powerio::Network::to_normalized) per-unit
508/// product, whose fields would be mislabeled.
509///
510/// # Errors
511/// Propagates [`gridfm_record_batches`] and any filesystem/Parquet error.
512pub fn write_gridfm_dataset(
513    net: &Network,
514    scenario: i64,
515    out_dir: impl AsRef<Path>,
516    opts: &GridfmOptions,
517) -> Result<GridfmOutputs> {
518    let snap = GridfmSnapshot::new(net, scenario);
519    write_gridfm_batch(std::slice::from_ref(&snap), out_dir, opts)
520}
521
522/// Write a batch of scenarios as one gridfm-datakit dataset under
523/// `out_dir/<network_name>/raw/`, row-stacking every snapshot's tables and keying
524/// them by the `scenario` column. The dataset name and the base element counts
525/// come from the first snapshot (shared across the batch by the shape check); the
526/// dropped/degenerate counts are summed over every snapshot, while `reference_bus`
527/// / `n_branches_in_service` record the first snapshot only (they can differ per
528/// scenario, so the manifest documents them as scenario 0's).
529///
530/// # Errors
531/// Propagates [`gridfm_record_batches_batch`] and any filesystem/Parquet error.
532pub fn write_gridfm_batch(
533    snapshots: &[GridfmSnapshot],
534    out_dir: impl AsRef<Path>,
535    opts: &GridfmOptions,
536) -> Result<GridfmOutputs> {
537    if opts.cost_options_default() {
538        return write_gridfm_batch_inner(
539            snapshots,
540            out_dir.as_ref(),
541            opts,
542            CostPolicyBatchReport::default(),
543        );
544    }
545
546    let (nets, cost_report) = policy_adjusted_snapshots(snapshots, opts)?;
547    let adjusted: Vec<_> = nets
548        .iter()
549        .zip(snapshots)
550        .map(|(net, snap)| GridfmSnapshot::new(net, snap.scenario))
551        .collect();
552    write_gridfm_batch_inner(&adjusted, out_dir.as_ref(), opts, cost_report)
553}
554
555fn write_gridfm_batch_inner(
556    snapshots: &[GridfmSnapshot],
557    out_dir: &Path,
558    opts: &GridfmOptions,
559    cost_report: CostPolicyBatchReport,
560) -> Result<GridfmOutputs> {
561    let views = snapshot_views(snapshots)?;
562    let tables = tables_from_views(&views, opts)?;
563
564    // The shape check guarantees every snapshot shares the base element set, so
565    // the name and structural counts come from the first.
566    let net = views[0].view.network();
567    let dir = out_dir.join(&net.name).join("raw");
568    std::fs::create_dir_all(&dir)?;
569
570    let mut files = Vec::new();
571    put_parquet(&dir, "bus_data.parquet", &tables.bus, &mut files)?;
572    put_parquet(&dir, "gen_data.parquet", &tables.generator, &mut files)?;
573    put_parquet(&dir, "branch_data.parquet", &tables.branch, &mut files)?;
574    if let Some(y_bus) = &tables.y_bus {
575        put_parquet(&dir, "y_bus_data.parquet", y_bus, &mut files)?;
576    }
577
578    // Branch status and costs may differ per scenario, so count the zeroed rows
579    // across every snapshot — the totals describe the whole stacked dataset.
580    let dropped_zero_impedance: usize = views
581        .iter()
582        .flat_map(|v| v.view.network().branches.iter())
583        .filter(|br| br.r * br.r + br.x * br.x == 0.0)
584        .count();
585    let missing_cost_gens: usize = views
586        .iter()
587        .flat_map(|v| v.view.network().generators.iter())
588        .filter(|g| g.cost.is_none())
589        .count();
590    let unsupported_cost_gens: usize = views
591        .iter()
592        .flat_map(|v| v.view.network().generators.iter())
593        .filter(|g| g.cost.is_some() && !cost_representable(g.cost.as_ref()))
594        .count();
595    let degenerate_cost_gens = missing_cost_gens + unsupported_cost_gens;
596
597    let meta = GridfmMeta {
598        case_name: net.name.clone(),
599        base_mva: net.base_mva,
600        scenario: views[0].scenario,
601        n_scenarios: views.len(),
602        schema: "gridfm-datakit",
603        n_buses: net.buses.len(),
604        n_branches: net.branches.len(),
605        n_branches_in_service: net.branches.iter().filter(|b| b.in_service).count(),
606        n_gens: net.generators.len(),
607        reference_bus: views[0].ref_bus,
608        dropped_zero_impedance,
609        degenerate_cost_gens,
610        missing_cost_gens,
611        unsupported_cost_gens,
612        zeroed_cost_gens: degenerate_cost_gens,
613        cost_policy: opts.missing_gen_cost,
614        synthesized_gen_costs: cost_report.synthesized,
615        patched_gen_costs: cost_report.patched,
616        files: files
617            .iter()
618            .filter_map(|p| p.file_name().and_then(|s| s.to_str()).map(str::to_string))
619            .collect(),
620        powerio_version: env!("CARGO_PKG_VERSION").to_string(),
621    };
622    let meta_path = dir.join("gridfm_meta.json");
623    let json = serde_json::to_string_pretty(&meta).map_err(|e| Error::Parquet(e.to_string()))?;
624    std::fs::write(&meta_path, json)?;
625    files.push(meta_path);
626
627    Ok(GridfmOutputs {
628        dir,
629        files,
630        dropped_zero_impedance,
631        degenerate_cost_gens,
632        missing_cost_gens,
633        unsupported_cost_gens,
634        synthesized_gen_costs: cost_report.synthesized,
635        patched_gen_costs: cost_report.patched,
636    })
637}
638
639// --- table builders --------------------------------------------------------
640
641fn bus_batch(snaps: &[SnapshotView]) -> Result<RecordBatch> {
642    let total: usize = snaps.iter().map(|s| s.view.n()).sum();
643    let mut scenario = Vec::with_capacity(total);
644    let mut bus_idx = Vec::with_capacity(total);
645    let (mut pd, mut qd) = (Vec::with_capacity(total), Vec::with_capacity(total));
646    let (mut pg_col, mut qg_col) = (Vec::with_capacity(total), Vec::with_capacity(total));
647    let (mut vm, mut va) = (Vec::with_capacity(total), Vec::with_capacity(total));
648    let (mut pq, mut pv, mut refc) = (
649        Vec::with_capacity(total),
650        Vec::with_capacity(total),
651        Vec::with_capacity(total),
652    );
653    let mut vn_kv = Vec::with_capacity(total);
654    let (mut min_vm, mut max_vm) = (Vec::with_capacity(total), Vec::with_capacity(total));
655    let (mut gs, mut bs) = (Vec::with_capacity(total), Vec::with_capacity(total));
656
657    for s in snaps {
658        let view = &s.view;
659        let n = view.n();
660        let base = view.base_mva();
661        let buses = &view.network().buses;
662
663        // Per-bus generation, summed over in-service generators (dense order).
664        let mut pg = vec![0.0; n];
665        let mut qg = vec![0.0; n];
666        for (_, g) in view.in_service_gens() {
667            if let Some(i) = view.bus_index(g.bus) {
668                pg[i] += g.pg;
669                qg[i] += g.qg;
670            }
671        }
672
673        scenario.resize(scenario.len() + n, s.scenario);
674        bus_idx.extend(0..n as i64);
675        pd.extend_from_slice(view.pd());
676        qd.extend_from_slice(view.qd());
677        pg_col.extend(pg);
678        qg_col.extend(qg);
679        vm.extend(buses.iter().map(|b| b.vm));
680        va.extend(buses.iter().map(|b| b.va));
681        pq.extend(buses.iter().map(|b| i64::from(b.kind == BusType::Pq)));
682        pv.extend(buses.iter().map(|b| i64::from(b.kind == BusType::Pv)));
683        refc.extend(buses.iter().map(|b| i64::from(b.kind == BusType::Ref)));
684        vn_kv.extend(buses.iter().map(|b| b.base_kv));
685        min_vm.extend(buses.iter().map(|b| b.vmin));
686        max_vm.extend(buses.iter().map(|b| b.vmax));
687        gs.extend(view.gs().iter().map(|g| g / base));
688        bs.extend(view.bs().iter().map(|b| b / base));
689    }
690
691    batch(with_scenario_pair(
692        scenario,
693        vec![
694            ("bus", i64s(bus_idx)),
695            ("Pd", f64s(pd)),
696            ("Qd", f64s(qd)),
697            ("Pg", f64s(pg_col)),
698            ("Qg", f64s(qg_col)),
699            ("Vm", f64s(vm)),
700            ("Va", f64s(va)),
701            ("PQ", i64s(pq)),
702            ("PV", i64s(pv)),
703            ("REF", i64s(refc)),
704            ("vn_kv", f64s(vn_kv)),
705            ("min_vm_pu", f64s(min_vm)),
706            ("max_vm_pu", f64s(max_vm)),
707            ("GS", f64s(gs)),
708            ("BS", f64s(bs)),
709        ],
710    ))
711}
712
713fn gen_batch(snaps: &[SnapshotView]) -> Result<RecordBatch> {
714    let total: usize = snaps.iter().map(|s| s.view.generators().len()).sum();
715    let mut scenario = Vec::with_capacity(total);
716    let mut idx = Vec::with_capacity(total);
717    let mut bus = Vec::with_capacity(total);
718    let (mut p_mw, mut q_mvar) = (Vec::with_capacity(total), Vec::with_capacity(total));
719    let (mut min_p, mut max_p) = (Vec::with_capacity(total), Vec::with_capacity(total));
720    let (mut min_q, mut max_q) = (Vec::with_capacity(total), Vec::with_capacity(total));
721    let (mut cp0, mut cp1, mut cp2) = (
722        Vec::with_capacity(total),
723        Vec::with_capacity(total),
724        Vec::with_capacity(total),
725    );
726    let mut in_service = Vec::with_capacity(total);
727    let mut is_slack = Vec::with_capacity(total);
728
729    for s in snaps {
730        let view = &s.view;
731        // One pass over the snapshot's generators: every column gets one push per
732        // generator, in dense source order.
733        for (row, g) in view.generators().iter().enumerate() {
734            let i = view.bus_index(g.bus).ok_or(Error::UnknownBus {
735                bus_id: g.bus,
736                element_index: row,
737            })?;
738            scenario.push(s.scenario);
739            idx.push(row as i64);
740            bus.push(i as i64);
741            is_slack.push(i64::from(i == s.ref_bus));
742            let (c0, c1, c2) = gridfm_cost(g.cost.as_ref());
743            cp0.push(c0);
744            cp1.push(c1);
745            cp2.push(c2);
746            p_mw.push(g.pg);
747            q_mvar.push(g.qg);
748            min_p.push(g.pmin);
749            max_p.push(g.pmax);
750            min_q.push(g.qmin);
751            max_q.push(g.qmax);
752            in_service.push(i64::from(g.in_service));
753        }
754    }
755
756    batch(with_scenario_pair(
757        scenario,
758        vec![
759            ("idx", i64s(idx)),
760            ("bus", i64s(bus)),
761            ("p_mw", f64s(p_mw)),
762            ("q_mvar", f64s(q_mvar)),
763            ("min_p_mw", f64s(min_p)),
764            ("max_p_mw", f64s(max_p)),
765            ("min_q_mvar", f64s(min_q)),
766            ("max_q_mvar", f64s(max_q)),
767            ("cp0_eur", f64s(cp0)),
768            ("cp1_eur_per_mw", f64s(cp1)),
769            ("cp2_eur_per_mw2", f64s(cp2)),
770            ("in_service", i64s(in_service)),
771            ("is_slack_gen", i64s(is_slack)),
772        ],
773    ))
774}
775
776#[allow(clippy::too_many_lines, clippy::many_single_char_names)]
777fn branch_batch(snaps: &[SnapshotView], opts: &GridfmOptions) -> Result<RecordBatch> {
778    let total: usize = snaps.iter().map(|s| s.view.branches().len()).sum();
779
780    // Same flags the Y_bus builder derives, so the branch admittance columns and
781    // y_bus_data come from one kernel. The taps/shifts flags are batch-wide (from
782    // `opts`); the admittances themselves are recomputed per snapshot.
783    let flags = YbusFlags {
784        unity_taps: !opts.include_taps,
785        zero_shifts: !opts.include_shifts,
786        ..Default::default()
787    };
788
789    let mut scenario = Vec::with_capacity(total);
790    let mut idx = Vec::with_capacity(total);
791    let (mut from_bus, mut to_bus) = (Vec::with_capacity(total), Vec::with_capacity(total));
792    let (mut pf, mut qf, mut pt, mut qt) = (
793        Vec::with_capacity(total),
794        Vec::with_capacity(total),
795        Vec::with_capacity(total),
796        Vec::with_capacity(total),
797    );
798    let (mut yff_r, mut yff_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
799    let (mut yft_r, mut yft_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
800    let (mut ytf_r, mut ytf_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
801    let (mut ytt_r, mut ytt_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
802    let (mut r_col, mut x_col, mut b_col) = (
803        Vec::with_capacity(total),
804        Vec::with_capacity(total),
805        Vec::with_capacity(total),
806    );
807    let (mut tap, mut shift) = (Vec::with_capacity(total), Vec::with_capacity(total));
808    let (mut ang_min, mut ang_max) = (Vec::with_capacity(total), Vec::with_capacity(total));
809    let mut rate_a = Vec::with_capacity(total);
810    let mut br_status = Vec::with_capacity(total);
811
812    for s in snaps {
813        let view = &s.view;
814        let base = view.base_mva();
815        let branches = view.branches();
816        let buses = &view.network().buses;
817        // Complex bus voltages `vm·e^{jθ}`, dense order, for the flow evaluation.
818        let v: Vec<Complex64> = buses
819            .iter()
820            .map(|b| Complex64::from_polar(b.vm, b.va.to_radians()))
821            .collect();
822
823        scenario.resize(scenario.len() + branches.len(), s.scenario);
824        idx.extend(0..branches.len() as i64);
825
826        for (row, br) in branches.iter().enumerate() {
827            let i = view.bus_index(br.from).ok_or(Error::UnknownBus {
828                bus_id: br.from,
829                element_index: row,
830            })?;
831            let j = view.bus_index(br.to).ok_or(Error::UnknownBus {
832                bus_id: br.to,
833                element_index: row,
834            })?;
835            from_bus.push(i as i64);
836            to_bus.push(j as i64);
837
838            // Zero-impedance branch → `None` → zeroed admittance/flow columns (never NaN).
839            let shift_rad = if flags.zero_shifts {
840                0.0
841            } else {
842                view.angle_radians(br.shift)
843            };
844            let block = branch_admittance(br, flags, shift_rad, row)?;
845            let [y_ff, y_ft, y_tf, y_tt] = block.unwrap_or([Complex64::new(0.0, 0.0); 4]);
846            yff_r.push(y_ff.re);
847            yff_i.push(y_ff.im);
848            yft_r.push(y_ft.re);
849            yft_i.push(y_ft.im);
850            ytf_r.push(y_tf.re);
851            ytf_i.push(y_tf.im);
852            ytt_r.push(y_tt.re);
853            ytt_i.push(y_tt.im);
854
855            let (sf, st) = if br.in_service && block.is_some() {
856                branch_flows(&[y_ff, y_ft, y_tf, y_tt], v[i], v[j])
857            } else {
858                (Complex64::new(0.0, 0.0), Complex64::new(0.0, 0.0))
859            };
860            pf.push(sf.re * base);
861            qf.push(sf.im * base);
862            pt.push(st.re * base);
863            qt.push(st.im * base);
864
865            r_col.push(br.r);
866            x_col.push(br.x);
867            b_col.push(br.legacy_total_charging_b());
868            tap.push(br.effective_tap());
869            shift.push(br.shift);
870            ang_min.push(br.angmin);
871            ang_max.push(br.angmax);
872            rate_a.push(br.rate_a);
873            br_status.push(i64::from(br.in_service));
874        }
875    }
876
877    batch(with_scenario_pair(
878        scenario,
879        vec![
880            ("idx", i64s(idx)),
881            ("from_bus", i64s(from_bus)),
882            ("to_bus", i64s(to_bus)),
883            ("pf", f64s(pf)),
884            ("qf", f64s(qf)),
885            ("pt", f64s(pt)),
886            ("qt", f64s(qt)),
887            ("r", f64s(r_col)),
888            ("x", f64s(x_col)),
889            ("b", f64s(b_col)),
890            ("Yff_r", f64s(yff_r)),
891            ("Yff_i", f64s(yff_i)),
892            ("Yft_r", f64s(yft_r)),
893            ("Yft_i", f64s(yft_i)),
894            ("Ytf_r", f64s(ytf_r)),
895            ("Ytf_i", f64s(ytf_i)),
896            ("Ytt_r", f64s(ytt_r)),
897            ("Ytt_i", f64s(ytt_i)),
898            ("tap", f64s(tap)),
899            ("shift", f64s(shift)),
900            ("ang_min", f64s(ang_min)),
901            ("ang_max", f64s(ang_max)),
902            ("rate_a", f64s(rate_a)),
903            ("br_status", i64s(br_status)),
904        ],
905    ))
906}
907
908fn y_bus_batch(snaps: &[SnapshotView], opts: &GridfmOptions) -> Result<RecordBatch> {
909    // Upper bound on stacked nnz: each snapshot's Y_bus has at most 4 entries per
910    // branch plus a diagonal. The exact count varies (lossless branches, dropped
911    // zeros, per-scenario branch status), so this only sizes the allocation.
912    let est: usize = snaps
913        .iter()
914        .map(|s| 4 * s.view.branches().len() + s.view.n())
915        .sum();
916    let mut scenario = Vec::with_capacity(est);
917    let mut index1 = Vec::with_capacity(est);
918    let mut index2 = Vec::with_capacity(est);
919    let mut g_vals = Vec::with_capacity(est);
920    let mut b_vals = Vec::with_capacity(est);
921
922    for s in snaps {
923        let parts = build_ybus(&s.view, &opts.build_options())?;
924        // G and B don't share a sparsity pattern: a lossless branch (r = 0) is a
925        // pure reactance, so its G entries are structurally zero where B's aren't.
926        // datakit keys y_bus rows on the complex value being nonzero, i.e. the
927        // union of the G and B positions. Merge into a sorted (row, col) map so the
928        // output is row-major like `np.nonzero`, then drop any all-zero position.
929        let mut entries: std::collections::BTreeMap<(usize, usize), (f64, f64)> =
930            std::collections::BTreeMap::new();
931        for (row, g_row) in parts.g.outer_iterator().enumerate() {
932            for (col, &gv) in g_row.iter() {
933                entries.entry((row, col)).or_default().0 = gv;
934            }
935        }
936        for (row, b_row) in parts.b.outer_iterator().enumerate() {
937            for (col, &bv) in b_row.iter() {
938                entries.entry((row, col)).or_default().1 = bv;
939            }
940        }
941
942        for ((row, col), (gv, bv)) in entries {
943            if gv == 0.0 && bv == 0.0 {
944                continue;
945            }
946            scenario.push(s.scenario);
947            index1.push(row as i64);
948            index2.push(col as i64);
949            g_vals.push(gv);
950            b_vals.push(bv);
951        }
952    }
953
954    batch(with_scenario_pair(
955        scenario,
956        vec![
957            ("index1", i64s(index1)),
958            ("index2", i64s(index2)),
959            ("G", f64s(g_vals)),
960            ("B", f64s(b_vals)),
961        ],
962    ))
963}
964
965// --- small helpers ---------------------------------------------------------
966
967/// `(cp0, cp1, cp2)` = raw MATPOWER `(c0, c1, c2)` from a polynomial cost row.
968/// Coefficients are highest-order first, so `ncost == 3` is `[c2, c1, c0]`.
969/// Piecewise, missing, or malformed rows give zeros.
970fn gridfm_cost(cost: Option<&GenCost>) -> (f64, f64, f64) {
971    match cost {
972        Some(c) if c.model == 2 && c.coeffs.len() >= c.ncost => match c.ncost {
973            3 => (c.coeffs[2], c.coeffs[1], c.coeffs[0]),
974            2 => (c.coeffs[1], c.coeffs[0], 0.0),
975            1 => (c.coeffs[0], 0.0, 0.0),
976            _ => (0.0, 0.0, 0.0),
977        },
978        _ => (0.0, 0.0, 0.0),
979    }
980}
981
982/// Whether [`gridfm_cost`] can represent this cost row (used for the manifest's
983/// degenerate-cost count).
984fn cost_representable(cost: Option<&GenCost>) -> bool {
985    matches!(cost, Some(c) if c.model == 2 && c.coeffs.len() >= c.ncost && (1..=3).contains(&c.ncost))
986}
987
988fn put_parquet(
989    dir: &Path,
990    name: &str,
991    batch: &RecordBatch,
992    files: &mut Vec<PathBuf>,
993) -> Result<()> {
994    let path = dir.join(name);
995    let file = std::fs::File::create(&path)?;
996    let props = WriterProperties::builder()
997        .set_compression(Compression::SNAPPY)
998        .build();
999    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
1000        .map_err(|e| Error::Parquet(e.to_string()))?;
1001    writer
1002        .write(batch)
1003        .map_err(|e| Error::Parquet(e.to_string()))?;
1004    writer.close().map_err(|e| Error::Parquet(e.to_string()))?;
1005    files.push(path);
1006    Ok(())
1007}
1008
1009/// Assemble a [`RecordBatch`] from named columns, in order; field types are read
1010/// off the arrays and all columns are non-null.
1011fn batch(columns: Vec<(&str, ArrayRef)>) -> Result<RecordBatch> {
1012    let fields: Vec<Field> = columns
1013        .iter()
1014        .map(|(name, arr)| Field::new(*name, arr.data_type().clone(), false))
1015        .collect();
1016    let arrays: Vec<ArrayRef> = columns.into_iter().map(|(_, arr)| arr).collect();
1017    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)
1018        .map_err(|e| Error::Parquet(e.to_string()))
1019}
1020
1021fn i64s(v: Vec<i64>) -> ArrayRef {
1022    Arc::new(Int64Array::from(v))
1023}
1024
1025fn f64s(v: Vec<f64>) -> ArrayRef {
1026    Arc::new(Float64Array::from(v))
1027}
1028
1029/// Prepend the `scenario` / `load_scenario_idx` id columns (which hold identical
1030/// values) to a table's other columns. The Int64 array is built once and the Arc
1031/// shared, so the duplicate column costs a pointer, not a second `Vec`.
1032fn with_scenario_pair(
1033    scenario: Vec<i64>,
1034    rest: Vec<(&'static str, ArrayRef)>,
1035) -> Vec<(&'static str, ArrayRef)> {
1036    let scenario = i64s(scenario);
1037    let mut cols = Vec::with_capacity(rest.len() + 2);
1038    cols.push(("scenario", scenario.clone()));
1039    cols.push(("load_scenario_idx", scenario));
1040    cols.extend(rest);
1041    cols
1042}
1043
1044// === Reader: gridfm dataset → Network (issue #60) ==========================
1045//
1046// The inverse of the writer above: select one `scenario` out of the gridfm tables
1047// and rebuild a `Network`. Lossy but power flow complete — original bus ids are
1048// synthesized `1..n`, nodal load/shunt is folded to one synthetic element per
1049// bus, and HVDC/storage/piecewise costs are gone (see [`GridfmRead::warnings`]).
1050// `y_bus_data` is never read; branches carry raw `r/x/b`.
1051
1052/// One scenario read out of a gridfm dataset: the reconstructed [`Network`] plus
1053/// the fidelity warnings the lossy read couldn't avoid (mirroring
1054/// [`Conversion::warnings`](powerio::Conversion)).
1055#[derive(Debug, Clone)]
1056#[non_exhaustive]
1057pub struct GridfmRead {
1058    /// The reconstructed network (`source_format = SourceFormat::Gridfm`).
1059    pub network: Network,
1060    /// The scenario id these rows came from.
1061    pub scenario: i64,
1062    /// What the gridfm schema couldn't round-trip — synthesized bus ids, folded
1063    /// per-bus load/shunt, dropped HVDC/storage, etc.
1064    pub warnings: Vec<String>,
1065}
1066
1067/// Build one [`Network`] from in-memory gridfm tables, selecting `scenario`'s
1068/// rows. The pure inverse of [`gridfm_record_batches`]: `base_mva` and `name`
1069/// come from the caller (the disk path reads them from `gridfm_meta.json`).
1070///
1071/// # Errors
1072/// [`Error::FormatRead`] if a required column is missing or mistyped, a column
1073/// carries nulls, a dense index is negative, or `scenario` isn't present; plus
1074/// whatever [`Network::validate`] rejects (duplicate / dangling bus ids).
1075pub fn read_gridfm_network(
1076    tables: &GridfmTables,
1077    scenario: i64,
1078    base_mva: f64,
1079    name: &str,
1080) -> Result<GridfmRead> {
1081    let bus = bus_columns(std::slice::from_ref(&tables.bus))?;
1082    let gens = gen_columns(std::slice::from_ref(&tables.generator))?;
1083    let branch = branch_columns(std::slice::from_ref(&tables.branch))?;
1084    build_network_from_columns(&bus, &gens, &branch, scenario, base_mva, name, Vec::new())
1085}
1086
1087/// Read one `scenario` from a gridfm dataset on disk and rebuild a [`Network`].
1088/// The inverse of [`write_gridfm_dataset`].
1089///
1090/// `dir` is resolved leniently: the leaf `raw/` directory holding the parquet
1091/// files, a `<case>/` directory with a `raw/` child, or a parent directory with
1092/// exactly one `*/raw/` child all work. `base_mva` and the case name come from
1093/// `gridfm_meta.json` (a missing manifest defaults `base_mva` to 100 and warns).
1094///
1095/// # Errors
1096/// Propagates [`read_gridfm_network`] plus any filesystem / Parquet read error.
1097pub fn read_gridfm_dataset(dir: impl AsRef<Path>, scenario: i64) -> Result<GridfmRead> {
1098    let raw = resolve_raw_dir(dir.as_ref())?;
1099    let (base_mva, name, warnings) = read_meta(&raw);
1100    let bus = bus_columns(&read_parquet(&raw.join("bus_data.parquet"))?)?;
1101    let gens = gen_columns(&read_parquet(&raw.join("gen_data.parquet"))?)?;
1102    let branch = branch_columns(&read_parquet(&raw.join("branch_data.parquet"))?)?;
1103    build_network_from_columns(&bus, &gens, &branch, scenario, base_mva, &name, warnings)
1104}
1105
1106/// Read every scenario from a gridfm dataset, one [`Network`] per `scenario` id
1107/// (sorted ascending) over the shared topology — the read side of the scenario
1108/// batch (#57). Each scenario is rebuilt independently, so two scenarios may
1109/// differ in branch status, bus types, and reference bus.
1110///
1111/// # Errors
1112/// Propagates [`read_gridfm_dataset`]'s filesystem / Parquet / build errors.
1113pub fn read_gridfm_scenarios(dir: impl AsRef<Path>) -> Result<Vec<GridfmRead>> {
1114    let raw = resolve_raw_dir(dir.as_ref())?;
1115    let (base_mva, name, warnings) = read_meta(&raw);
1116    // Extract every column once and reuse across scenarios; rebuilding each
1117    // scenario from the raw batches would re-concatenate each table n_scenarios
1118    // times (O(n_scenarios × table_size)).
1119    let bus = bus_columns(&read_parquet(&raw.join("bus_data.parquet"))?)?;
1120    let gens = gen_columns(&read_parquet(&raw.join("gen_data.parquet"))?)?;
1121    let branch = branch_columns(&read_parquet(&raw.join("branch_data.parquet"))?)?;
1122
1123    distinct_sorted(&bus.scenario)
1124        .into_iter()
1125        .map(|s| {
1126            build_network_from_columns(&bus, &gens, &branch, s, base_mva, &name, warnings.clone())
1127        })
1128        .collect()
1129}
1130
1131/// The distinct scenario ids in a gridfm dataset, ascending — the keys
1132/// [`read_gridfm_scenarios`] rebuilds a [`Network`] for. Reads only `bus_data`'s
1133/// scenario column, so it enumerates a dataset's scenarios without rebuilding
1134/// every network; the C ABI's `pio_scenario_ids` is a thin wrapper over it.
1135///
1136/// # Errors
1137/// Propagates the directory resolution and `bus_data.parquet` read errors.
1138pub fn gridfm_scenario_ids(dir: impl AsRef<Path>) -> Result<Vec<i64>> {
1139    let raw = resolve_raw_dir(dir.as_ref())?;
1140    let bus = bus_columns(&read_parquet(&raw.join("bus_data.parquet"))?)?;
1141    Ok(distinct_sorted(&bus.scenario))
1142}
1143
1144/// The distinct values of `scenario`, ascending.
1145fn distinct_sorted(scenario: &[i64]) -> Vec<i64> {
1146    let mut ids = scenario.to_vec();
1147    ids.sort_unstable();
1148    ids.dedup();
1149    ids
1150}
1151
1152/// The unperturbed base case: [`read_gridfm_dataset`] at `scenario = 0` (datakit's
1153/// convention). There is no single "shared base" beyond a chosen scenario — bus
1154/// types, branch status, and reference bus all vary per scenario — so the base
1155/// case is just scenario 0.
1156///
1157/// # Errors
1158/// Propagates [`read_gridfm_dataset`].
1159pub fn gridfm_base_case(dir: impl AsRef<Path>) -> Result<GridfmRead> {
1160    read_gridfm_dataset(dir, 0)
1161}
1162
1163/// Every `bus_data` column the reader uses, concatenated across all batches once.
1164/// Extracting columns up front lets a multi-scenario read reuse them rather than
1165/// re-concatenating the whole table for each scenario.
1166struct BusColumns {
1167    scenario: Vec<i64>,
1168    bus: Vec<i64>,
1169    pv: Vec<i64>,
1170    refc: Vec<i64>,
1171    vm: Vec<f64>,
1172    va: Vec<f64>,
1173    vn_kv: Vec<f64>,
1174    min_vm: Vec<f64>,
1175    max_vm: Vec<f64>,
1176    pd: Vec<f64>,
1177    qd: Vec<f64>,
1178    gs: Vec<f64>,
1179    bs: Vec<f64>,
1180}
1181
1182fn bus_columns(batches: &[RecordBatch]) -> Result<BusColumns> {
1183    Ok(BusColumns {
1184        scenario: i64_col(batches, "scenario")?,
1185        bus: i64_col(batches, "bus")?,
1186        pv: i64_col(batches, "PV")?,
1187        refc: i64_col(batches, "REF")?,
1188        vm: f64_col(batches, "Vm")?,
1189        va: f64_col(batches, "Va")?,
1190        vn_kv: f64_col(batches, "vn_kv")?,
1191        min_vm: f64_col(batches, "min_vm_pu")?,
1192        max_vm: f64_col(batches, "max_vm_pu")?,
1193        pd: f64_col(batches, "Pd")?,
1194        qd: f64_col(batches, "Qd")?,
1195        gs: f64_col(batches, "GS")?,
1196        bs: f64_col(batches, "BS")?,
1197    })
1198}
1199
1200/// Every `gen_data` column the reader uses (cost is `cp0`/`cp1`/`cp2`).
1201struct GenColumns {
1202    scenario: Vec<i64>,
1203    bus: Vec<i64>,
1204    p_mw: Vec<f64>,
1205    q_mvar: Vec<f64>,
1206    min_p: Vec<f64>,
1207    max_p: Vec<f64>,
1208    min_q: Vec<f64>,
1209    max_q: Vec<f64>,
1210    cp0: Vec<f64>,
1211    cp1: Vec<f64>,
1212    cp2: Vec<f64>,
1213    in_service: Vec<i64>,
1214}
1215
1216fn gen_columns(batches: &[RecordBatch]) -> Result<GenColumns> {
1217    Ok(GenColumns {
1218        scenario: i64_col(batches, "scenario")?,
1219        bus: i64_col(batches, "bus")?,
1220        p_mw: f64_col(batches, "p_mw")?,
1221        q_mvar: f64_col(batches, "q_mvar")?,
1222        min_p: f64_col(batches, "min_p_mw")?,
1223        max_p: f64_col(batches, "max_p_mw")?,
1224        min_q: f64_col(batches, "min_q_mvar")?,
1225        max_q: f64_col(batches, "max_q_mvar")?,
1226        cp0: f64_col(batches, "cp0_eur")?,
1227        cp1: f64_col(batches, "cp1_eur_per_mw")?,
1228        cp2: f64_col(batches, "cp2_eur_per_mw2")?,
1229        in_service: i64_col(batches, "in_service")?,
1230    })
1231}
1232
1233/// Every `branch_data` column the reader uses (`Y**` / flow columns are ignored).
1234struct BranchColumns {
1235    scenario: Vec<i64>,
1236    from_bus: Vec<i64>,
1237    to_bus: Vec<i64>,
1238    r: Vec<f64>,
1239    x: Vec<f64>,
1240    b: Vec<f64>,
1241    tap: Vec<f64>,
1242    shift: Vec<f64>,
1243    ang_min: Vec<f64>,
1244    ang_max: Vec<f64>,
1245    rate_a: Vec<f64>,
1246    status: Vec<i64>,
1247}
1248
1249fn branch_columns(batches: &[RecordBatch]) -> Result<BranchColumns> {
1250    Ok(BranchColumns {
1251        scenario: i64_col(batches, "scenario")?,
1252        from_bus: i64_col(batches, "from_bus")?,
1253        to_bus: i64_col(batches, "to_bus")?,
1254        r: f64_col(batches, "r")?,
1255        x: f64_col(batches, "x")?,
1256        b: f64_col(batches, "b")?,
1257        tap: f64_col(batches, "tap")?,
1258        shift: f64_col(batches, "shift")?,
1259        ang_min: f64_col(batches, "ang_min")?,
1260        ang_max: f64_col(batches, "ang_max")?,
1261        rate_a: f64_col(batches, "rate_a")?,
1262        status: i64_col(batches, "br_status")?,
1263    })
1264}
1265
1266/// The shared core: rebuild one scenario's [`Network`] from already-extracted
1267/// columns. The columns are concatenated once by the caller and reused across
1268/// scenarios, so a multi-scenario read doesn't re-copy each table per scenario.
1269/// `warnings` is seeded with any manifest-level notes (e.g. a defaulted
1270/// `base_mva`) and extended with the per-read fidelity notes.
1271// tap == 1.0 / != 0.0 reads are exact, not approximate; the builder is one long
1272// linear pass over the three tables, so the length is inherent.
1273#[allow(clippy::float_cmp, clippy::too_many_lines)]
1274fn build_network_from_columns(
1275    bus: &BusColumns,
1276    gens: &GenColumns,
1277    branch: &BranchColumns,
1278    scenario: i64,
1279    base_mva: f64,
1280    name: &str,
1281    mut warnings: Vec<String>,
1282) -> Result<GridfmRead> {
1283    // --- buses, loads, shunts (bus_data) ---
1284    let bus_rows = scenario_rows(&bus.scenario, scenario);
1285    if bus_rows.is_empty() {
1286        let mut avail = bus.scenario.clone();
1287        avail.sort_unstable();
1288        avail.dedup();
1289        return Err(Error::FormatRead {
1290            format: "gridfm",
1291            message: format!("scenario {scenario} not present; available: {avail:?}"),
1292        });
1293    }
1294
1295    let bus_id = &bus.bus;
1296    let pv = &bus.pv;
1297    let refc = &bus.refc;
1298    let vm = &bus.vm;
1299    let va = &bus.va;
1300    let vn_kv = &bus.vn_kv;
1301    let min_vm = &bus.min_vm;
1302    let max_vm = &bus.max_vm;
1303    let pd = &bus.pd;
1304    let qd = &bus.qd;
1305    let gs = &bus.gs;
1306    let bs = &bus.bs;
1307
1308    let mut buses = Vec::with_capacity(bus_rows.len());
1309    let mut loads = Vec::new();
1310    let mut shunts = Vec::new();
1311    // Dense bus index -> voltage magnitude, so a generator recovers its `vg`
1312    // setpoint from its bus (gridfm has no separate gen voltage column, but a
1313    // generator's setpoint is its bus's regulated `Vm`).
1314    let mut bus_vm: std::collections::HashMap<i64, f64> =
1315        std::collections::HashMap::with_capacity(bus_rows.len());
1316    for &r in &bus_rows {
1317        let id = dense_bus_id(bus_id[r])?;
1318        bus_vm.insert(bus_id[r], vm[r]);
1319        // REF / PV / PQ one-hot; the writer guarantees exactly one set, but read
1320        // defensively (REF wins, then PV, else PQ).
1321        let kind = if refc[r] != 0 {
1322            BusType::Ref
1323        } else if pv[r] != 0 {
1324            BusType::Pv
1325        } else {
1326            BusType::Pq
1327        };
1328        let mut bus = Bus::new(id, kind, vn_kv[r]);
1329        bus.vm = vm[r];
1330        bus.va = va[r];
1331        bus.vmax = max_vm[r];
1332        bus.vmin = min_vm[r];
1333        bus.area = 0;
1334        bus.zone = 0;
1335        buses.push(bus);
1336        if pd[r] != 0.0 || qd[r] != 0.0 {
1337            loads.push(Load::new(id, pd[r], qd[r]));
1338        }
1339        // Undo the writer's `/ base_mva` (gridfm.rs:487) to recover MW/MVAr at V=1.
1340        if gs[r] != 0.0 || bs[r] != 0.0 {
1341            shunts.push(Shunt::new(id, gs[r] * base_mva, bs[r] * base_mva));
1342        }
1343    }
1344
1345    // --- generators (gen_data) ---
1346    let gen_rows = scenario_rows(&gens.scenario, scenario);
1347    require_scenario_block(&gens.scenario, scenario, &gen_rows, "gen_data")?;
1348    let g_bus = &gens.bus;
1349    let p_mw = &gens.p_mw;
1350    let q_mvar = &gens.q_mvar;
1351    let min_p = &gens.min_p;
1352    let max_p = &gens.max_p;
1353    let min_q = &gens.min_q;
1354    let max_q = &gens.max_q;
1355    let cp0 = &gens.cp0;
1356    let cp1 = &gens.cp1;
1357    let cp2 = &gens.cp2;
1358    let g_in = &gens.in_service;
1359
1360    let mut generators = Vec::with_capacity(gen_rows.len());
1361    for &r in &gen_rows {
1362        // Any nonzero coefficient is a real polynomial cost. An all-zero triple is
1363        // ambiguous in the schema — a generator with no cost, a genuine zero
1364        // polynomial cost, or a piecewise/cubic+ cost the writer couldn't represent
1365        // (all written as `(0, 0, 0)`) — so it reads back as `None` (see warnings).
1366        let cost = if cp0[r] != 0.0 || cp1[r] != 0.0 || cp2[r] != 0.0 {
1367            Some(GenCost::new(2, 0.0, 0.0, vec![cp2[r], cp1[r], cp0[r]]))
1368        } else {
1369            None
1370        };
1371        let mut generator = Generator::new(dense_bus_id(g_bus[r])?);
1372        generator.pg = p_mw[r];
1373        generator.qg = q_mvar[r];
1374        generator.pmax = max_p[r];
1375        generator.pmin = min_p[r];
1376        generator.qmax = max_q[r];
1377        generator.qmin = min_q[r];
1378        // The schema has no gen vg; recover the setpoint from the bus's Vm
1379        // (falls back to 1.0 only if the gen references an absent bus, which
1380        // `validate()` below then rejects).
1381        generator.vg = bus_vm.get(&g_bus[r]).copied().unwrap_or(1.0);
1382        generator.mbase = base_mva;
1383        generator.in_service = g_in[r] != 0;
1384        generator.cost = cost;
1385        generators.push(generator);
1386    }
1387
1388    // --- branches (branch_data); Y** and pf/qf/pt/qt are ignored ---
1389    let br_rows = scenario_rows(&branch.scenario, scenario);
1390    require_scenario_block(&branch.scenario, scenario, &br_rows, "branch_data")?;
1391    let from_bus = &branch.from_bus;
1392    let to_bus = &branch.to_bus;
1393    let r_col = &branch.r;
1394    let x_col = &branch.x;
1395    let b_col = &branch.b;
1396    let tap = &branch.tap;
1397    let shift = &branch.shift;
1398    let ang_min = &branch.ang_min;
1399    let ang_max = &branch.ang_max;
1400    let rate_a = &branch.rate_a;
1401    let br_status = &branch.status;
1402
1403    let mut branches = Vec::with_capacity(br_rows.len());
1404    // The writer stores the *effective* tap (`Branch::effective_tap`), so a line
1405    // (raw tap 0) lands as 1.0. Map unit tap + no shift back to the raw `tap == 0`
1406    // line convention, otherwise every line reads as a transformer
1407    // (`is_transformer` keys off `tap != 0`) and a read→write to a format that
1408    // splits lines from transformers (PSS/E, PowerWorld) misclassifies them. A
1409    // genuine unity-ratio, zero-shift transformer is electrically identical to a
1410    // line and is (unavoidably) read as one.
1411    let mut unit_tap_lines = 0usize;
1412    for &row in &br_rows {
1413        let shift_v = shift[row];
1414        let tap_out = if tap[row] == 1.0 && shift_v == 0.0 {
1415            unit_tap_lines += 1;
1416            0.0
1417        } else {
1418            tap[row]
1419        };
1420        let mut branch = Branch::new(
1421            dense_bus_id(from_bus[row])?,
1422            dense_bus_id(to_bus[row])?,
1423            r_col[row],
1424            x_col[row],
1425        );
1426        branch.b = b_col[row];
1427        branch.rate_a = rate_a[row];
1428        branch.tap = tap_out;
1429        branch.shift = shift_v;
1430        branch.in_service = br_status[row] != 0;
1431        branch.angmin = ang_min[row];
1432        branch.angmax = ang_max[row];
1433        branches.push(branch);
1434    }
1435
1436    let mut net = Network::new(name, base_mva);
1437    net.buses = buses;
1438    net.loads = loads;
1439    net.shunts = shunts;
1440    net.branches = branches;
1441    net.generators = generators;
1442    net.source_format = SourceFormat::Gridfm;
1443    net.validate()?;
1444
1445    // --- fidelity warnings: what the gridfm schema couldn't carry back ---
1446    warnings.push(format!(
1447        "synthesized bus ids 1..={}; original bus ids are not stored in a gridfm dataset, \
1448         so a written case is renumbered",
1449        net.buses.len()
1450    ));
1451    if !net.loads.is_empty() {
1452        warnings.push(format!(
1453            "folded nodal load into {} synthetic per-bus Load(s); per-load granularity is \
1454             not recoverable",
1455            net.loads.len()
1456        ));
1457    }
1458    if !net.shunts.is_empty() {
1459        warnings.push(format!(
1460            "folded nodal shunts into {} synthetic per-bus Shunt(s); per-shunt granularity \
1461             is not recoverable",
1462            net.shunts.len()
1463        ));
1464    }
1465    if unit_tap_lines > 0 {
1466        warnings.push(format!(
1467            "{unit_tap_lines} branch(es) had unit effective tap and no phase shift and were read \
1468             as lines (raw tap 0); a unity-ratio, zero-shift transformer in the source is \
1469             indistinguishable from a line and is read as one (the power flow is identical)"
1470        ));
1471    }
1472    let no_cost_gens = net.generators.iter().filter(|g| g.cost.is_none()).count();
1473    if no_cost_gens > 0 {
1474        warnings.push(format!(
1475            "{no_cost_gens} generator(s) read with no cost: an all-zero cost triple in the dataset \
1476             is the writer's encoding for a generator with no cost, a genuine zero polynomial \
1477             cost, or a piecewise/cubic+ cost it couldn't represent — indistinguishable on read"
1478        ));
1479    }
1480    warnings.push(
1481        "HVDC, storage, areas/zones, bus names, rate_b/rate_c, generator mbase/ramp limits, \
1482         and startup/shutdown costs are absent from the gridfm schema"
1483            .to_string(),
1484    );
1485
1486    Ok(GridfmRead {
1487        network: net,
1488        scenario,
1489        warnings,
1490    })
1491}
1492
1493// --- reader helpers --------------------------------------------------------
1494
1495/// Resolve the directory holding the gridfm parquet files, accepting (in order)
1496/// the leaf `raw/` dir itself, a `<case>/` dir with a `raw/` child, or a parent
1497/// dir with exactly one `*/raw/` child.
1498fn resolve_raw_dir(dir: &Path) -> Result<PathBuf> {
1499    let has_bus = |d: &Path| d.join("bus_data.parquet").is_file();
1500    if has_bus(dir) {
1501        return Ok(dir.to_path_buf());
1502    }
1503    let nested = dir.join("raw");
1504    if has_bus(&nested) {
1505        return Ok(nested);
1506    }
1507    // Parent-of-<case> fallback: scan for exactly one `*/raw/bus_data.parquet`.
1508    // Surface read_dir / entry IO errors (missing dir, permissions) rather than
1509    // masking them as "no dataset found".
1510    let mut matches: Vec<PathBuf> = Vec::new();
1511    let entries = std::fs::read_dir(dir).map_err(|e| Error::FormatRead {
1512        format: "gridfm",
1513        message: format!("reading directory {}: {e}", dir.display()),
1514    })?;
1515    for entry in entries {
1516        let entry = entry.map_err(|e| Error::FormatRead {
1517            format: "gridfm",
1518            message: format!("reading an entry of {}: {e}", dir.display()),
1519        })?;
1520        let raw = entry.path().join("raw");
1521        if has_bus(&raw) {
1522            matches.push(raw);
1523        }
1524    }
1525    match matches.len() {
1526        1 => Ok(matches.pop().expect("len checked")),
1527        0 => Err(Error::FormatRead {
1528            format: "gridfm",
1529            message: format!(
1530                "no gridfm dataset under {}; expected bus_data.parquet in the directory, a \
1531                 raw/ child, or a single <case>/raw/ child",
1532                dir.display()
1533            ),
1534        }),
1535        n => Err(Error::FormatRead {
1536            format: "gridfm",
1537            message: format!(
1538                "{n} gridfm datasets under {}; point at the specific <case>/raw directory",
1539                dir.display()
1540            ),
1541        }),
1542    }
1543}
1544
1545/// Read `base_mva` and the case name from `raw/gridfm_meta.json`. A missing or
1546/// unreadable manifest is not fatal — a bare prediction may lack one — so default
1547/// `base_mva` to 100, derive the name from the `<case>/raw` parent, and warn.
1548fn read_meta(raw: &Path) -> (f64, String, Vec<String>) {
1549    let case_from_path = || {
1550        raw.parent()
1551            .and_then(Path::file_name)
1552            .and_then(|s| s.to_str())
1553            .map_or_else(|| "gridfm".to_string(), str::to_string)
1554    };
1555    let text = match std::fs::read_to_string(raw.join("gridfm_meta.json")) {
1556        Ok(text) => text,
1557        // Not just "not found": surface the underlying error (permissions, non-UTF-8,
1558        // etc.) so a dataset problem is diagnosable, while still defaulting base_mva.
1559        Err(e) => {
1560            return (
1561                100.0,
1562                case_from_path(),
1563                vec![format!(
1564                    "gridfm_meta.json could not be read ({e}); base_mva defaulted to 100"
1565                )],
1566            );
1567        }
1568    };
1569    let Ok(meta) = serde_json::from_str::<serde_json::Value>(&text) else {
1570        return (
1571            100.0,
1572            case_from_path(),
1573            vec!["gridfm_meta.json is not valid JSON; base_mva defaulted to 100".to_string()],
1574        );
1575    };
1576    let name = meta
1577        .get("case_name")
1578        .and_then(serde_json::Value::as_str)
1579        .map_or_else(case_from_path, str::to_string);
1580    let mut warnings = Vec::new();
1581    // base_mva must be a positive, finite number; anything else (absent, zero,
1582    // negative, NaN) defaults to 100 with a note — shunt recovery scales by it.
1583    let base = match meta.get("base_mva").and_then(serde_json::Value::as_f64) {
1584        Some(b) if b.is_finite() && b > 0.0 => b,
1585        _ => {
1586            warnings.push(
1587                "gridfm_meta.json has no usable base_mva (absent or not a positive number); \
1588                 defaulted to 100"
1589                    .to_string(),
1590            );
1591            100.0
1592        }
1593    };
1594    (base, name, warnings)
1595}
1596
1597/// Open a parquet file and collect every record batch (one for powerio-written
1598/// data, possibly several for an externally-written prediction).
1599fn read_parquet(path: &Path) -> Result<Vec<RecordBatch>> {
1600    let file = std::fs::File::open(path).map_err(|e| Error::FormatRead {
1601        format: "gridfm",
1602        message: format!("opening {}: {e}", path.display()),
1603    })?;
1604    let reader = ParquetRecordBatchReaderBuilder::try_new(file)
1605        .and_then(ParquetRecordBatchReaderBuilder::build)
1606        .map_err(|e| Error::FormatRead {
1607            format: "gridfm",
1608            message: format!("reading {}: {e}", path.display()),
1609        })?;
1610    reader
1611        .collect::<std::result::Result<Vec<_>, _>>()
1612        .map_err(|e| Error::FormatRead {
1613            format: "gridfm",
1614            message: format!("decoding {}: {e}", path.display()),
1615        })
1616}
1617
1618/// Row indices whose `scenario` column equals `scenario`, in table order.
1619fn scenario_rows(scen: &[i64], scenario: i64) -> Vec<usize> {
1620    scen.iter()
1621        .enumerate()
1622        .filter_map(|(i, &s)| (s == scenario).then_some(i))
1623        .collect()
1624}
1625
1626/// Guard a gen/branch table: empty `rows` is fine when the whole table is empty
1627/// (a case legitimately has no generators, or no branches), but if the table
1628/// holds rows for *other* scenarios yet none for this one, the dataset is partial
1629/// or corrupt and would silently yield a wrong-but-valid network — error instead.
1630fn require_scenario_block(
1631    scen_col: &[i64],
1632    scenario: i64,
1633    rows: &[usize],
1634    table: &str,
1635) -> Result<()> {
1636    if rows.is_empty() && !scen_col.is_empty() {
1637        return Err(Error::FormatRead {
1638            format: "gridfm",
1639            message: format!(
1640                "scenario {scenario} has no {table} rows, but the table holds {} row(s) for other \
1641                 scenarios — a partial or corrupt dataset",
1642                scen_col.len()
1643            ),
1644        });
1645    }
1646    Ok(())
1647}
1648
1649/// A dense `[0, n)` parquet index → 1-based [`BusId`]. Errors on a negative index.
1650fn dense_bus_id(v: i64) -> Result<BusId> {
1651    let idx = usize::try_from(v).map_err(|_| Error::FormatRead {
1652        format: "gridfm",
1653        message: format!("negative dense bus index {v}"),
1654    })?;
1655    Ok(BusId(idx + 1))
1656}
1657
1658/// Look up a named column, erroring if absent.
1659fn column<'a>(b: &'a RecordBatch, name: &str) -> Result<&'a ArrayRef> {
1660    b.column_by_name(name).ok_or_else(|| Error::FormatRead {
1661        format: "gridfm",
1662        message: format!("missing column `{name}`"),
1663    })
1664}
1665
1666/// Concatenate a named non-null `Int64` column across all batches.
1667fn i64_col(batches: &[RecordBatch], name: &str) -> Result<Vec<i64>> {
1668    let mut out = Vec::with_capacity(batches.iter().map(RecordBatch::num_rows).sum());
1669    for b in batches {
1670        let arr = column(b, name)?;
1671        let col = arr
1672            .as_any()
1673            .downcast_ref::<Int64Array>()
1674            .ok_or_else(|| Error::FormatRead {
1675                format: "gridfm",
1676                message: format!("column `{name}` is not Int64"),
1677            })?;
1678        if col.null_count() > 0 {
1679            return Err(Error::FormatRead {
1680                format: "gridfm",
1681                message: format!("column `{name}` has nulls"),
1682            });
1683        }
1684        out.extend_from_slice(col.values());
1685    }
1686    Ok(out)
1687}
1688
1689/// Concatenate a named non-null `Float64` column across all batches.
1690fn f64_col(batches: &[RecordBatch], name: &str) -> Result<Vec<f64>> {
1691    let mut out = Vec::with_capacity(batches.iter().map(RecordBatch::num_rows).sum());
1692    for b in batches {
1693        let arr = column(b, name)?;
1694        let col = arr
1695            .as_any()
1696            .downcast_ref::<Float64Array>()
1697            .ok_or_else(|| Error::FormatRead {
1698                format: "gridfm",
1699                message: format!("column `{name}` is not Float64"),
1700            })?;
1701        if col.null_count() > 0 {
1702            return Err(Error::FormatRead {
1703                format: "gridfm",
1704                message: format!("column `{name}` has nulls"),
1705            });
1706        }
1707        out.extend_from_slice(col.values());
1708    }
1709    Ok(out)
1710}
1711
1712#[cfg(test)]
1713mod tests {
1714    use super::*;
1715    use crate::network::{Branch, BranchCharging, Bus, BusId, BusType, Generator};
1716    use arrow::array::{Float64Array, Int64Array};
1717    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1718
1719    const BUS_COLS: &[&str] = &[
1720        "scenario",
1721        "load_scenario_idx",
1722        "bus",
1723        "Pd",
1724        "Qd",
1725        "Pg",
1726        "Qg",
1727        "Vm",
1728        "Va",
1729        "PQ",
1730        "PV",
1731        "REF",
1732        "vn_kv",
1733        "min_vm_pu",
1734        "max_vm_pu",
1735        "GS",
1736        "BS",
1737    ];
1738    const GEN_COLS: &[&str] = &[
1739        "scenario",
1740        "load_scenario_idx",
1741        "idx",
1742        "bus",
1743        "p_mw",
1744        "q_mvar",
1745        "min_p_mw",
1746        "max_p_mw",
1747        "min_q_mvar",
1748        "max_q_mvar",
1749        "cp0_eur",
1750        "cp1_eur_per_mw",
1751        "cp2_eur_per_mw2",
1752        "in_service",
1753        "is_slack_gen",
1754    ];
1755    const BRANCH_COLS: &[&str] = &[
1756        "scenario",
1757        "load_scenario_idx",
1758        "idx",
1759        "from_bus",
1760        "to_bus",
1761        "pf",
1762        "qf",
1763        "pt",
1764        "qt",
1765        "r",
1766        "x",
1767        "b",
1768        "Yff_r",
1769        "Yff_i",
1770        "Yft_r",
1771        "Yft_i",
1772        "Ytf_r",
1773        "Ytf_i",
1774        "Ytt_r",
1775        "Ytt_i",
1776        "tap",
1777        "shift",
1778        "ang_min",
1779        "ang_max",
1780        "rate_a",
1781        "br_status",
1782    ];
1783    const YBUS_COLS: &[&str] = &[
1784        "scenario",
1785        "load_scenario_idx",
1786        "index1",
1787        "index2",
1788        "G",
1789        "B",
1790    ];
1791
1792    fn case14() -> Network {
1793        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/../tests/data/case14.m");
1794        crate::parse_matpower_file(path).unwrap()
1795    }
1796
1797    fn names(b: &RecordBatch) -> Vec<String> {
1798        b.schema()
1799            .fields()
1800            .iter()
1801            .map(|f| f.name().clone())
1802            .collect()
1803    }
1804
1805    fn col_i64<'a>(b: &'a RecordBatch, name: &str) -> &'a Int64Array {
1806        b.column_by_name(name)
1807            .unwrap()
1808            .as_any()
1809            .downcast_ref()
1810            .unwrap()
1811    }
1812
1813    fn col_f64<'a>(b: &'a RecordBatch, name: &str) -> &'a Float64Array {
1814        b.column_by_name(name)
1815            .unwrap()
1816            .as_any()
1817            .downcast_ref()
1818            .unwrap()
1819    }
1820
1821    fn read(path: &Path) -> RecordBatch {
1822        let file = std::fs::File::open(path).unwrap();
1823        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
1824            .unwrap()
1825            .build()
1826            .unwrap();
1827        // case14 fits in one row group / batch.
1828        reader.next().unwrap().unwrap()
1829    }
1830
1831    #[test]
1832    fn schema_and_row_counts_match_case14() {
1833        let net = case14();
1834        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1835
1836        assert_eq!(names(&tables.bus), BUS_COLS);
1837        assert_eq!(names(&tables.generator), GEN_COLS);
1838        assert_eq!(names(&tables.branch), BRANCH_COLS);
1839        assert_eq!(names(tables.y_bus.as_ref().unwrap()), YBUS_COLS);
1840
1841        assert_eq!(tables.bus.num_rows(), net.buses.len()); // 14
1842        assert_eq!(tables.generator.num_rows(), net.generators.len()); // 5
1843        assert_eq!(tables.branch.num_rows(), net.branches.len()); // 20
1844    }
1845
1846    #[test]
1847    fn branch_b_uses_terminal_charging_projection() {
1848        let mut net = Network::in_memory(
1849            "terminal-projection",
1850            100.0,
1851            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
1852            Vec::new(),
1853        );
1854        let mut br = branch(1, 2, 0.01, 0.1);
1855        br.charging = Some(BranchCharging::new(0.01, 0.02, 0.03, 0.05));
1856        net.branches.push(br);
1857
1858        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1859        let b = col_f64(&tables.branch, "b").value(0);
1860        assert!((b - 0.07).abs() < 1e-12);
1861
1862        let yff_i = col_f64(&tables.branch, "Yff_i").value(0);
1863        let ytt_i = col_f64(&tables.branch, "Ytt_i").value(0);
1864        let y_series_i = -0.1 / (0.01 * 0.01 + 0.1 * 0.1);
1865        let recovered_b = yff_i + ytt_i - 2.0 * y_series_i;
1866        assert!((recovered_b - b).abs() < 1e-12);
1867    }
1868
1869    #[test]
1870    fn parquet_round_trips_through_reader() {
1871        let net = case14();
1872        let dir = tempfile::tempdir().unwrap();
1873        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
1874
1875        let raw = dir.path().join("case14").join("raw");
1876        assert_eq!(out.dir, raw);
1877        for f in ["bus_data", "gen_data", "branch_data", "y_bus_data"] {
1878            assert!(raw.join(format!("{f}.parquet")).is_file(), "missing {f}");
1879        }
1880        assert!(raw.join("gridfm_meta.json").is_file());
1881
1882        let bus = read(&raw.join("bus_data.parquet"));
1883        assert_eq!(names(&bus), BUS_COLS);
1884        assert_eq!(bus.num_rows(), net.buses.len());
1885        assert_eq!(names(&read(&raw.join("gen_data.parquet"))), GEN_COLS);
1886        assert_eq!(names(&read(&raw.join("branch_data.parquet"))), BRANCH_COLS);
1887        assert_eq!(names(&read(&raw.join("y_bus_data.parquet"))), YBUS_COLS);
1888    }
1889
1890    #[test]
1891    fn bus_table_values_are_consistent() {
1892        let net = case14();
1893        let view = IndexedNetwork::new(&net);
1894        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1895        let bus = &tables.bus;
1896
1897        // Exactly one reference bus; PQ/PV/REF partition every bus.
1898        let (pq, pv, r) = (col_i64(bus, "PQ"), col_i64(bus, "PV"), col_i64(bus, "REF"));
1899        assert_eq!(r.values().iter().sum::<i64>(), 1);
1900        for i in 0..bus.num_rows() {
1901            assert_eq!(pq.value(i) + pv.value(i) + r.value(i), 1);
1902        }
1903
1904        // GS/BS are the per-bus shunt aggregate divided by base_mva.
1905        let base = net.base_mva;
1906        let gs = col_f64(bus, "GS");
1907        for i in 0..bus.num_rows() {
1908            assert!((gs.value(i) - view.gs()[i] / base).abs() < 1e-12);
1909        }
1910
1911        // `bus` column is the dense 0..n range.
1912        let bus_idx = col_i64(bus, "bus");
1913        for i in 0..bus.num_rows() {
1914            assert_eq!(bus_idx.value(i), i as i64);
1915        }
1916    }
1917
1918    #[test]
1919    fn branch_admittance_columns_match_build_ybus() {
1920        // The branch table's Y** columns are the same kernel build_ybus scatters,
1921        // so a known in-service branch's block must equal branch_admittance.
1922        let net = case14();
1923        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1924        let br = &tables.branch;
1925
1926        let yff_r = col_f64(br, "Yff_r");
1927        let yff_i = col_f64(br, "Yff_i");
1928        for (row, branch) in net.branches.iter().enumerate() {
1929            // Raw fixture, so the shift is in degrees — convert as build_ybus does.
1930            let shift_rad = branch.shift.to_radians();
1931            if let Some(block) =
1932                branch_admittance(branch, YbusFlags::default(), shift_rad, row).unwrap()
1933            {
1934                assert!((yff_r.value(row) - block[0].re).abs() < 1e-12);
1935                assert!((yff_i.value(row) - block[0].im).abs() < 1e-12);
1936            }
1937        }
1938    }
1939
1940    #[test]
1941    fn is_slack_gen_marks_the_reference_bus() {
1942        let net = case14();
1943        let view = IndexedNetwork::new(&net);
1944        let ref_bus = view.reference_bus_index().unwrap();
1945        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1946        let g = &tables.generator;
1947
1948        let bus = col_i64(g, "bus");
1949        let slack = col_i64(g, "is_slack_gen");
1950        for i in 0..g.num_rows() {
1951            assert_eq!(slack.value(i) == 1, bus.value(i) as usize == ref_bus);
1952        }
1953        assert!(slack.values().contains(&1), "no slack generator");
1954    }
1955
1956    #[test]
1957    fn branch_flows_close_the_power_balance_on_a_solved_case() {
1958        // case14 ships a converged operating point, so the active flows must obey
1959        // KCL: total branch loss = generation - demand - shunt draw. This is the
1960        // value-level guard on branch_flows (a missing ×base, a sign flip, or a
1961        // wrong conj would break it). Every branch's real loss is also ≥ 0.
1962        let net = case14();
1963        let view = IndexedNetwork::new(&net);
1964        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1965        let br = &tables.branch;
1966        let (pf, pt, status) = (
1967            col_f64(br, "pf"),
1968            col_f64(br, "pt"),
1969            col_i64(br, "br_status"),
1970        );
1971
1972        let mut loss = 0.0;
1973        for i in 0..br.num_rows() {
1974            if status.value(i) == 1 {
1975                let l = pf.value(i) + pt.value(i);
1976                assert!(l >= -1e-6, "branch {i} has negative real loss {l}");
1977                loss += l;
1978            }
1979        }
1980        assert!(loss > 1.0, "case14 has ~13 MW of real loss, got {loss}");
1981
1982        let gen_p: f64 = net
1983            .generators
1984            .iter()
1985            .filter(|g| g.in_service)
1986            .map(|g| g.pg)
1987            .sum();
1988        let load_p: f64 = net.loads.iter().map(|l| l.p).sum();
1989        // Real shunt draw at the stored voltages: gs() is MW at V = 1.
1990        let shunt_p: f64 = (0..view.n())
1991            .map(|i| view.gs()[i] * net.buses[i].vm.powi(2))
1992            .sum();
1993        assert!(
1994            (loss - (gen_p - load_p - shunt_p)).abs() < 0.5,
1995            "power balance off: loss {loss} vs gen-load-shunt {}",
1996            gen_p - load_p - shunt_p
1997        );
1998    }
1999
2000    #[test]
2001    fn zero_impedance_branch_zeros_columns_and_is_counted() {
2002        // No vendored fixture has r = x = 0, so build one: branch 0 is a zero-
2003        // impedance tie, branch 1 is normal. The tie's admittance and flow columns
2004        // must be zero (never NaN), and the manifest must count it.
2005        let net = Network::in_memory(
2006            "zeroimp",
2007            100.0,
2008            vec![
2009                bus(1, BusType::Ref),
2010                bus(2, BusType::Pq),
2011                bus(3, BusType::Pq),
2012            ],
2013            vec![branch(1, 2, 0.0, 0.0), branch(2, 3, 0.01, 0.1)],
2014        );
2015        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2016        let br = &tables.branch;
2017        for col in [
2018            "Yff_r", "Yff_i", "Yft_r", "Yft_i", "Ytf_r", "Ytf_i", "Ytt_r", "Ytt_i", "pf", "qf",
2019            "pt", "qt",
2020        ] {
2021            let v = col_f64(br, col).value(0);
2022            assert!(
2023                v == 0.0,
2024                "{col} should be 0 for the zero-impedance branch, got {v}"
2025            );
2026        }
2027
2028        let dir = tempfile::tempdir().unwrap();
2029        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2030        assert_eq!(out.dropped_zero_impedance, 1);
2031        let meta: serde_json::Value = serde_json::from_str(
2032            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2033        )
2034        .unwrap();
2035        assert_eq!(meta["dropped_zero_impedance"], 1);
2036    }
2037
2038    #[test]
2039    fn gridfm_cost_maps_every_arm_to_raw_coefficients() {
2040        // Polynomial coeffs are highest-order first: [c2, c1, c0] -> (cp0, cp1, cp2).
2041        assert_eq!(
2042            gridfm_cost(Some(&gencost(2, 3, vec![2.0, 3.0, 4.0]))),
2043            (4.0, 3.0, 2.0)
2044        );
2045        assert_eq!(
2046            gridfm_cost(Some(&gencost(2, 2, vec![3.0, 4.0]))),
2047            (4.0, 3.0, 0.0)
2048        );
2049        assert_eq!(
2050            gridfm_cost(Some(&gencost(2, 1, vec![4.0]))),
2051            (4.0, 0.0, 0.0)
2052        );
2053        // Unrepresentable rows collapse to zeros and report as not representable.
2054        let piecewise = gencost(1, 2, vec![0.0, 0.0, 1.0, 1.0]);
2055        let malformed = gencost(2, 3, vec![1.0]); // fewer coeffs than ncost claims
2056        assert_eq!(gridfm_cost(Some(&piecewise)), (0.0, 0.0, 0.0));
2057        assert_eq!(gridfm_cost(Some(&malformed)), (0.0, 0.0, 0.0));
2058        assert_eq!(gridfm_cost(None), (0.0, 0.0, 0.0));
2059        assert!(!cost_representable(Some(&piecewise)));
2060        assert!(!cost_representable(Some(&malformed)));
2061        assert!(!cost_representable(None));
2062        assert!(cost_representable(Some(&gencost(
2063            2,
2064            3,
2065            vec![1.0, 2.0, 3.0]
2066        ))));
2067    }
2068
2069    #[test]
2070    fn missing_reference_bus_errors() {
2071        // gridfm_record_batches' documented precondition: exactly one ref bus.
2072        let net = Network::in_memory(
2073            "noref",
2074            100.0,
2075            vec![bus(1, BusType::Pq), bus(2, BusType::Pq)],
2076            vec![branch(1, 2, 0.01, 0.1)],
2077        );
2078        let err = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap_err();
2079        assert!(
2080            matches!(err, Error::ReferenceBusCount { .. }),
2081            "got {err:?}"
2082        );
2083    }
2084
2085    #[test]
2086    fn non_finite_bus_voltage_errors_before_parquet() {
2087        let mut net = case14();
2088        net.buses[0].vm = f64::NAN;
2089        let err = gridfm_record_batches(&net, 7, &GridfmOptions::default()).unwrap_err();
2090        match err {
2091            Error::NonFiniteGridfmValue {
2092                scenario,
2093                element,
2094                row,
2095                field,
2096                value,
2097            } => {
2098                assert_eq!(scenario, 7);
2099                assert_eq!(element, "bus");
2100                assert_eq!(row, 0);
2101                assert_eq!(field, "vm");
2102                assert!(value.is_nan());
2103            }
2104            other => panic!("expected NonFiniteGridfmValue, got {other:?}"),
2105        }
2106    }
2107
2108    #[test]
2109    fn non_finite_tap_errors_even_without_y_bus_table() {
2110        let mut net = case14();
2111        net.branches[0].tap = f64::NAN;
2112        let opts = GridfmOptions {
2113            include_y_bus: false,
2114            ..Default::default()
2115        };
2116        let err = gridfm_record_batches(&net, 0, &opts).unwrap_err();
2117        assert!(
2118            matches!(
2119                err,
2120                Error::NonFiniteGridfmValue {
2121                    element: "branch",
2122                    row: 0,
2123                    field: "tap",
2124                    ..
2125                }
2126            ),
2127            "got {err:?}"
2128        );
2129    }
2130
2131    #[test]
2132    fn normalized_snapshot_is_rejected_in_release_builds() {
2133        let net = case14().to_normalized().unwrap();
2134        let err = gridfm_record_batches(&net, 3, &GridfmOptions::default()).unwrap_err();
2135        assert!(
2136            matches!(err, Error::NormalizedGridfmSnapshot { scenario: 3 }),
2137            "got {err:?}"
2138        );
2139    }
2140
2141    #[test]
2142    fn non_finite_representable_cost_errors() {
2143        let mut net = Network::in_memory(
2144            "badcost",
2145            100.0,
2146            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2147            vec![branch(1, 2, 0.01, 0.1)],
2148        );
2149        net.generators
2150            .push(gen_at(1, gencost(2, 3, vec![f64::NAN, 1.0, 0.0])));
2151
2152        // coeffs are highest-order first, so the NaN lands in the cp2 column.
2153        let err = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap_err();
2154        assert!(
2155            matches!(
2156                err,
2157                Error::NonFiniteGridfmValue {
2158                    element: "gencost",
2159                    row: 0,
2160                    field: "cp2",
2161                    ..
2162                }
2163            ),
2164            "got {err:?}"
2165        );
2166    }
2167
2168    #[test]
2169    fn unbounded_limits_export_as_infinity() {
2170        // PowerModels defaults absent gen limits to ±Inf and MATPOWER carries
2171        // literal Inf tokens; the unbounded sentinel must reach the Parquet
2172        // columns, not abort the export (only NaN is corrupt in a limit).
2173        let mut net = case14();
2174        net.generators[0].qmax = f64::INFINITY;
2175        net.generators[0].qmin = f64::NEG_INFINITY;
2176        net.generators[1].pmax = f64::INFINITY;
2177        net.branches[0].angmin = f64::NEG_INFINITY;
2178        net.branches[0].angmax = f64::INFINITY;
2179        net.branches[1].rate_a = f64::INFINITY;
2180        net.buses[0].vmax = f64::INFINITY;
2181        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2182        let qmax = col_f64(&tables.generator, "max_q_mvar");
2183        assert!(qmax.value(0).is_infinite() && qmax.value(0) > 0.0);
2184    }
2185
2186    #[test]
2187    fn nan_limit_still_errors() {
2188        let mut net = case14();
2189        net.generators[0].qmax = f64::NAN;
2190        let err = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap_err();
2191        assert!(
2192            matches!(
2193                err,
2194                Error::NonFiniteGridfmValue {
2195                    element: "generator",
2196                    field: "qmax",
2197                    ..
2198                }
2199            ),
2200            "got {err:?}"
2201        );
2202    }
2203
2204    /// case14 with every load and generator setpoint scaled — a perturbed
2205    /// operating point on the same topology, the scenario-batch invariant.
2206    fn scaled(net: &Network, factor: f64) -> Network {
2207        let mut s = net.clone();
2208        for l in &mut s.loads {
2209            l.p *= factor;
2210            l.q *= factor;
2211        }
2212        for g in &mut s.generators {
2213            g.pg *= factor;
2214            g.qg *= factor;
2215        }
2216        s
2217    }
2218
2219    #[test]
2220    fn batch_stacks_scenarios_keyed_by_scenario_column() {
2221        let base = case14();
2222        let up = scaled(&base, 1.1);
2223        let down = scaled(&base, 0.9);
2224        let snaps = [
2225            GridfmSnapshot {
2226                net: &base,
2227                scenario: 0,
2228            },
2229            GridfmSnapshot {
2230                net: &up,
2231                scenario: 1,
2232            },
2233            GridfmSnapshot {
2234                net: &down,
2235                scenario: 2,
2236            },
2237        ];
2238        let tables = gridfm_record_batches_batch(&snaps, &GridfmOptions::default()).unwrap();
2239
2240        // Schema is unchanged; rows are 3× the single-snapshot counts.
2241        assert_eq!(names(&tables.bus), BUS_COLS);
2242        assert_eq!(names(&tables.branch), BRANCH_COLS);
2243        assert_eq!(tables.bus.num_rows(), 3 * base.buses.len());
2244        assert_eq!(tables.generator.num_rows(), 3 * base.generators.len());
2245        assert_eq!(tables.branch.num_rows(), 3 * base.branches.len());
2246
2247        // The scenario column is blocked 0.., and the dense bus index resets to
2248        // 0..n within each scenario.
2249        let n = base.buses.len();
2250        let scen = col_i64(&tables.bus, "scenario");
2251        let lsi = col_i64(&tables.bus, "load_scenario_idx");
2252        let bus_idx = col_i64(&tables.bus, "bus");
2253        for k in 0..3 {
2254            for i in 0..n {
2255                let row = k * n + i;
2256                assert_eq!(scen.value(row), k as i64);
2257                assert_eq!(lsi.value(row), k as i64);
2258                assert_eq!(bus_idx.value(row), i as i64);
2259            }
2260        }
2261
2262        // The first scenario's rows match the standalone single-case tables, so
2263        // batching is a pure row-stack over the established single-snapshot path.
2264        // Compare every column bit-exactly (not just one), so a per-column offset
2265        // or ordering regression in the row-stack can't slip through.
2266        let single = gridfm_record_batches(&base, 0, &GridfmOptions::default()).unwrap();
2267        let bit_exact = |b: &RecordBatch, s: &RecordBatch, col: &str, rows: usize| {
2268            let (bb, ss) = (col_f64(b, col), col_f64(s, col));
2269            for i in 0..rows {
2270                assert_eq!(
2271                    bb.value(i).to_bits(),
2272                    ss.value(i).to_bits(),
2273                    "scenario-0 {col}[{i}] differs from the single-case path"
2274                );
2275            }
2276        };
2277        for col in ["Pd", "Qd", "Pg", "Qg", "Vm", "Va", "GS", "BS"] {
2278            bit_exact(&tables.bus, &single.bus, col, n);
2279        }
2280        bit_exact(
2281            &tables.generator,
2282            &single.generator,
2283            "p_mw",
2284            base.generators.len(),
2285        );
2286        bit_exact(&tables.branch, &single.branch, "pf", base.branches.len());
2287
2288        // The perturbed scenario's load really differs (guards against stamping
2289        // the same network three times).
2290        let pd_batch = col_f64(&tables.bus, "Pd");
2291        let pd_single = col_f64(&single.bus, "Pd");
2292        assert!((pd_batch.value(n) - 1.1 * pd_single.value(0)).abs() < 1e-9);
2293    }
2294
2295    #[test]
2296    fn batch_dataset_writes_stacked_parquet_with_scenario_count() {
2297        let base = case14();
2298        let up = scaled(&base, 1.25);
2299        let snaps = [
2300            GridfmSnapshot {
2301                net: &base,
2302                scenario: 0,
2303            },
2304            GridfmSnapshot {
2305                net: &up,
2306                scenario: 1,
2307            },
2308        ];
2309        let dir = tempfile::tempdir().unwrap();
2310        let out = write_gridfm_batch(&snaps, dir.path(), &GridfmOptions::default()).unwrap();
2311
2312        let bus = read(&out.dir.join("bus_data.parquet"));
2313        assert_eq!(bus.num_rows(), 2 * base.buses.len());
2314        let scen = col_i64(&bus, "scenario");
2315        assert_eq!(scen.value(0), 0);
2316        assert_eq!(scen.value(base.buses.len()), 1);
2317
2318        let meta: serde_json::Value = serde_json::from_str(
2319            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2320        )
2321        .unwrap();
2322        assert_eq!(meta["n_scenarios"], 2);
2323        assert_eq!(meta["scenario"], 0);
2324    }
2325
2326    #[test]
2327    fn empty_batch_errors() {
2328        let err = gridfm_record_batches_batch(&[], &GridfmOptions::default()).unwrap_err();
2329        assert!(matches!(err, Error::EmptyScenarioBatch), "got {err:?}");
2330    }
2331
2332    #[test]
2333    fn shape_mismatch_across_snapshots_errors() {
2334        let big = case14();
2335        let small = Network::in_memory(
2336            "small",
2337            100.0,
2338            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2339            vec![branch(1, 2, 0.01, 0.1)],
2340        );
2341        let snaps = [
2342            GridfmSnapshot {
2343                net: &big,
2344                scenario: 0,
2345            },
2346            GridfmSnapshot {
2347                net: &small,
2348                scenario: 1,
2349            },
2350        ];
2351        let err = gridfm_record_batches_batch(&snaps, &GridfmOptions::default()).unwrap_err();
2352        assert!(
2353            matches!(
2354                err,
2355                Error::ScenarioShapeMismatch {
2356                    index: 1,
2357                    reason: ScenarioMismatch::Counts { .. }
2358                }
2359            ),
2360            "got {err:?}"
2361        );
2362    }
2363
2364    #[test]
2365    fn bus_order_mismatch_is_reported_distinctly() {
2366        // Same counts and the same bus-id set, but a different ordering: the dense
2367        // bus index would mean different buses across snapshots, so the batch is
2368        // rejected with the BusOrder reason (not the same-tuple Counts message).
2369        let base = case14();
2370        let mut reordered = base.clone();
2371        reordered.buses.swap(0, 1);
2372        let snaps = [
2373            GridfmSnapshot {
2374                net: &base,
2375                scenario: 0,
2376            },
2377            GridfmSnapshot {
2378                net: &reordered,
2379                scenario: 1,
2380            },
2381        ];
2382        let err = gridfm_record_batches_batch(&snaps, &GridfmOptions::default()).unwrap_err();
2383        assert!(
2384            matches!(
2385                err,
2386                Error::ScenarioShapeMismatch {
2387                    index: 1,
2388                    reason: ScenarioMismatch::BusOrder
2389                }
2390            ),
2391            "got {err:?}"
2392        );
2393    }
2394
2395    #[test]
2396    fn manifest_counts_sum_over_the_batch() {
2397        // Two snapshots on the same element set, but only the second has a zeroed
2398        // branch impedance. The manifest's dropped count describes the whole
2399        // dataset (a total of 1), not just the first snapshot — branch status and
2400        // impedance may legitimately differ per scenario.
2401        let base = case14();
2402        let mut perturbed = base.clone();
2403        perturbed.branches[0].r = 0.0;
2404        perturbed.branches[0].x = 0.0;
2405        let snaps = [
2406            GridfmSnapshot {
2407                net: &base,
2408                scenario: 0,
2409            },
2410            GridfmSnapshot {
2411                net: &perturbed,
2412                scenario: 1,
2413            },
2414        ];
2415        let dir = tempfile::tempdir().unwrap();
2416        let out = write_gridfm_batch(&snaps, dir.path(), &GridfmOptions::default()).unwrap();
2417        assert_eq!(out.dropped_zero_impedance, 1);
2418        let meta: serde_json::Value = serde_json::from_str(
2419            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2420        )
2421        .unwrap();
2422        assert_eq!(meta["dropped_zero_impedance"], 1);
2423    }
2424
2425    #[test]
2426    fn y_bus_table_is_absent_when_disabled() {
2427        let net = case14();
2428        let opts = GridfmOptions {
2429            include_y_bus: false,
2430            ..Default::default()
2431        };
2432        let tables = gridfm_record_batches(&net, 0, &opts).unwrap();
2433        assert!(tables.y_bus.is_none(), "y_bus should not be built");
2434
2435        let dir = tempfile::tempdir().unwrap();
2436        let out = write_gridfm_dataset(&net, 0, dir.path(), &opts).unwrap();
2437        assert!(
2438            !out.dir.join("y_bus_data.parquet").exists(),
2439            "y_bus_data.parquet should not be written"
2440        );
2441    }
2442
2443    #[test]
2444    fn numbered_snapshots_stamps_base_plus_k_and_checks_overflow() {
2445        // The shared builder both bindings use: the k-th network is scenario
2446        // `base + k`, in order.
2447        let net = case14();
2448        let snaps = numbered_snapshots(&[&net, &net, &net], 5).unwrap();
2449        assert_eq!(snaps.len(), 3);
2450        assert_eq!(snaps[0].scenario, 5);
2451        assert_eq!(snaps[1].scenario, 6);
2452        assert_eq!(snaps[2].scenario, 7);
2453
2454        // Overflow is checked (not wrapped to a negative id, not a panic) and names
2455        // the offending index.
2456        let err = numbered_snapshots(&[&net, &net], i64::MAX).unwrap_err();
2457        assert!(
2458            matches!(err, Error::ScenarioIdOverflow { index: 1, .. }),
2459            "got {err:?}"
2460        );
2461    }
2462
2463    #[test]
2464    fn out_of_service_generator_is_listed_but_excluded_from_bus_aggregate() {
2465        // Two paths react to `g.in_service`: gen_data emits an `in_service` column
2466        // for every generator (keeping its setpoint), while bus `Pg`/`Qg` aggregate
2467        // only in-service generation (`view.in_service_gens()`). Exercise the
2468        // `false` case on both.
2469        let mut net = Network::in_memory(
2470            "genoutage",
2471            100.0,
2472            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2473            vec![branch(1, 2, 0.01, 0.1)],
2474        );
2475        let mut g_on = gen_at(1, gencost(2, 3, vec![0.0, 1.0, 0.0]));
2476        g_on.pg = 50.0;
2477        let mut g_off = gen_at(2, gencost(2, 3, vec![0.0, 1.0, 0.0]));
2478        g_off.pg = 30.0;
2479        g_off.in_service = false;
2480        net.generators.push(g_on);
2481        net.generators.push(g_off);
2482
2483        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2484
2485        // gen_data lists both gens in source order, flags the out-of-service one,
2486        // and keeps its setpoint.
2487        let g = &tables.generator;
2488        assert_eq!(g.num_rows(), 2);
2489        let in_service = col_i64(g, "in_service");
2490        assert_eq!(in_service.value(0), 1, "in-service gen flagged 1");
2491        assert_eq!(in_service.value(1), 0, "out-of-service gen flagged 0");
2492        assert!(
2493            (col_f64(g, "p_mw").value(1) - 30.0).abs() < 1e-12,
2494            "gen_data keeps the out-of-service setpoint"
2495        );
2496
2497        // bus Pg aggregates only in-service generation: bus 1 (dense 0) gets 50,
2498        // bus 2 (dense 1) excludes the out-of-service gen's 30.
2499        let pg = col_f64(&tables.bus, "Pg");
2500        assert!(
2501            (pg.value(0) - 50.0).abs() < 1e-12,
2502            "in-service gen folded into bus Pg"
2503        );
2504        assert!(
2505            pg.value(1) == 0.0,
2506            "out-of-service gen excluded from bus Pg, got {}",
2507            pg.value(1)
2508        );
2509    }
2510
2511    #[test]
2512    fn out_of_service_branch_zeros_flows_but_keeps_admittance() {
2513        // An out-of-service branch keeps its physical Y** admittances but carries
2514        // zero flows and `br_status = 0` — the path datakit's topology variants
2515        // exercise. Use non-flat voltages so an *in-service* branch carries real
2516        // flow, which makes the zero on the tripped branch meaningful (not just an
2517        // artifact of a flat start).
2518        let mut net = Network::in_memory(
2519            "outage",
2520            100.0,
2521            vec![
2522                bus(1, BusType::Ref),
2523                bus(2, BusType::Pq),
2524                bus(3, BusType::Pq),
2525            ],
2526            vec![branch(1, 2, 0.01, 0.1), branch(2, 3, 0.02, 0.2)],
2527        );
2528        net.buses[1].va = -3.0;
2529        net.buses[2].va = -6.0;
2530        net.branches[0].in_service = false; // trip branch 0
2531
2532        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2533        let br = &tables.branch;
2534        let status = col_i64(br, "br_status");
2535        assert_eq!(status.value(0), 0, "tripped branch reports br_status 0");
2536        assert_eq!(status.value(1), 1, "in-service branch reports br_status 1");
2537
2538        for col in ["pf", "qf", "pt", "qt"] {
2539            let v = col_f64(br, col).value(0);
2540            assert!(
2541                v == 0.0,
2542                "{col} must be zero on the out-of-service branch, got {v}"
2543            );
2544        }
2545        // The in-service branch really carries flow at these voltages — guards
2546        // against a flat start false pass that would zero every branch anyway.
2547        assert!(
2548            col_f64(br, "pf").value(1).abs() > 1e-6,
2549            "in-service branch should carry nonzero flow"
2550        );
2551        // Admittances are retained for the tripped branch (unlike a zero-impedance
2552        // branch, which zeroes them).
2553        assert!(
2554            col_f64(br, "Yff_i").value(0).abs() > 0.0,
2555            "out-of-service branch keeps its physical Y** admittances"
2556        );
2557    }
2558
2559    fn bus(id: usize, kind: BusType) -> Bus {
2560        Bus::new(BusId(id), kind, 1.0)
2561    }
2562
2563    fn branch(from: usize, to: usize, r: f64, x: f64) -> Branch {
2564        Branch::new(BusId(from), BusId(to), r, x)
2565    }
2566
2567    fn gencost(model: u8, ncost: usize, coeffs: Vec<f64>) -> GenCost {
2568        GenCost::with_ncost(model, 0.0, 0.0, ncost, coeffs)
2569    }
2570
2571    fn gen_at(bus: usize, cost: GenCost) -> Generator {
2572        let mut generator = Generator::new(BusId(bus));
2573        generator.pmax = 100.0;
2574        generator.qmax = 50.0;
2575        generator.qmin = -50.0;
2576        generator.mbase = 100.0;
2577        generator.cost = Some(cost);
2578        generator
2579    }
2580
2581    #[test]
2582    fn degenerate_cost_gen_zeros_columns_and_is_counted() {
2583        // Counterpart to the zero-impedance test: a piecewise (model 1) cost row
2584        // gets zeroed cp* columns and is counted; a polynomial row is kept.
2585        let mut net = Network::in_memory(
2586            "degen",
2587            100.0,
2588            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2589            vec![branch(1, 2, 0.01, 0.1)],
2590        );
2591        net.generators
2592            .push(gen_at(1, gencost(1, 2, vec![0.0, 0.0, 1.0, 1.0]))); // piecewise
2593        net.generators
2594            .push(gen_at(2, gencost(2, 3, vec![0.01, 5.0, 0.0]))); // polynomial
2595
2596        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2597        let g = &tables.generator;
2598        let (cp0, cp1, cp2) = (
2599            col_f64(g, "cp0_eur"),
2600            col_f64(g, "cp1_eur_per_mw"),
2601            col_f64(g, "cp2_eur_per_mw2"),
2602        );
2603        assert_eq!((cp0.value(0), cp1.value(0), cp2.value(0)), (0.0, 0.0, 0.0));
2604        assert_eq!((cp0.value(1), cp1.value(1), cp2.value(1)), (0.0, 5.0, 0.01));
2605
2606        let dir = tempfile::tempdir().unwrap();
2607        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2608        assert_eq!(out.degenerate_cost_gens, 1);
2609        assert_eq!(out.missing_cost_gens, 0);
2610        assert_eq!(out.unsupported_cost_gens, 1);
2611        let meta: serde_json::Value = serde_json::from_str(
2612            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2613        )
2614        .unwrap();
2615        assert_eq!(meta["degenerate_cost_gens"], 1);
2616        assert_eq!(meta["missing_cost_gens"], 0);
2617        assert_eq!(meta["unsupported_cost_gens"], 1);
2618        assert_eq!(meta["zeroed_cost_gens"], 1);
2619    }
2620
2621    #[test]
2622    fn missing_gridfm_costs_are_split_and_fill_policy_reduces_missing_count() {
2623        let mut net = Network::in_memory(
2624            "missing-cost",
2625            100.0,
2626            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2627            vec![branch(1, 2, 0.01, 0.1)],
2628        );
2629        let mut missing = gen_at(1, gencost(2, 3, vec![1.0, 2.0, 3.0]));
2630        missing.cost = None;
2631        net.generators.push(missing);
2632        net.generators
2633            .push(gen_at(2, gencost(1, 2, vec![0.0, 0.0, 1.0, 1.0])));
2634
2635        let dir = tempfile::tempdir().unwrap();
2636        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2637        assert_eq!(out.degenerate_cost_gens, 2);
2638        assert_eq!(out.missing_cost_gens, 1);
2639        assert_eq!(out.unsupported_cost_gens, 1);
2640
2641        let opts = GridfmOptions {
2642            missing_gen_cost: MissingGenCostPolicy::zero(),
2643            ..Default::default()
2644        };
2645        let dir = tempfile::tempdir().unwrap();
2646        let out = write_gridfm_dataset(&net, 0, dir.path(), &opts).unwrap();
2647        assert_eq!(out.synthesized_gen_costs, 1);
2648        assert_eq!(out.missing_cost_gens, 0);
2649        assert_eq!(out.unsupported_cost_gens, 1);
2650        assert_eq!(out.degenerate_cost_gens, 1);
2651        let meta: serde_json::Value = serde_json::from_str(
2652            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2653        )
2654        .unwrap();
2655        assert_eq!(meta["cost_policy"]["mode"], "fill");
2656        assert_eq!(meta["synthesized_gen_costs"], 1);
2657    }
2658
2659    #[test]
2660    fn scenario_id_and_tap_toggle_take_effect() {
2661        let net = case14();
2662
2663        // The scenario id (an explicit argument now) reaches both id columns.
2664        let bus = gridfm_record_batches(&net, 7, &GridfmOptions::default())
2665            .unwrap()
2666            .bus;
2667        assert_eq!(col_i64(&bus, "scenario").value(0), 7);
2668        assert_eq!(col_i64(&bus, "load_scenario_idx").value(0), 7);
2669
2670        // Turning taps off changes a transformer's admittance columns.
2671        let on = gridfm_record_batches(&net, 0, &GridfmOptions::default())
2672            .unwrap()
2673            .branch;
2674        let off = gridfm_record_batches(
2675            &net,
2676            0,
2677            &GridfmOptions {
2678                include_taps: false,
2679                ..Default::default()
2680            },
2681        )
2682        .unwrap()
2683        .branch;
2684        let tap = col_f64(&on, "tap");
2685        let xfmr = (0..on.num_rows())
2686            .find(|&i| (tap.value(i) - 1.0).abs() > 1e-9)
2687            .expect("case14 has off-nominal transformers");
2688        // case14 transformers are lossless (r = 0), so compare the susceptance
2689        // (imaginary) part, which scales with 1/tap².
2690        assert!(
2691            (col_f64(&on, "Yff_i").value(xfmr) - col_f64(&off, "Yff_i").value(xfmr)).abs() > 1e-9,
2692            "taps off should change the transformer's Yff"
2693        );
2694    }
2695
2696    // --- reader (issue #60) ------------------------------------------------
2697
2698    /// The power flow complete fingerprint the read path preserves: element
2699    /// counts, nodal load / generation totals, branch impedance sums, base_mva,
2700    /// and the reference-bus count. Bus ids, per-element granularity, costs, and
2701    /// HVDC/storage are excluded from that guarantee.
2702    #[allow(clippy::type_complexity)]
2703    fn fingerprint(
2704        net: &Network,
2705    ) -> (
2706        usize,
2707        usize,
2708        usize,
2709        usize,
2710        f64,
2711        f64,
2712        f64,
2713        f64,
2714        f64,
2715        f64,
2716        f64,
2717    ) {
2718        (
2719            net.buses.len(),
2720            net.branches.len(),
2721            net.generators.len(),
2722            net.buses.iter().filter(|b| b.kind == BusType::Ref).count(),
2723            net.loads.iter().map(|l| l.p).sum(),
2724            net.loads.iter().map(|l| l.q).sum(),
2725            net.generators.iter().map(|g| g.pg).sum(),
2726            net.branches.iter().map(|b| b.r).sum(),
2727            net.branches.iter().map(|b| b.x).sum(),
2728            net.branches.iter().map(|b| b.b).sum(),
2729            net.base_mva,
2730        )
2731    }
2732
2733    fn assert_fingerprint_close(got: &Network, want: &Network) {
2734        let (g, w) = (fingerprint(got), fingerprint(want));
2735        assert_eq!(
2736            (g.0, g.1, g.2, g.3),
2737            (w.0, w.1, w.2, w.3),
2738            "bus/branch/gen/ref counts differ"
2739        );
2740        for (a, b, label) in [
2741            (g.4, w.4, "load P"),
2742            (g.5, w.5, "load Q"),
2743            (g.6, w.6, "gen P"),
2744            (g.7, w.7, "sum r"),
2745            (g.8, w.8, "sum x"),
2746            (g.9, w.9, "sum b"),
2747            (g.10, w.10, "base_mva"),
2748        ] {
2749            assert!((a - b).abs() < 1e-9, "{label} differs: {a} vs {b}");
2750        }
2751    }
2752
2753    #[test]
2754    fn read_round_trips_power_flow_fingerprint() {
2755        let net = case14();
2756        let dir = tempfile::tempdir().unwrap();
2757        write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2758
2759        let read = read_gridfm_dataset(dir.path().join("case14").join("raw"), 0).unwrap();
2760        assert_eq!(read.scenario, 0);
2761        assert_eq!(read.network.source_format, SourceFormat::Gridfm);
2762        assert_eq!(read.network.name, "case14");
2763        assert!(read.network.source.is_none());
2764        assert_fingerprint_close(&read.network, &net);
2765        // The reconstruction is structurally valid (validate() already ran inside).
2766        read.network.validate().unwrap();
2767    }
2768
2769    #[test]
2770    fn read_gridfm_network_pure_path_matches_disk() {
2771        // The in-memory inverse of gridfm_record_batches reproduces the same
2772        // fingerprint with no disk I/O.
2773        let net = case14();
2774        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2775        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2776        assert_fingerprint_close(&read.network, &net);
2777    }
2778
2779    #[test]
2780    fn read_recovers_shunt_at_base_mva() {
2781        // case14 has a single bus shunt (Bs = 19 at bus 9). The writer divides by
2782        // base_mva; the reader must multiply it back.
2783        let net = case14();
2784        let want_b: f64 = net.shunts.iter().map(|s| s.b).sum();
2785        assert!(want_b.abs() > 1.0, "fixture should have a real shunt");
2786
2787        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2788        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2789        let got_b: f64 = read.network.shunts.iter().map(|s| s.b).sum();
2790        assert!(
2791            (got_b - want_b).abs() < 1e-9,
2792            "shunt b not recovered at base_mva: {got_b} vs {want_b}"
2793        );
2794    }
2795
2796    #[test]
2797    fn read_scenarios_yields_distinct_networks() {
2798        // A 2-scenario batch (base + load×1.1): each scenario reads back to its own
2799        // Network, and gridfm_base_case picks scenario 0.
2800        let base = case14();
2801        let up = scaled(&base, 1.1);
2802        let snaps = [
2803            GridfmSnapshot {
2804                net: &base,
2805                scenario: 0,
2806            },
2807            GridfmSnapshot {
2808                net: &up,
2809                scenario: 1,
2810            },
2811        ];
2812        let dir = tempfile::tempdir().unwrap();
2813        let out = write_gridfm_batch(&snaps, dir.path(), &GridfmOptions::default()).unwrap();
2814
2815        let reads = read_gridfm_scenarios(&out.dir).unwrap();
2816        assert_eq!(reads.len(), 2);
2817        assert_eq!((reads[0].scenario, reads[1].scenario), (0, 1));
2818
2819        let load0: f64 = reads[0].network.loads.iter().map(|l| l.p).sum();
2820        let load1: f64 = reads[1].network.loads.iter().map(|l| l.p).sum();
2821        assert!(load0 > 0.0);
2822        assert!(
2823            (load1 - 1.1 * load0).abs() < 1e-6,
2824            "scenario 1 load should be 1.1× scenario 0: {load1} vs {load0}"
2825        );
2826
2827        let base_case = gridfm_base_case(&out.dir).unwrap();
2828        assert_fingerprint_close(&base_case.network, &reads[0].network);
2829    }
2830
2831    #[test]
2832    fn read_resolves_lenient_directory_layouts() {
2833        // resolve_raw_dir accepts the leaf raw/ dir, the <case>/ dir, and the
2834        // parent out/ dir.
2835        let net = case14();
2836        let dir = tempfile::tempdir().unwrap();
2837        write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2838        let out = dir.path(); // parent
2839        let case_dir = out.join("case14");
2840        let raw_dir = case_dir.join("raw");
2841        for d in [raw_dir.clone(), case_dir, out.to_path_buf()] {
2842            let read = read_gridfm_dataset(&d, 0)
2843                .unwrap_or_else(|e| panic!("failed to resolve {}: {e}", d.display()));
2844            assert_eq!(read.network.buses.len(), net.buses.len());
2845        }
2846    }
2847
2848    #[test]
2849    fn read_missing_scenario_errors() {
2850        let net = case14();
2851        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2852        let err = read_gridfm_network(&tables, 99, net.base_mva, &net.name).unwrap_err();
2853        assert!(
2854            matches!(
2855                err,
2856                Error::FormatRead {
2857                    format: "gridfm",
2858                    ..
2859                }
2860            ),
2861            "got {err:?}"
2862        );
2863    }
2864
2865    #[test]
2866    fn read_no_dataset_errors() {
2867        // An empty (but existing) directory: read_dir succeeds, finds nothing.
2868        let dir = tempfile::tempdir().unwrap();
2869        let err = read_gridfm_dataset(dir.path(), 0).unwrap_err();
2870        assert!(
2871            matches!(
2872                err,
2873                Error::FormatRead {
2874                    format: "gridfm",
2875                    ..
2876                }
2877            ),
2878            "got {err:?}"
2879        );
2880        // A non-existent directory: read_dir's IO error is surfaced, not masked.
2881        let missing = dir.path().join("does-not-exist");
2882        let err = read_gridfm_dataset(&missing, 0).unwrap_err();
2883        assert!(
2884            matches!(
2885                err,
2886                Error::FormatRead {
2887                    format: "gridfm",
2888                    ..
2889                }
2890            ),
2891            "got {err:?}"
2892        );
2893    }
2894
2895    #[test]
2896    fn read_defaults_unusable_base_mva_to_100() {
2897        // A manifest base_mva of 0 (or negative/NaN) is unusable — shunt recovery
2898        // scales by it — so the reader defaults to 100 and warns instead of
2899        // silently producing a network with zeroed shunts.
2900        let net = case14();
2901        let dir = tempfile::tempdir().unwrap();
2902        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2903        let meta_path = out.dir.join("gridfm_meta.json");
2904        let mut meta: serde_json::Value =
2905            serde_json::from_str(&std::fs::read_to_string(&meta_path).unwrap()).unwrap();
2906        meta["base_mva"] = serde_json::json!(0.0);
2907        std::fs::write(&meta_path, serde_json::to_string(&meta).unwrap()).unwrap();
2908
2909        let read = read_gridfm_dataset(&out.dir, 0).unwrap();
2910        assert!(
2911            (read.network.base_mva - 100.0).abs() < 1e-9,
2912            "base_mva should default to 100, got {}",
2913            read.network.base_mva
2914        );
2915        assert!(
2916            read.warnings.iter().any(|w| w.contains("base_mva")),
2917            "expected a base_mva warning, got {:?}",
2918            read.warnings
2919        );
2920    }
2921
2922    #[test]
2923    fn read_surfaces_fidelity_warnings() {
2924        let net = case14();
2925        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2926        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2927        assert!(!read.warnings.is_empty());
2928        assert!(
2929            read.warnings
2930                .iter()
2931                .any(|w| w.contains("synthesized bus ids")),
2932            "expected the bus-id synthesis warning, got {:?}",
2933            read.warnings
2934        );
2935        // case14 has loads and a shunt, so those folding warnings appear too.
2936        assert!(read.warnings.iter().any(|w| w.contains("nodal load")));
2937        assert!(read.warnings.iter().any(|w| w.contains("nodal shunts")));
2938    }
2939
2940    #[test]
2941    fn read_recovers_gen_vg_from_bus_vm() {
2942        // gridfm has no gen vg column; vg is recovered from the gen's bus Vm.
2943        // case14's slack bus 1 sits at Vm = 1.06, so its generator reads vg ≈ 1.06
2944        // (not the old hard-coded 1.0).
2945        let net = case14();
2946        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2947        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2948        for g in &read.network.generators {
2949            let bus = read
2950                .network
2951                .buses
2952                .iter()
2953                .find(|b| b.id == g.bus)
2954                .expect("gen references a known bus");
2955            assert!(
2956                (g.vg - bus.vm).abs() < 1e-12,
2957                "vg should equal the bus Vm: {} vs {}",
2958                g.vg,
2959                bus.vm
2960            );
2961        }
2962        assert!(
2963            read.network
2964                .generators
2965                .iter()
2966                .any(|g| (g.vg - 1.0).abs() > 1e-3),
2967            "expected a generator with vg != 1.0 (case14's slack is at 1.06)"
2968        );
2969    }
2970
2971    #[test]
2972    fn read_maps_unit_tap_lines_back_to_zero() {
2973        // The writer stores effective tap (a line's raw 0 becomes 1.0). The reader
2974        // must map unit tap + no shift back to raw tap 0 so lines stay lines;
2975        // otherwise every line reads as a transformer and a read→write to PSS/E /
2976        // PowerWorld misclassifies them. case14 has both lines and off-nominal
2977        // transformers, so the line/transformer split must survive the round trip.
2978        let net = case14();
2979        let n_lines = net.branches.iter().filter(|b| !b.is_transformer()).count();
2980        let n_xfmr = net.branches.iter().filter(|b| b.is_transformer()).count();
2981        assert!(
2982            n_lines > 0 && n_xfmr > 0,
2983            "fixture needs both lines and transformers"
2984        );
2985
2986        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2987        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2988        let read_lines = read
2989            .network
2990            .branches
2991            .iter()
2992            .filter(|b| !b.is_transformer())
2993            .count();
2994        let read_xfmr = read
2995            .network
2996            .branches
2997            .iter()
2998            .filter(|b| b.is_transformer())
2999            .count();
3000        assert_eq!(
3001            read_lines, n_lines,
3002            "lines must read back as lines (raw tap 0)"
3003        );
3004        assert_eq!(
3005            read_xfmr, n_xfmr,
3006            "transformers must keep their off-nominal ratio"
3007        );
3008        assert!(
3009            read.warnings.iter().any(|w| w.contains("read as lines")),
3010            "expected the unit-tap warning, got {:?}",
3011            read.warnings
3012        );
3013    }
3014
3015    #[test]
3016    fn read_allows_a_case_with_no_generators() {
3017        // gen_data may be legitimately empty (a power flow case with no mpc.gen);
3018        // the scenario guard must not reject it — only a *partial* table (rows for
3019        // other scenarios but not this one) is an error.
3020        let net = Network::in_memory(
3021            "nogen",
3022            100.0,
3023            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
3024            vec![branch(1, 2, 0.01, 0.1)],
3025        );
3026        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
3027        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
3028        assert!(read.network.generators.is_empty());
3029        assert_eq!(read.network.branches.len(), 1);
3030    }
3031
3032    #[test]
3033    fn read_all_zero_cost_reads_as_none_with_ambiguity_warning() {
3034        // A genuine zero polynomial cost writes (0,0,0), indistinguishable from a
3035        // no-cost generator or a zeroed unrepresentable cost; the reader reads None
3036        // and the warning describes the ambiguity (not a false "piecewise/cubic").
3037        let mut net = Network::in_memory(
3038            "zerocost",
3039            100.0,
3040            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
3041            vec![branch(1, 2, 0.01, 0.1)],
3042        );
3043        net.generators
3044            .push(gen_at(1, gencost(2, 3, vec![0.0, 0.0, 0.0])));
3045        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
3046        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
3047        assert!(
3048            read.network.generators[0].cost.is_none(),
3049            "all-zero cost should read back as None"
3050        );
3051        assert!(
3052            read.warnings
3053                .iter()
3054                .any(|w| w.contains("read with no cost")),
3055            "expected the no-cost ambiguity warning, got {:?}",
3056            read.warnings
3057        );
3058    }
3059
3060    #[test]
3061    fn require_scenario_block_flags_partial_tables() {
3062        // Empty table → ok (a legitimately element-less case). Present → ok. Absent
3063        // from a non-empty table → error (the partial/corrupt dataset Copilot flagged).
3064        assert!(require_scenario_block(&[], 0, &[], "gen_data").is_ok());
3065        assert!(require_scenario_block(&[0, 0, 1], 0, &[0, 1], "gen_data").is_ok());
3066        let err = require_scenario_block(&[0, 0], 1, &[], "branch_data").unwrap_err();
3067        assert!(
3068            matches!(
3069                err,
3070                Error::FormatRead {
3071                    format: "gridfm",
3072                    ..
3073                }
3074            ),
3075            "got {err:?}"
3076        );
3077    }
3078}