Source code for cooldata.dgl_flow_field_dataset

import json
import os
from pathlib import Path
from typing import Callable, List, Optional

try:
    import dgl
except ImportError as e:
    raise ImportError(
        f"""DGL is not installed. Please install it manually by following the instructions at: 
        "https://www.dgl.ai/pages/start.html
        {e}"""
    )

import concurrent.futures
import shutil

import numpy as np
import pyvista as pv
import torch
from tqdm import tqdm

from cooldata.pyvista_flow_field_dataset import (
    PyvistaFlowFieldDataset,
    PyvistaSample,
    SurfaceFieldType,
)


[docs] def process_sample(args): cache_dir, sample, i = args g = DGLVolumeFlowFieldDataset.pyvista_to_volume_dgl(sample) print("Saving graph to cache directory:", os.path.join(cache_dir, f"{i}.dgl")) dgl.save_graphs(os.path.join(cache_dir, f"{i}.dgl"), g) print("Graph saved successfully.")
[docs] class DGLVolumeFlowFieldDataset(torch.utils.data.Dataset): def __init__( self, cache_dir: str, pyvista_dataset: PyvistaFlowFieldDataset | None = None, parallel_conversion: bool = True ): """ Creates a new DGLVolumeFlowFieldDataset. If a PyvistaFlowFieldDataset is provided, it will be converted to DGLGraphs and stored in the cache directory. If not, the dataset will be loaded from the cache directory. Parameters: ----------- cache_dir: str The directory where the dataset converted to DGLGraphs is stored. polydata: pv.PolyData The directory where the dataset converted to DGLGraphs is stored. Default None. """ self.cache_dir = Path(os.path.abspath(cache_dir)) self.cache_dir.mkdir(parents=True, exist_ok=True) self.node_stats: tuple[dict, dict] | None = None self.edge_stats: tuple[dict, dict] | None = None if not self.cache_dir.exists(): self.cache_dir.mkdir(parents=True) existing_files = [f for f in self.cache_dir.iterdir() if f.suffix == ".dgl"] if len(existing_files) == len(pyvista_dataset or []): self.files = existing_files return if pyvista_dataset is not None: # clear the cache directory shutil.rmtree(self.cache_dir) self.cache_dir.mkdir(parents=True) pyvista_dataset.unload() if parallel_conversion: args = [ (self.cache_dir, pyvista_dataset[i], i) for i in range(len(pyvista_dataset)) ] with concurrent.futures.ProcessPoolExecutor() as executor: list( tqdm( executor.map(process_sample, args), total=len(pyvista_dataset), desc="Converting Pyvista dataset to DGLGraphs", ) ) else: for i in tqdm( range(len(pyvista_dataset)), desc="Converting Pyvista dataset to DGLGraphs", ): process_sample((self.cache_dir, pyvista_dataset[i], i)) self.files = [f for f in self.cache_dir.iterdir() if f.suffix == ".dgl"] def __len__(self): return len(self.files) def __getitem__(self, idx): filename = os.path.join(self.cache_dir, self.files[idx]) if filename.endswith(".dgl"): g = dgl.load_graphs(filename)[0][0] if self.node_stats is not None and self.edge_stats is not None: self.normalize_inplace(g) return g else: raise ValueError(f"File {filename} is not a DGLGraph file.")
[docs] @classmethod def pyvista_to_volume_dgl(cls, sample: PyvistaSample) -> dgl.DGLGraph: """ Convert a Pyvista UnstructuredGrid object to a DGLGraph of the volume flow field. Parameters: ----------- grid: pv.UnstructuredGrid The UnstructuredGrid object to convert. Returns: -------- DGLGraph: The converted graph. """ grid = sample.volume_data[0][0][0] edges_from = [] edges_to = [] # TODO: Speed up this loop for i in tqdm(range(grid.n_cells)): neighbors = grid.cell_neighbors(i) edges_from.extend([i] * len(neighbors)) edges_to.extend(neighbors) graph = dgl.graph((edges_from, edges_to), num_nodes=grid.n_cells) # TODO add the attributes to the nodes from the grid graph.ndata["Pressure"] = torch.tensor( grid.cell_data["Pressure"], dtype=torch.float32 ) graph.ndata["Temperature"] = torch.tensor( grid.cell_data["Temperature"], dtype=torch.float32 ) centers = torch.tensor(grid.cell_centers().points, dtype=torch.float32) graph.ndata["Position"] = centers velocities = torch.stack( [ torch.tensor(grid.cell_data["Velocity_0"], dtype=torch.float32), torch.tensor(grid.cell_data["Velocity_1"], dtype=torch.float32), torch.tensor(grid.cell_data["Velocity_2"], dtype=torch.float32), ], dim=1, ) graph.ndata["Velocity"] = velocities graph.ndata["TurbulentDissipationRate"] = torch.tensor( grid.cell_data["TurbulentDissipationRate"], dtype=torch.float32 ) graph.ndata["TurbulentKineticEnergy"] = torch.tensor( grid.cell_data["TurbulentKineticEnergy"], dtype=torch.float32 ) graph.ndata["Volume"] = torch.tensor( grid.cell_data["Volume"], dtype=torch.float32 ) connectivity_vectors = ( graph.ndata["Position"][edges_to] - graph.ndata["Position"][edges_from] ) graph.edata["dx"] = connectivity_vectors return graph
[docs] def volume_dgl_to_pv(self, graph: dgl.DGLGraph) -> pv.PolyData: """ Convert a DGLGraph of the volume flow field to a Pyvista PolyData object. Parameters: ----------- graph: dgl.DGLGraph The DGLGraph to convert. Returns: -------- pv.PolyData: The converted PolyData object. """ raise NotImplementedError("Implement this method")
[docs] def normalize(self): if os.path.exists(os.path.join(self.cache_dir, "stats.json")): with open(os.path.join(self.cache_dir, "stats.json"), "r") as f: stats = json.load(f) if "node_stats" in stats and "edge_stats" in stats: self.node_stats = stats["node_stats"] self.edge_stats = stats["edge_stats"] else: self.node_stats = self.compute_node_stats() self.edge_stats = self.compute_edge_stats() with open(os.path.join(self.cache_dir, "stats.json"), "w") as f: json.dump( { "node_stats": self.node_stats, "edge_stats": self.edge_stats, }, f, )
[docs] def denormalize(self): self.node_stats = None self.edge_stats = None
[docs] def normalize_inplace(self, graph: dgl.DGLGraph): """ Normalize the features of the graph. Parameters: ----------- graph: dgl.DGLGraph The graph to normalize. """ for key in graph.ndata.keys(): if key not in self.node_stats[0] or key not in self.node_stats[1]: continue if graph.ndata[key].dtype != torch.float32: continue graph.ndata[key] = ( graph.ndata[key] - torch.tensor(self.node_stats[0][key], device=graph.device) ) / torch.tensor(self.node_stats[1][key], device=graph.device) for key in graph.edata.keys(): if key not in self.edge_stats[0] or key not in self.edge_stats[1]: continue if graph.edata[key].dtype != torch.float32: continue graph.edata[key] = ( graph.edata[key] - torch.tensor(self.edge_stats[0][key], device=graph.device) ) / torch.tensor(self.edge_stats[1][key], device=graph.device)
[docs] def denormalize_inplace(self, graph: dgl.DGLGraph): """ Denormalize the features of the graph. Parameters: ----------- graph: dgl.DGLGraph The graph to denormalize. """ for key in graph.ndata.keys(): if key not in self.node_means or key not in self.node_stds: continue if graph.ndata[key].dtype != torch.float32: continue graph.ndata[key] = ( graph.ndata[key] * torch.tensor(self.node_stds[key], device=graph.device) ) + torch.tensor(self.node_means[key], device=graph.device) for key in graph.edata.keys(): if key not in self.edge_means or key not in self.edge_stds: continue if graph.edata[key].dtype != torch.float32: continue graph.edata[key] = ( graph.edata[key] * torch.tensor(self.edge_stds[key], device=graph.device) ) + torch.tensor(self.edge_means[key], device=graph.device)
[docs] def compute_node_stats(self) -> tuple[dict[str, float], dict[str, float]]: """ Compute the mean and standard deviation of the node features. """ graph_means: dict[str, list[float]] = {} graph_stds: dict[str, list[float]] = {} for i in range(len(self)): graph = self[i] for key in graph.ndata.keys(): if graph.ndata[key].dtype != torch.float32: continue if key not in graph_means: graph_means[key] = [] graph_stds[key] = [] graph_means[key].append(graph.ndata[key].mean(dim=0).tolist()) graph_stds[key].append(graph.ndata[key].std(dim=0).tolist()) means = { key: (np.sum(graph_means[key], axis=0) / len(graph_means[key])).tolist() for key in graph_means } stds = { key: ( np.sqrt( np.sum(np.square(graph_stds[key]), axis=0) / len(graph_stds[key]) ) ).tolist() for key in graph_stds } return means, stds
[docs] def compute_edge_stats(self) -> tuple[dict[str, float], dict[str, float]]: """ Compute the mean and standard deviation of the edge features. """ graph_means: dict[str, list[float]] = {} graph_stds: dict[str, list[float]] = {} for i in range(len(self)): graph = self[i] for key in graph.edata.keys(): if graph.edata[key].dtype != torch.float32: continue if key not in graph_means: graph_means[key] = [] graph_stds[key] = [] graph_means[key].append(graph.edata[key].mean(dim=0).tolist()) graph_stds[key].append(graph.edata[key].std(dim=0).tolist()) means = { key: (np.sum(graph_means[key], axis=0) / len(graph_means[key])).tolist() for key in graph_means } stds = { key: ( np.sqrt( np.sum(np.square(graph_stds[key]), axis=0) / len(graph_stds[key]) ) ).tolist() for key in graph_stds } return means, stds
[docs] @classmethod def l2_loss(cls, graph1: dgl.DGLGraph, graph2: dgl.DGLGraph): """ Compute the L2 loss between two DGLGraphs. Parameters: ----------- graph1: dgl.DGLGraph The first graph. graph2: dgl.DGLGraph The second graph. Returns: -------- float: The L2 loss between the two graphs. """ raise NotImplementedError("Implement this method")
[docs] def process_surface_sample(args): cache_dir, sample, i, patches_to_include = args g = DGLSurfaceFlowFieldDataset.pyvista_to_surface_dgl(sample, patches_to_include) dgl.save_graphs(os.path.join(cache_dir, f"{i}.dgl"), g)
[docs] class DGLSurfaceFlowFieldDataset(torch.utils.data.Dataset): def __init__( self, cache_dir: str, pyvista_dataset: PyvistaFlowFieldDataset | None = None, normalize: bool = True, patches_to_include: Optional[List[int]] = None, ): """Dataset of surface flow fields represented as DGLGraphs. If a PyvistaFlowFieldDataset is provided, it will be converted to DGLGraphs and stored in the cache directory. If not, the dataset will be loaded from the cache directory. Parameters: ----------- cache_dir: str The directory where the dataset converted to DGLGraphs is stored. polydata: pv.PolyData The directory where the dataset converted to DGLGraphs is stored. Default None. """ self.cache_dir = Path(os.path.abspath(cache_dir)) if not self.cache_dir.exists(): os.makedirs(self.cache_dir) existing = [f for f in os.listdir(self.cache_dir) if f.endswith(".dgl")] if pyvista_dataset is not None and len(existing) != len(pyvista_dataset): # clear the cache directory for f in os.listdir(self.cache_dir): os.remove(os.path.join(self.cache_dir, f)) args = [ (self.cache_dir, pyvista_dataset[i], i, patches_to_include) for i in range(len(pyvista_dataset)) ] with concurrent.futures.ProcessPoolExecutor() as executor: list( tqdm( executor.map(process_surface_sample, args), total=len(pyvista_dataset), desc="Converting Pyvista dataset to DGLGraphs (surface)", ) ) self.files = [f for f in os.listdir(self.cache_dir) if f.endswith(".dgl")] # first do not normalize to compute the stats, then normalize self.do_normalization = False if pyvista_dataset is not None: node_stats = self.compute_node_stats() edge_stats = self.compute_edge_stats() with open(os.path.join(self.cache_dir, "stats.json"), "w") as f: json.dump({"node_stats": node_stats, "edge_stats": edge_stats}, f) with open(os.path.join(self.cache_dir, "stats.json"), "r") as f: stats = json.load(f) node_stats: tuple[dict[str, float], dict[str, float]] = stats["node_stats"] edge_stats: tuple[dict[str, float], dict[str, float]] = stats["edge_stats"] self.node_means, self.node_stds = node_stats self.edge_means, self.edge_stds = edge_stats self.do_normalization = normalize def __len__(self): return len(self.files) def __getitem__(self, idx): """ Get the graph at the given index.""" filename = os.path.join(self.cache_dir, self.files[idx]) g = dgl.load_graphs(filename)[0][0] if self.do_normalization: g = self.normalize(g) return g
[docs] def slice(self, start: int, end: int) -> "DGLSurfaceFlowFieldDataset": """ Get a slice of the dataset. """ new_dataset = DGLSurfaceFlowFieldDataset( cache_dir=self.cache_dir, pyvista_dataset=None, normalize=self.do_normalization, ) new_dataset.files = self.files[start:end] new_dataset.node_means = self.node_means new_dataset.node_stds = self.node_stds new_dataset.edge_means = self.edge_means new_dataset.edge_stds = self.edge_stds return new_dataset
[docs] def shuffle(self) -> None: """ Shuffle the dataset. """ np.random.shuffle(self.files)
[docs] def select_subset(self, indices: List[int]) -> "DGLSurfaceFlowFieldDataset": """ Select a subset of the dataset based on the given indices. """ new_dataset = DGLSurfaceFlowFieldDataset( cache_dir=self.cache_dir, pyvista_dataset=None, normalize=self.do_normalization, ) new_dataset.files = [self.files[i] for i in indices] new_dataset.node_means = self.node_means new_dataset.node_stds = self.node_stds new_dataset.edge_means = self.edge_means new_dataset.edge_stds = self.edge_stds return new_dataset
[docs] @classmethod def pyvista_to_surface_dgl( cls, sample: PyvistaSample, block_indices: Optional[List[int]] = None ) -> dgl.DGLGraph: """ Convert a Pyvista UnstructuredGrid object to a DGLGraph of the volume flow field. Parameters: ----------- grid: pv.UnstructuredGrid The UnstructuredGrid object to convert. Returns: -------- DGLGraph: The converted graph. """ if block_indices is None: block_indices = list(range(len(sample.surface_data[0]))) for block_index in block_indices: grid = sample.surface_data[0][block_index] # TODO: Speed up this loop grid.cell_data["BodyID"] = block_index block_name = sample.surface_data[0].get_block_name(block_index) surface_type = -1 if "wall" in block_name.lower(): surface_type = 0 elif "inlet" in block_name.lower(): surface_type = 1 elif "outlet" in block_name.lower(): surface_type = 2 elif "symmetry" in block_name.lower(): surface_type = 3 elif "body" in block_name.lower(): surface_type = 4 else: print(f"Unknown surface type for block {block_index}: {block_name}") grid.cell_data["SurfaceType"] = surface_type if "WallShearStress_0" not in grid.cell_data: grid.cell_data["WallShearStress_0"] = 0.0 grid.cell_data["WallShearStress_1"] = 0.0 grid.cell_data["WallShearStress_2"] = 0.0 combined = sample.surface_data[0].combine(merge_points=True) edges_from = [] edges_to = [] for i in range(combined.n_cells): neighbors = combined.cell_neighbors(i) edges_from.extend([i] * len(neighbors)) edges_to.extend(neighbors) graph = dgl.graph((edges_from, edges_to), num_nodes=combined.n_cells) graph.ndata["Pressure"] = torch.tensor( combined.cell_data["Pressure"], dtype=torch.float32 ) graph.ndata["Temperature"] = torch.tensor( combined.cell_data["Temperature"], dtype=torch.float32 ) centers = torch.tensor(combined.cell_centers().points, dtype=torch.float32) graph.ndata["Position"] = centers shear_stresses = ( torch.tensor(combined.cell_data["WallShearStress_0"], dtype=torch.float32), torch.tensor(combined.cell_data["WallShearStress_1"], dtype=torch.float32), torch.tensor(combined.cell_data["WallShearStress_2"], dtype=torch.float32), ) shear_stress = torch.stack(shear_stresses, dim=1) graph.ndata["ShearStress"] = shear_stress graph.ndata["Normal"] = torch.tensor( combined.extract_surface().face_normals, dtype=torch.float32 ) graph.ndata["CellArea"] = torch.tensor( combined.compute_cell_sizes( length=False, area=True, volume=False ).cell_data["Area"], dtype=torch.float32, ) graph.ndata["SurfaceType"] = torch.tensor( combined.cell_data["SurfaceType"], dtype=torch.int32 ) graph.ndata["BodyID"] = torch.tensor( combined.cell_data["BodyID"], dtype=torch.int32 ) graph.ndata["HeatTransferCoefficient"] = torch.tensor( combined.cell_data["HeatTransferCoefficient"], dtype=torch.float32 ) connectivity_vectors = ( graph.ndata["Position"][edges_to] - graph.ndata["Position"][edges_from] ) graph.edata["dx"] = connectivity_vectors # graph.ndata['velocity'] = torch.tensor(grid.point_data['Velocity'], dtype=torch.float32) # TODO add the attributes to the nodes from the grid return graph
[docs] def compute_node_stats(self) -> tuple[dict[str, float], dict[str, float]]: """ Compute the mean and standard deviation of the node features. """ graph_means: dict[str, list[float]] = {} graph_stds: dict[str, list[float]] = {} for i in range(len(self)): graph = self[i] for key in graph.ndata.keys(): if graph.ndata[key].dtype != torch.float32: continue if key not in graph_means: graph_means[key] = [] graph_stds[key] = [] graph_means[key].append(graph.ndata[key].mean(dim=0).tolist()) graph_stds[key].append(graph.ndata[key].std(dim=0).tolist()) means = { key: (np.sum(graph_means[key], axis=0) / len(graph_means[key])).tolist() for key in graph_means } stds = { key: ( np.sqrt( np.sum(np.square(graph_stds[key]), axis=0) / len(graph_stds[key]) ) ).tolist() for key in graph_stds } return means, stds
[docs] def compute_edge_stats(self) -> tuple[dict[str, float], dict[str, float]]: """ Compute the mean and standard deviation of the edge features. """ graph_means: dict[str, list[float]] = {} graph_stds: dict[str, list[float]] = {} for i in range(len(self)): graph = self[i] for key in graph.edata.keys(): if graph.edata[key].dtype != torch.float32: continue if key not in graph_means: graph_means[key] = [] graph_stds[key] = [] graph_means[key].append(graph.edata[key].mean(dim=0).tolist()) graph_stds[key].append(graph.edata[key].std(dim=0).tolist()) means = { key: (np.sum(graph_means[key], axis=0) / len(graph_means[key])).tolist() for key in graph_means } stds = { key: ( np.sqrt( np.sum(np.square(graph_stds[key]), axis=0) / len(graph_stds[key]) ) ).tolist() for key in graph_stds } return means, stds
[docs] def volume_dgl_to_pyvista(self, graph: dgl.DGLGraph) -> pv.PolyData: """ Convert a DGLGraph of the volume flow field to a Pyvista PolyData object. Parameters: ----------- graph: dgl.DGLGraph The DGLGraph to convert. Returns: -------- pv.PolyData: The converted PolyData object. """ raise NotImplementedError("Implement this method")
[docs] def normalize_inplace(self, graph: dgl.DGLGraph) -> None: """ Normalize the features of the graph in place. Parameters: ----------- graph: dgl.DGLGraph The graph to normalize. """ for key in graph.ndata.keys(): if key not in self.node_means or key not in self.node_stds: continue if graph.ndata[key].dtype != torch.float32: continue graph.ndata[key] = ( graph.ndata[key] - torch.tensor(self.node_means[key], device=graph.device) ) / torch.tensor(self.node_stds[key], device=graph.device) for key in graph.edata.keys(): if key not in self.edge_means or key not in self.edge_stds: continue if graph.edata[key].dtype != torch.float32: continue graph.edata[key] = ( graph.edata[key] - torch.tensor(self.edge_means[key], device=graph.device) ) / torch.tensor(self.edge_stds[key], device=graph.device)
[docs] def normalize(self, graph: dgl.DGLGraph) -> dgl.DGLGraph: """ Normalize the features of the graph. Parameters: ----------- graph: dgl.DGLGraph The graph to normalize. Returns: -------- dgl.DGLGraph: The normalized graph. """ graph = graph.clone() self.normalize_inplace(graph) return graph
[docs] def denormalize_inplace(self, graph: dgl.DGLGraph) -> None: """ Denormalize the features of the graph in place. Parameters: ----------- graph: dgl.DGLGraph The graph to denormalize. """ for key in graph.ndata.keys(): if key not in self.node_means or key not in self.node_stds: continue if graph.ndata[key].dtype != torch.float32: continue graph.ndata[key] = ( graph.ndata[key] * torch.tensor(self.node_stds[key], device=graph.device) ) + torch.tensor(self.node_means[key], device=graph.device) for key in graph.edata.keys(): if key not in self.edge_means or key not in self.edge_stds: continue if graph.edata[key].dtype != torch.float32: continue graph.edata[key] = ( graph.edata[key] * torch.tensor(self.edge_stds[key], device=graph.device) ) + torch.tensor(self.edge_means[key], device=graph.device)
[docs] def denormalize(self, graph: dgl.DGLGraph) -> dgl.DGLGraph: """ Denormalize the features of the graph. Parameters: ----------- graph: dgl.DGLGraph The graph to denormalize. Returns: -------- dgl.DGLGraph: The denormalized graph. """ graph = graph.clone() self.denormalize_inplace(graph) return graph
[docs] def dgl_to_pyvista_polydata(self, graph: dgl.DGLGraph) -> pv.PolyData: """ Convert a DGLGraph of the volume flow field to a Pyvista PolyData object. This will not be the original mesh since the graph only contains the cell centers and the connectivity, not the cell points Parameters: ----------- graph: dgl.DGLGraph The DGLGraph to convert. Returns: -------- pv.PolyData: The converted PolyData object. """ if self.do_normalization: graph = self.denormalize(graph) graph = graph.to("cpu") e_from, e_to = graph.edges() num_nodes_per_edge = torch.ones_like(e_from) * 2 edges = torch.stack([num_nodes_per_edge, e_from, e_to], dim=1).detach() grid = pv.PolyData(graph.ndata["Position"].detach().numpy(), edges.numpy()) grid.point_data["Pressure"] = graph.ndata["Pressure"].detach().numpy() grid.point_data["WallShearStress_0"] = ( graph.ndata["ShearStress"][:, 0].detach().numpy() ) grid.point_data["WallShearStress_1"] = ( graph.ndata["ShearStress"][:, 1].detach().numpy() ) grid.point_data["WallShearStress_2"] = ( graph.ndata["ShearStress"][:, 2].detach().numpy() ) grid.point_data["Temperature"] = graph.ndata["Temperature"].detach().numpy() grid.point_data["Normal_0"] = graph.ndata["Normal"][:, 0].detach().numpy() grid.point_data["Normal_1"] = graph.ndata["Normal"][:, 1].detach().numpy() grid.point_data["Normal_2"] = graph.ndata["Normal"][:, 2].detach().numpy() grid.point_data["CellArea"] = graph.ndata["CellArea"].detach().numpy() grid.point_data["BodyID"] = graph.ndata["BodyID"].detach().numpy() return grid
[docs] def plot_surface( self, graph: dgl.DGLGraph, scalar: SurfaceFieldType, object_id: Optional[int] = None, ): """ Plot the surface of the graph. Parameters: ----------- graph: dgl.DGLGraph The graph to plot. """ if object_id is not None: mask = graph.ndata["BodyID"] == object_id graph = graph.subgraph(mask) grid = self.dgl_to_pyvista_polydata(graph) grid.plot( scalars=scalar, render_points_as_spheres=True, eye_dome_lighting=True, render_lines_as_tubes=True, line_width=4, style="wireframe", )
[docs] def compute_aggregate_force( self, graph: dgl.DGLGraph, object_id: Optional[int] = None ) -> torch.Tensor: """ Compute the aggregate force acting on the surface. Parameters: ----------- graph: dgl.DGLGraph The graph to compute the aggregate force for. object_id: Optional[int] The object id to compute the aggregate force for. If None, the aggregate force for all objects is computed. Returns: -------- torch.Tensor: The aggregate force acting on the surface. """ if self.do_normalization: graph = self.denormalize(graph) cell_areas = graph.ndata["CellArea"] cell_pressure_forces = ( graph.ndata["Pressure"][:, None] * cell_areas[:, None] * graph.ndata["Normal"] ) cell_shear_forces = graph.ndata["ShearStress"] * cell_areas[:, None] if object_id is not None: mask = graph.ndata["BodyID"] == object_id cell_pressure_forces = cell_pressure_forces[mask] cell_shear_forces = cell_shear_forces[mask] pressure_force = cell_pressure_forces.sum(dim=0) shear_force = cell_shear_forces.sum(dim=0) return pressure_force + shear_force
[docs] @classmethod def l2_loss(cls, graph1: dgl.DGLGraph, graph2: dgl.DGLGraph): """ Compute the L2 loss between two DGLGraphs. Parameters: ----------- graph1: dgl.DGLGraph The first graph. graph2: dgl.DGLGraph The second graph. Returns: -------- torch.tensor: The L2 loss between the two graphs. """ # check that the graphs have the same number of nodes assert graph1.num_nodes() == graph2.num_nodes() # check that the graphs have the same number of edges assert graph1.num_edges() == graph2.num_edges() return ( torch.nn.functional.mse_loss( graph1.ndata["Pressure"], graph2.ndata["Pressure"] ) + torch.nn.functional.mse_loss( graph1.ndata["Temperature"], graph2.ndata["Temperature"] ) + torch.nn.functional.mse_loss( graph1.ndata["ShearStress"], graph2.ndata["ShearStress"] ) )