# Source code for cooldata.metadata
from __future__ import annotations
import os
import random
import shutil
from collections import defaultdict
from pathlib import Path
from typing import List, NamedTuple
import pandas as pd
_QUAD_INDICES = [1, 2, 3, 4]
_CYLINDER_INDICES = [5, 6]
_REPO_ID = "datasets/bgce/cooldata-v2"
# ── Domain objects ────────────────────────────────────────────────────────────
class Position(NamedTuple):
    """Immutable (x, y, z) coordinate triple marking the center of a body."""

    x: float  # coordinate of the body center along the x axis
    y: float  # coordinate of the body center along the y axis
    z: float  # coordinate of the body center along the z axis
class Quader:
    """
    A quader (rectangular prism) described by temperature, center and extents.

    Attributes
    ----------
    temperature : temperature value assigned to the body.
    position    : center of the body.
    size_x, size_y, size_z : edge lengths along the three axes.
    """

    def __init__(self, temperature: float, position: Position,
                 size_x: float, size_y: float, size_z: float):
        """Store the given values unchanged on the instance."""
        self.temperature = temperature
        self.position = position
        self.size_x, self.size_y, self.size_z = size_x, size_y, size_z

    def __repr__(self) -> str:
        """Return a compact one-line description for debugging."""
        # The three extents are comma-separated without spaces.
        extents = ",".join(
            f"{edge}" for edge in (self.size_x, self.size_y, self.size_z)
        )
        return f"Quader(T={self.temperature}, Pos={self.position}, Size=({extents}))"
class Cylinder:
    """
    A cylinder described by temperature, center, radius and height.

    Attributes
    ----------
    temperature : temperature value assigned to the body.
    position    : center of the body.
    radius      : cylinder radius.
    height      : cylinder height.
    """

    def __init__(self, temperature: float, position: Position,
                 radius: float, height: float):
        """Store the given values unchanged on the instance."""
        self.temperature = temperature
        self.position = position
        self.radius, self.height = radius, height

    def __repr__(self) -> str:
        """Return a compact one-line description for debugging."""
        fields = ", ".join((
            f"T={self.temperature}",
            f"Pos={self.position}",
            f"R={self.radius}",
            f"H={self.height}",
        ))
        return f"Cylinder({fields})"
class SystemParameters:
    """
    Parameter set of one system: its quaders, its cylinders and the inflow
    velocity.

    Attributes
    ----------
    quads           : list of Quader objects.
    cylinders       : list of Cylinder objects.
    inflow_velocity : inflow velocity value.
    """

    def __init__(self, quads: List[Quader], cylinders: List[Cylinder],
                 inflow_velocity: float):
        """Store the given bodies and velocity unchanged on the instance."""
        self.quads = quads
        self.cylinders = cylinders
        self.inflow_velocity = inflow_velocity

    def __repr__(self) -> str:
        """Return a one-line description listing velocity and all bodies."""
        body = ", ".join((
            f"V={self.inflow_velocity}",
            f"Quads={self.quads}",
            f"Cylinders={self.cylinders}",
        ))
        return f"SystemParameters({body})"

    @staticmethod
    def from_dataframe_row(row: pd.Series) -> SystemParameters:
        """
        Build a SystemParameters object from one metadata row.

        Columns read: T1-T6, x1-x6, y1-y6, xs1-xs4, ys1-ys4, zs1-zs6,
        r5-r6 and V. Bodies 1-4 become quaders, bodies 5-6 become cylinders.
        A z-position column (z1-z6) is optional; 0.0 is used when it is
        absent. Every value is converted with ``float``.

        Raises
        ------
        KeyError : if one of the mandatory columns is missing from ``row``.
        """
        def center(idx: int) -> Position:
            # x and y are mandatory; z falls back to 0.0 through row.get().
            return Position(
                float(row[f"x{idx}"]),
                float(row[f"y{idx}"]),
                float(row.get(f"z{idx}", 0.0)),
            )

        quad_bodies: List[Quader] = [
            Quader(
                temperature=float(row[f"T{idx}"]),
                position=center(idx),
                size_x=float(row[f"xs{idx}"]),
                size_y=float(row[f"ys{idx}"]),
                size_z=float(row[f"zs{idx}"]),
            )
            for idx in range(1, 5)
        ]
        cylinder_bodies: List[Cylinder] = [
            Cylinder(
                temperature=float(row[f"T{idx}"]),
                position=center(idx),
                radius=float(row[f"r{idx}"]),
                # The cylinder height is read from the zs column of its index.
                height=float(row[f"zs{idx}"]),
            )
            for idx in range(5, 7)
        ]
        velocity = float(row["V"])
        return SystemParameters(quads=quad_bodies, cylinders=cylinder_bodies,
                                inflow_velocity=velocity)
def df_row_to_system_parameters(df: pd.DataFrame, design_id: int) -> SystemParameters:
    """
    Create a SystemParameters object for a given design_id from a DataFrame.

    Parameters
    ----------
    df        : metadata table; must contain a 'design_id' column.
    design_id : design to look up. Any integral value is accepted, including
                NumPy integer scalars such as those produced when iterating
                over a pandas integer column.

    Returns
    -------
    SystemParameters built from the first row whose 'design_id' matches.

    Raises
    ------
    TypeError  : if ``df`` is not a DataFrame or ``design_id`` is not integral.
    IndexError : if no row has the requested design_id.
    """
    # Local import keeps this function self-contained without touching the
    # module's import block.
    import numbers

    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    # numbers.Integral covers the built-in int as well as NumPy integer
    # scalars (np.int64, ...), which a plain isinstance(..., int) rejects.
    if not isinstance(design_id, numbers.Integral):
        raise TypeError("'design_id' must be an integer.")

    matches = df[df["design_id"] == design_id]
    if matches.empty:
        raise IndexError(f"design_id {design_id} not found in metadata.")
    # Only the first matching row is used if the id occurs more than once.
    return SystemParameters.from_dataframe_row(matches.iloc[0])
# ── MetadataFilter ────────────────────────────────────────────────────────────
class MetadataFilter:
"""
Explore and download cooldata-v2 samples by metadata — no downloading
needed until you call .load().
Expects metadata.parquet which contains the 60,848
simulated samples with columns: design_id, run, batch, T1-T6,
x1-x6, y1-y6, xs1-xs4, ys1-ys4, zs1-zs6, r5, r6, V.
Generate it once with trim_metadata.py if you only have the original
metadata.parquet and sample_index.json.
Quick start
-----------
>>> from cooldata.metadata import MetadataFilter
>>> f = MetadataFilter("Cooldataset/metadata.parquet")
>>> f.summary()
Download by filter:
>>> ds = f.velocity(min=4.0).temperature(body=1, min=50.0).load(num_samples=50)
Download by design ID:
>>> ds = f.load_by_ids([125002, 125037, 212515])
Download randomly:
>>> ds = f.load_random(n=20)
Download by run:
>>> ds = f.load_by_run("run_1", num_samples=100)
"""
def __init__(self, metadata_path: str | Path):
"""
Parameters
----------
metadata_path : path to metadata.parquet
"""
self._path = Path(metadata_path)
self._df = pd.read_parquet(self._path)
self._mask = pd.Series(True, index=self._df.index)
# Validate expected format
required = {"design_id", "run", "batch"}
missing = required - set(self._df.columns)
if missing:
raise ValueError(
f"metadata file is missing columns: {missing}. "
f"Run trim_metadata.py to generate metadata_simulated.parquet."
)
# ── Summary ───────────────────────────────────────────────────────────────
[docs]
def summary(self) -> None:
"""Print a summary of the dataset and current filter match count."""
df = self._df
sep = "=" * 54
run_counts = df["run"].value_counts().sort_index().to_dict()
print(sep)
print(" Cooldata v2 — Dataset Summary")
print(sep)
print(f" Total samples : {len(df):,}")
print()
print(" Samples per run :")
for run, count in sorted(run_counts.items()):
print(f" {run} : {count:,}")
print()
print(f" Inlet velocity V : {df['V'].min():.2f} – {df['V'].max():.2f}"
f" (mean {df['V'].mean():.2f})")
print()
print(" Temperatures (T1–T6):")
for i in range(1, 7):
col = f"T{i}"
print(f" T{i}: {df[col].min():.1f} – {df[col].max():.1f}"
f" (mean {df[col].mean():.1f})")
print()
print(" Active bodies (y == 1.0 = inactive sentinel):")
print(f" {'Body':<8} {'Type':<12} {'Active rows':<16} {'Always active'}")
for i in range(1, 7):
btype = "Quad" if i <= 4 else "Cylinder"
n_active = (df[f"y{i}"] != 1.0).sum()
always = "yes" if n_active == len(df) else "no"
print(f" body {i:<3} {btype:<12} {n_active:>8,} / {len(df):,} {always}")
print()
print(" Body positions (active only, x / y range):")
for i in range(1, 7):
active = df[df[f"y{i}"] != 1.0]
if len(active) == 0:
continue
print(f" body {i}: "
f"x [{active[f'x{i}'].min():.3f} – {active[f'x{i}'].max():.3f}] "
f"y [{active[f'y{i}'].min():.3f} – {active[f'y{i}'].max():.3f}]")
print()
print(sep)
print(f" Current filter matches : {self._mask.sum():,} samples")
print(sep)
# ── Filter methods (chainable) ────────────────────────────────────────────
[docs]
def velocity(self, min: float = None, max: float = None) -> MetadataFilter:
"""Filter by inlet velocity V (range: 1.0 – 7.0)."""
if min is not None:
self._mask &= self._df["V"] >= min
if max is not None:
self._mask &= self._df["V"] <= max
return self
[docs]
def temperature(self, body: int = None,
min: float = None, max: float = None) -> MetadataFilter:
"""
Filter by temperature (range: 20.0 – 80.0).
body=None : at least one body must satisfy the range.
body=1–6 : only that specific body is checked.
"""
if body is not None:
if body not in range(1, 7):
raise ValueError(f"body must be 1–6, got {body}")
if min is not None:
self._mask &= self._df[f"T{body}"] >= min
if max is not None:
self._mask &= self._df[f"T{body}"] <= max
else:
any_match = pd.Series(False, index=self._df.index)
for i in range(1, 7):
ok = pd.Series(True, index=self._df.index)
if min is not None:
ok &= self._df[f"T{i}"] >= min
if max is not None:
ok &= self._df[f"T{i}"] <= max
any_match |= ok
self._mask &= any_match
return self
[docs]
def position(self, body: int,
x_min: float = None, x_max: float = None,
y_min: float = None, y_max: float = None) -> MetadataFilter:
"""Filter by position of a specific body (1–6)."""
if body not in range(1, 7):
raise ValueError(f"body must be 1–6, got {body}")
if x_min is not None:
self._mask &= self._df[f"x{body}"] >= x_min
if x_max is not None:
self._mask &= self._df[f"x{body}"] <= x_max
if y_min is not None:
self._mask &= self._df[f"y{body}"] >= y_min
if y_max is not None:
self._mask &= self._df[f"y{body}"] <= y_max
return self
[docs]
def size(self, quad: int,
xs_min: float = None, xs_max: float = None,
ys_min: float = None, ys_max: float = None) -> MetadataFilter:
"""Filter by x/y size of a quad body (1–4)."""
if quad not in _QUAD_INDICES:
raise ValueError(f"quad must be 1–4, got {quad}")
if xs_min is not None:
self._mask &= self._df[f"xs{quad}"] >= xs_min
if xs_max is not None:
self._mask &= self._df[f"xs{quad}"] <= xs_max
if ys_min is not None:
self._mask &= self._df[f"ys{quad}"] >= ys_min
if ys_max is not None:
self._mask &= self._df[f"ys{quad}"] <= ys_max
return self
[docs]
def radius(self, cylinder: int,
min: float = None, max: float = None) -> MetadataFilter:
"""Filter by radius of a cylinder body (5 or 6)."""
if cylinder not in _CYLINDER_INDICES:
raise ValueError(f"cylinder must be 5 or 6, got {cylinder}")
if min is not None:
self._mask &= self._df[f"r{cylinder}"] >= min
if max is not None:
self._mask &= self._df[f"r{cylinder}"] <= max
return self
[docs]
def n_quads(self, exactly: int = None,
min: int = None, max: int = None) -> MetadataFilter:
"""
Filter by number of active quads (bodies 1–4).
A body is inactive when y == 1.0.
"""
active_quads = sum(
(self._df[f"y{i}"] != 1.0).astype(int)
for i in _QUAD_INDICES
)
if exactly is not None:
self._mask &= active_quads == exactly
else:
if min is not None:
self._mask &= active_quads >= min
if max is not None:
self._mask &= active_quads <= max
return self
[docs]
def n_cylinders(self, exactly: int = None,
min: int = None, max: int = None) -> MetadataFilter:
"""
Filter by number of active cylinders (bodies 5–6).
A body is inactive when y == 1.0.
"""
active_cyls = sum(
(self._df[f"y{i}"] != 1.0).astype(int)
for i in _CYLINDER_INDICES
)
if exactly is not None:
self._mask &= active_cyls == exactly
else:
if min is not None:
self._mask &= active_cyls >= min
if max is not None:
self._mask &= active_cyls <= max
return self
[docs]
def n_bodies(self, exactly: int = None,
min: int = None, max: int = None) -> MetadataFilter:
"""
Filter by total number of active bodies (quads + cylinders).
A body is inactive when y == 1.0.
"""
active = sum(
(self._df[f"y{i}"] != 1.0).astype(int)
for i in range(1, 7)
)
if exactly is not None:
self._mask &= active == exactly
else:
if min is not None:
self._mask &= active >= min
if max is not None:
self._mask &= active <= max
return self
[docs]
def run(self, *run_names: str) -> MetadataFilter:
"""
Restrict to samples from specific runs.
Available runs: run_1, run_3, run_4, run_6, run_7
Example: f.run("run_1", "run_3")
"""
self._mask &= self._df["run"].isin(run_names)
return self
[docs]
def custom(self, expr: str) -> MetadataFilter:
"""
Apply a raw pandas query string on the metadata columns.
Available: design_id, run, batch, T1–T6, x1–x6, y1–y6,
xs1–xs4, ys1–ys4, zs1–zs6, r5, r6, V
Example: f.custom("V > 4.5 and T1 < 80")
"""
self._mask &= self._df.index.isin(self._df.query(expr).index)
return self
[docs]
def reset(self) -> MetadataFilter:
"""Clear all filters."""
self._mask = pd.Series(True, index=self._df.index)
return self
# ── Results ───────────────────────────────────────────────────────────────
[docs]
def count(self) -> int:
"""Number of samples matching the current filter."""
return int(self._mask.sum())
[docs]
def get_design_ids(self) -> list[int]:
"""Return design IDs matching the current filter."""
return self._df[self._mask]["design_id"].tolist()
[docs]
def get_dataframe(self) -> pd.DataFrame:
"""Return filtered metadata as a DataFrame."""
return self._df[self._mask].reset_index(drop=True)
# ── Download methods ──────────────────────────────────────────────────────
[docs]
def load(self, num_samples: int = None,
data_dir: str | Path = "Cooldataset"):
"""
Download samples matching the current filter.
Parameters
----------
num_samples : cap on number of samples. Downloads all matches if None.
data_dir : local directory for downloaded files.
"""
design_ids = self.get_design_ids()
if num_samples is not None:
design_ids = design_ids[:num_samples]
return self._download(design_ids, data_dir)
[docs]
def load_by_ids(self, design_ids: list[int],
data_dir: str | Path = "Cooldataset"):
"""
Download specific samples by design ID.
Example
-------
>>> ds = f.load_by_ids([125002, 125037, 212515])
"""
return self._download(design_ids, data_dir)
[docs]
def load_random(self, n: int, data_dir: str | Path = "Cooldataset",
seed: int = None):
"""
Download n randomly selected samples (respects active filters).
Parameters
----------
n : number of samples to download.
seed : optional random seed for reproducibility.
"""
design_ids = self.get_design_ids()
rng = random.Random(seed) if seed is not None else random
chosen = rng.sample(design_ids, min(n, len(design_ids)))
print(f"Randomly selected {len(chosen)} samples (seed={seed}).")
return self._download(chosen, data_dir)
[docs]
def load_by_run(self, run_name: str, num_samples: int = None,
data_dir: str | Path = "Cooldataset"):
"""
Download samples from a specific run.
Available runs: run_1, run_3, run_4, run_6, run_7
Parameters
----------
run_name : e.g. "run_1"
num_samples : optional cap.
"""
available_runs = self._df["run"].unique().tolist()
if run_name not in available_runs:
raise ValueError(f"Run '{run_name}' not found. "
f"Available: {sorted(available_runs)}")
design_ids = self._df[self._df["run"] == run_name]["design_id"].tolist()
if num_samples is not None:
design_ids = design_ids[:num_samples]
print(f"Downloading {len(design_ids)} samples from {run_name}.")
return self._download(design_ids, data_dir)
# ── Core download logic ───────────────────────────────────────────────────
def _download(self, design_ids: list[int],
data_dir: str | Path = "Cooldataset"):
"""
Download and load the given design IDs using run/batch columns
for targeted batch downloads — only fetches batches that are needed.
"""
import huggingface_hub as hf
from cooldata.pyvista_flow_field_dataset import PyvistaFlowFieldDataset, PyvistaSample
if not design_ids:
print("No samples to download.")
return None
print(f"Downloading {len(design_ids)} samples ...")
data_dir = Path(os.path.abspath(data_dir))
volume_dir = data_dir / "volume"
surface_dir = data_dir / "surface"
tmp_dir = data_dir / "tmp"
for d in (volume_dir, surface_dir, tmp_dir):
os.makedirs(d, exist_ok=True)
# Look up run/batch for each design_id directly from the DataFrame
id_set = set(design_ids)
rows = self._df[self._df["design_id"].isin(id_set)]
missing = id_set - set(rows["design_id"].tolist())
if missing:
print(f" Warning: {len(missing)} design ID(s) not found in metadata: "
f"{sorted(missing)[:5]}{'...' if len(missing) > 5 else ''}")
# Group by (run, batch) — download each batch only once
grouped: dict[tuple, list[int]] = defaultdict(list)
for _, row in rows.iterrows():
grouped[(row["run"], row["batch"])].append(int(row["design_id"]))
fs = hf.HfFileSystem()
samples : list[PyvistaSample] = []
total = len(grouped)
for batch_num, ((run_name, batch_name), batch_ids) in enumerate(grouped.items(), 1):
zip_remote = f"{_REPO_ID}/runs/{run_name}/{batch_name}.zip"
zip_local = tmp_dir / run_name / f"{batch_name}.zip"
extract_dir = tmp_dir / run_name / batch_name
os.makedirs(zip_local.parent, exist_ok=True)
print(f" [{batch_num}/{total}] {run_name}/{batch_name}.zip "
f"({len(batch_ids)} needed) ...", end=" ", flush=True)
try:
fs.download(zip_remote, str(zip_local))
except Exception as e:
print(f"FAILED: {e}")
continue
extract_dir.mkdir(parents=True, exist_ok=True)
shutil.unpack_archive(str(zip_local), str(extract_dir))
found = 0
for did in batch_ids:
fid = f"{did:07d}"
v = extract_dir / f"volume_design_{fid}_p.cgns"
s = extract_dir / f"surface_design_{fid}_p.cgns"
if not v.exists() or not s.exists():
print(f"\n Warning: files missing for design {fid}")
continue
shutil.copy(v, volume_dir / v.name)
shutil.copy(s, surface_dir / s.name)
samples.append(PyvistaSample(volume_dir / v.name,
surface_dir / s.name))
found += 1
print(f"{found}/{len(batch_ids)} copied")
shutil.rmtree(extract_dir, ignore_errors=True)
zip_local.unlink(missing_ok=True)
shutil.rmtree(tmp_dir, ignore_errors=True)
if not samples:
print("No samples collected.")
return None
ds = PyvistaFlowFieldDataset(samples)
ds.add_metadata(self._df)
print(f"\nLoaded {len(ds)} samples.")
return ds