Model API

OPF Heterogeneous Models

OPFHeteroGNN

Bases: Module

OPF-specific Heterogeneous GNN with configurable message-passing backend.

Wraps HeteroConv with a selectable backend (SAGE, GCN, GIN, or GAT) to operate on the heterogeneous power grid graph. Produces per-node predictions for bus (va, vm) and generator (pg, qg).

Input shape per node type: (N_type, input_channels[type]). Output shape: {'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}.
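
A minimal usage sketch (node counts, feature widths, and the edge set below are illustrative assumptions, not values fixed by the library):

import torch
from lumina.model.opf.hetero_model import OPFHeteroGNN

# Illustrative metadata: node/edge feature dimensions are assumptions.
metadata = {
    'nodes': {'bus': 4, 'generator': 7, 'load': 2, 'shunt': 2},
    'edges': {
        ('bus', 'ac_line', 'bus'): 9,
        ('bus', 'transformer', 'bus'): 11,
        ('generator', 'generator_link', 'bus'): 0,   # link edges: no attributes
        ('bus', 'generator_link', 'generator'): 0,
    },
}
input_channels = dict(metadata['nodes'])

model = OPFHeteroGNN(metadata, input_channels,
                     hidden_channels=64, out_channels=2,
                     num_layers=3, backend='sage')

# Random toy graph: 8 nodes of every type, 16 edges per relation.
x_dict = {nt: torch.randn(8, dim) for nt, dim in metadata['nodes'].items()}
edge_index_dict = {et: torch.randint(0, 8, (2, 16)) for et in metadata['edges']}

out = model(x_dict, edge_index_dict)
print(out['bus'].shape, out['generator'].shape)  # torch.Size([8, 2]) twice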

Source code in lumina/model/opf/hetero_model.py
class OPFHeteroGNN(torch.nn.Module):
    """OPF-specific Heterogeneous GNN with configurable message-passing backend.

    Wraps ``HeteroConv`` with a selectable backend (SAGE, GCN, GIN, or GAT)
    to operate on the heterogeneous power grid graph. Produces per-node
    predictions for ``bus`` (va, vm) and ``generator`` (pg, qg).

    Input shape per node type: ``(N_type, input_channels[type])``.
    Output shape: ``{'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}``.
    """

    def __init__(
            self,
            metadata,
            input_channels,
            hidden_channels=64,
            out_channels=2,
            num_layers=3,
            backend="sage",
            edge_attr_dim=None,
            **kwargs):
        """ Heterogeneous Graph Neural Network (HeteroGNN) model for OPF.

        Args:
            metadata (dict or tuple): Metadata containing node types and edge types.
                If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
                If tuple: (node_types, edge_types)
            input_channels (dict): Number of input features for each node type.
            hidden_channels (int): Hidden embedding size.
            out_channels (int): Size of each output sample. Defaults to 2.
            num_layers (int): Number of layers. Defaults to 3.
            backend (str): Graph convolutional layer backend. Defaults to "sage".
            edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
        """
        super().__init__()
        self.lin_dict = torch.nn.ModuleDict()

        # Handle both old tuple format and new dict format
        if isinstance(metadata, dict):
            node_types = list(metadata['nodes'].keys())
            edge_types = list(metadata['edges'].keys())
        else:
            # Legacy tuple format: (node_types, edge_types)
            node_types = metadata[0]
            edge_types = metadata[1]

        self.backend = backend
        self.edge_attr_support = backend == "gat"

        # Validate input_channels
        if not isinstance(input_channels, dict):
            raise ValueError("input_channels must be a dictionary")

        # Input layers for each node type - no more lazy initialization
        for node_type in node_types:
            if node_type not in input_channels:
                raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
            self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

        # Heterogeneous graph convolutional layers for edges
        self.convs = torch.nn.ModuleList()

        # Get edge attribute dimensions from metadata
        if isinstance(metadata, dict):
            edge_attr_dims = {edge_type: (dim if dim > 0 else None)
                              for edge_type, dim in metadata['edges'].items()}
        else:
            # Fallback to hardcoded dimensions for legacy format
            edge_attr_dims = {
                ('bus', 'ac_line', 'bus'): 9,
                ('bus', 'transformer', 'bus'): 11,
                # Link edges have no attributes
                ('generator', 'generator_link', 'bus'): None,
                ('bus', 'generator_link', 'generator'): None,
                ('load', 'load_link', 'bus'): None,
                ('bus', 'load_link', 'load'): None,
                ('shunt', 'shunt_link', 'bus'): None,
                ('bus', 'shunt_link', 'shunt'): None,
            }

        def get_conv_layer(edge_type):
            if backend == "sage":
                return SAGEConv((hidden_channels, hidden_channels), hidden_channels)
            elif backend == "gcn":
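                # GraphConv stands in for GCN here; unlike GCNConv it accepts bipartite (src, dst) inputs.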
                return GraphConv((hidden_channels, hidden_channels), hidden_channels)
            elif backend == "gin":
                return GINConv(MLP([hidden_channels, hidden_channels]))
            elif backend == "gat":
                edge_dim = edge_attr_dims.get(edge_type, None)
                return GATConv((hidden_channels, hidden_channels),
                               hidden_channels,
                               add_self_loops=False,
                               edge_dim=edge_dim)
            else:
                raise ValueError(f"Unknown backend: {backend}")

        for _ in range(num_layers - 1):
            conv = HeteroConv({
                edge_type: get_conv_layer(edge_type)
                for edge_type in edge_types
            }, aggr='sum')
            self.convs.append(conv)

        # Output layers for target node types, ACOPF variables
        self.out_dict = torch.nn.ModuleDict({
            "bus": Linear(hidden_channels, out_channels),
            "generator": Linear(hidden_channels, out_channels),
        })

        self.reset_parameters()

    def reset_parameters(self):
        """Reset parameters of the model."""
        for lin in self.lin_dict.values():
            lin.reset_parameters()
        for conv in self.convs:
            for rel_conv in conv.convs.values():
                if hasattr(rel_conv, 'reset_parameters'):
                    rel_conv.reset_parameters()
            conv.reset_parameters()
        for out in self.out_dict.values():
            out.reset_parameters()

    def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
        """Forward pass of the HeteroGNN model.

        Args:
            x_dict (dict): Node features for each node type.
            edge_index_dict (dict): Edge indices for each edge type.
            edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
            minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

        Returns:
            dict: Output predictions for each target node type.
        """

        if minmax_scaling:
            _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
            _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
            _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
            _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
            _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
            _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

        # Transform input features
        x_dict = {
            node_type: F.relu(self.lin_dict[node_type](x))
            for node_type, x in x_dict.items()
        }
        # x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}

        # Message passing
        for conv in self.convs:
            if self.edge_attr_support and edge_attr_dict is not None:
                x_dict = conv(x_dict, edge_index_dict, edge_attr_dict=edge_attr_dict)
                x_dict = {key: F.relu(x) for key, x in x_dict.items()}
                x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}
            else:
                x_dict = conv(x_dict, edge_index_dict)
                x_dict = {key: F.relu(x) for key, x in x_dict.items()}
                x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}
            # Both branches apply ReLU and dropout; they differ only in
            # whether edge attributes are forwarded (GAT backend only).

        # Final predictions
        # bus_out: va, vm
        bus_out = self.out_dict["bus"](x_dict["bus"])
        # bus_out = F.dropout(bus_out, p=0.1, training=self.training)
        # gen_out: pg, qg
        gen_out = self.out_dict["generator"](x_dict["generator"])
        # gen_out = F.dropout(gen_out, p=0.1, training=self.training)

        if minmax_scaling:
            # Create new tensors instead of modifying in-place
            bus_out_final = bus_out.clone()
            gen_out_final = gen_out.clone()

            # Apply scaling without in-place operations
            bus_out_final[:, 1] = F.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

            gen_out_sigmoid = F.sigmoid(gen_out)
            gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
            gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

            return {"bus": bus_out_final, "generator": gen_out_final}
        else:
            return {"bus": bus_out, "generator": gen_out}

__init__(metadata, input_channels, hidden_channels=64, out_channels=2, num_layers=3, backend='sage', edge_attr_dim=None, **kwargs)

Heterogeneous Graph Neural Network (HeteroGNN) model for OPF.

Parameters:

    metadata (dict or tuple, required): Metadata containing node types and edge types.
        If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
        If tuple: (node_types, edge_types)
    input_channels (dict, required): Number of input features for each node type.
    hidden_channels (int, default 64): Hidden embedding size.
    out_channels (int, default 2): Size of each output sample.
    num_layers (int, default 3): Number of layers; the model builds num_layers - 1 HeteroConv blocks.
    backend (str, default "sage"): Graph convolution backend: "sage", "gcn", "gin", or "gat".
    edge_attr_dim (int, optional, default None): Accepted for interface compatibility; edge
        attribute dimensions are taken from metadata (or the hardcoded legacy table) instead.
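
For illustration, the two accepted metadata forms side by side (the dimensions are hypothetical):

# Dict format: per-type feature dimensions (hypothetical values);
# a dimension of 0 marks an edge type without attributes.
metadata_dict = {
    'nodes': {'bus': 4, 'generator': 7},
    'edges': {('bus', 'ac_line', 'bus'): 9,
              ('generator', 'generator_link', 'bus'): 0},
}

# Legacy tuple format: only the type names, no dimensions. Edge attribute
# dimensions then fall back to the hardcoded table in __init__.
metadata_tuple = (
    ['bus', 'generator'],
    [('bus', 'ac_line', 'bus'), ('generator', 'generator_link', 'bus')],
)
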
Source code in lumina/model/opf/hetero_model.py
def __init__(
        self,
        metadata,
        input_channels,
        hidden_channels=64,
        out_channels=2,
        num_layers=3,
        backend="sage",
        edge_attr_dim=None,
        **kwargs):
    """ Heterogeneous Graph Neural Network (HeteroGNN) model for OPF.

    Args:
        metadata (dict or tuple): Metadata containing node types and edge types.
            If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
            If tuple: (node_types, edge_types)
        input_channels (dict): Number of input features for each node type.
        hidden_channels (int): Hidden embedding size.
        out_channels (int): Size of each output sample. Defaults to 2.
        num_layers (int): Number of layers. Defaults to 3.
        backend (str): Graph convolutional layer backend. Defaults to "sage".
        edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
    """
    super().__init__()
    self.lin_dict = torch.nn.ModuleDict()

    # Handle both old tuple format and new dict format
    if isinstance(metadata, dict):
        node_types = list(metadata['nodes'].keys())
        edge_types = list(metadata['edges'].keys())
    else:
        # Legacy tuple format: (node_types, edge_types)
        node_types = metadata[0]
        edge_types = metadata[1]

    self.backend = backend
    self.edge_attr_support = backend == "gat"

    # Validate input_channels
    if not isinstance(input_channels, dict):
        raise ValueError("input_channels must be a dictionary")

    # Input layers for each node type - no more lazy initialization
    for node_type in node_types:
        if node_type not in input_channels:
            raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
        self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

    # Heterogeneous graph convolutional layers for edges
    self.convs = torch.nn.ModuleList()

    # Get edge attribute dimensions from metadata
    if isinstance(metadata, dict):
        edge_attr_dims = {edge_type: (dim if dim > 0 else None)
                          for edge_type, dim in metadata['edges'].items()}
    else:
        # Fallback to hardcoded dimensions for legacy format
        edge_attr_dims = {
            ('bus', 'ac_line', 'bus'): 9,
            ('bus', 'transformer', 'bus'): 11,
            # Link edges have no attributes
            ('generator', 'generator_link', 'bus'): None,
            ('bus', 'generator_link', 'generator'): None,
            ('load', 'load_link', 'bus'): None,
            ('bus', 'load_link', 'load'): None,
            ('shunt', 'shunt_link', 'bus'): None,
            ('bus', 'shunt_link', 'shunt'): None,
        }

    def get_conv_layer(edge_type):
        if backend == "sage":
            return SAGEConv((hidden_channels, hidden_channels), hidden_channels)
        elif backend == "gcn":
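            # GraphConv stands in for GCN here; unlike GCNConv it accepts bipartite (src, dst) inputs.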
            return GraphConv((hidden_channels, hidden_channels), hidden_channels)
        elif backend == "gin":
            return GINConv(MLP([hidden_channels, hidden_channels]))
        elif backend == "gat":
            edge_dim = edge_attr_dims.get(edge_type, None)
            return GATConv((hidden_channels, hidden_channels),
                           hidden_channels,
                           add_self_loops=False,
                           edge_dim=edge_dim)
        else:
            raise ValueError(f"Unknown backend: {backend}")

    for _ in range(num_layers - 1):
        conv = HeteroConv({
            edge_type: get_conv_layer(edge_type)
            for edge_type in edge_types
        }, aggr='sum')
        self.convs.append(conv)

    # Output layers for target node types, ACOPF variables
    self.out_dict = torch.nn.ModuleDict({
        "bus": Linear(hidden_channels, out_channels),
        "generator": Linear(hidden_channels, out_channels),
    })

    self.reset_parameters()

reset_parameters()

Reset parameters of the model.

Source code in lumina/model/opf/hetero_model.py
def reset_parameters(self):
    """Reset parameters of the model."""
    for lin in self.lin_dict.values():
        lin.reset_parameters()
    for conv in self.convs:
        for rel_conv in conv.convs.values():
            if hasattr(rel_conv, 'reset_parameters'):
                rel_conv.reset_parameters()
        conv.reset_parameters()
    for out in self.out_dict.values():
        out.reset_parameters()

forward(x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs)

Forward pass of the HeteroGNN model.

Parameters:

    x_dict (dict, required): Node features for each node type.
    edge_index_dict (dict, required): Edge indices for each edge type.
    edge_attr_dict (dict, optional, default None): Edge attributes for each edge type.
        Only forwarded to the convolutions when backend is "gat".
    minmax_scaling (bool, default False): Whether to apply min-max scaling to outputs.

Returns:

    dict: Output predictions for each target node type.
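
When minmax_scaling=True, the bound columns are read from the raw input features before the input projection, and the bounded outputs are squashed into their boxes with a sigmoid. Restating the scaling from the code in LaTeX:

    \hat{v}_m = \sigma(\tilde{v}_m)\,(v_{\max} - v_{\min}) + v_{\min}
    \hat{p}_g = \sigma(\tilde{p}_g)\,(p_{\max} - p_{\min}) + p_{\min}
    \hat{q}_g = \sigma(\tilde{q}_g)\,(q_{\max} - q_{\min}) + q_{\min}

where \sigma is the logistic sigmoid and \tilde{v}_m, \tilde{p}_g, \tilde{q}_g are the raw linear-head outputs. The bounds come from the inputs: v_{\min}, v_{\max} from bus columns 1 and 2; p_{\min}, p_{\max} from generator columns 2 and 3; q_{\min}, q_{\max} from generator columns 5 and 6. The voltage angle va (bus output column 0) is returned unscaled.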

Source code in lumina/model/opf/hetero_model.py
def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
    """Forward pass of the HeteroGNN model.

    Args:
        x_dict (dict): Node features for each node type.
        edge_index_dict (dict): Edge indices for each edge type.
        edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
        minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

    Returns:
        dict: Output predictions for each target node type.
    """

    if minmax_scaling:
        _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
        _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
        _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
        _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
        _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
        _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

    # Transform input features
    x_dict = {
        node_type: F.relu(self.lin_dict[node_type](x))
        for node_type, x in x_dict.items()
    }
    # x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}

    # Message passing
    for conv in self.convs:
        if self.edge_attr_support and edge_attr_dict is not None:
            x_dict = conv(x_dict, edge_index_dict, edge_attr_dict=edge_attr_dict)
            x_dict = {key: F.relu(x) for key, x in x_dict.items()}
            x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}
        else:
            x_dict = conv(x_dict, edge_index_dict)
            x_dict = {key: F.relu(x) for key, x in x_dict.items()}
            x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}
        # Both branches apply ReLU and dropout; they differ only in
        # whether edge attributes are forwarded (GAT backend only).

    # Final predictions
    # bus_out: va, vm
    bus_out = self.out_dict["bus"](x_dict["bus"])
    # bus_out = F.dropout(bus_out, p=0.1, training=self.training)
    # gen_out: pg, qg
    gen_out = self.out_dict["generator"](x_dict["generator"])
    # gen_out = F.dropout(gen_out, p=0.1, training=self.training)

    if minmax_scaling:
        # Create new tensors instead of modifying in-place
        bus_out_final = bus_out.clone()
        gen_out_final = gen_out.clone()

        # Apply scaling without in-place operations
        bus_out_final[:, 1] = F.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

        gen_out_sigmoid = F.sigmoid(gen_out)
        gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
        gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

        return {"bus": bus_out_final, "generator": gen_out_final}
    else:
        return {"bus": bus_out, "generator": gen_out}

RGAT

Bases: Module

Relational Graph Attention Network for heterogeneous OPF graphs.

Uses HeteroConv wrapping per-relation GATConv layers with multi-head attention. Each edge type gets its own GATConv parameters.

Input shape per node type: (N_type, input_channels[type]). Output shape: {'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}.
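
A construction sketch (metadata and dimensions are illustrative). Note that hidden_channels should be divisible by num_heads, since each per-relation GATConv emits hidden_channels // num_heads features per head and the default concat=True stitches them back to hidden_channels:

from lumina.model.opf.hetero_model import RGAT

metadata = {'nodes': {'bus': 4, 'generator': 7},   # illustrative dims
            'edges': {('bus', 'ac_line', 'bus'): 9,
                      ('generator', 'generator_link', 'bus'): 0}}

model = RGAT(metadata, input_channels={'bus': 4, 'generator': 7},
             hidden_channels=64, num_heads=4)  # 64 % 4 == 0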

Source code in lumina/model/opf/hetero_model.py
class RGAT(torch.nn.Module):
    """Relational Graph Attention Network for heterogeneous OPF graphs.

    Uses ``HeteroConv`` wrapping per-relation ``GATConv`` layers with
    multi-head attention. Each edge type gets its own GATConv parameters.

    Input shape per node type: ``(N_type, input_channels[type])``.
    Output shape: ``{'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}``.
    """

    def __init__(self,
                 metadata,
                 input_channels,
                 hidden_channels=64,
                 out_channels=2,
                 num_layers=3,
                 num_heads=1,
                 backend="sage",
                 edge_attr_dim=None,
                 **kwargs):
        r""" Relational Graph Attention Network (RGAT) model.
        Implemented as HeteroConv with GATConv (Relational GAT).

        Args:
            metadata (dict or tuple): Metadata containing node types and edge types.
                If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
                If tuple: (node_types, edge_types)
            input_channels (dict): Number of input features for each node type.
            hidden_channels (int): Hidden embedding size.
            out_channels (int): Size of each output sample. Defaults to 2.
            num_layers (int): Number of layers. Defaults to 3.
            num_heads (int): Number of multi-head-attention heads. Defaults to 1.
            backend (str): Graph convolutional layer backend. Defaults to "sage".
            edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
        """
        super().__init__()

        self.lin_dict = torch.nn.ModuleDict()

        # Handle both old tuple format and new dict format
        if isinstance(metadata, dict):
            node_types = list(metadata['nodes'].keys())
            edge_types = list(metadata['edges'].keys())
        else:
            # Legacy tuple format: (node_types, edge_types)
            node_types = metadata[0]
            edge_types = metadata[1]

        # Validate input_channels
        if not isinstance(input_channels, dict):
            raise ValueError("input_channels must be a dictionary")

        # Input layers for each node type - no more lazy initialization
        for node_type in node_types:
            if node_type not in input_channels:
                raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
            self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

        self.convs = torch.nn.ModuleList()
        for _ in range(num_layers):
            # Use HeteroConv with GATConv to simulate RGAT (Relational GAT)
            # Each edge type gets its own GATConv parameters
            conv = HeteroConv({
                edge_type: GATConv(
                    hidden_channels,
                    hidden_channels // num_heads,
                    heads=num_heads,
                    add_self_loops=False
                )
                for edge_type in edge_types
            }, aggr='sum')
            self.convs.append(conv)

        # Output layers for target node types
        self.out_dict = torch.nn.ModuleDict({
            "bus": Linear(hidden_channels, out_channels),
            "generator": Linear(hidden_channels, out_channels),
        })

        self.reset_parameters()

    def reset_parameters(self):
        """Reset parameters of the model."""
        for lin in self.lin_dict.values():
            lin.reset_parameters()
        for conv in self.convs:
            if hasattr(conv, 'reset_parameters'):
                conv.reset_parameters()
        for out in self.out_dict.values():
            out.reset_parameters()

    def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
        """Forward pass of the RGAT model.

        Args:
            x_dict (dict): Node features for each node type.
            edge_index_dict (dict): Edge indices for each edge type.
            edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
            minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

        Returns:
            dict: Output predictions for each target node type.
        """

        if minmax_scaling:
            _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
            _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
            _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
            _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
            _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
            _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

        # Transform input features
        x_dict = {
            node_type: F.relu(self.lin_dict[node_type](x))
            for node_type, x in x_dict.items()
        }

        # Message passing
        for conv in self.convs:
            if edge_attr_dict is not None:
                x_dict = conv(x_dict, edge_index_dict, edge_attr_dict)
            else:
                x_dict = conv(x_dict, edge_index_dict)
            x_dict = {key: F.relu(x) for key, x in x_dict.items()}

        # Final predictions
        bus_out = self.out_dict["bus"](x_dict["bus"])
        gen_out = self.out_dict["generator"](x_dict["generator"])

        if minmax_scaling:
            # Create new tensors instead of modifying in-place
            bus_out_final = bus_out.clone()
            gen_out_final = gen_out.clone()

            # Apply scaling without in-place operations
            bus_out_final[:, 1] = F.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

            gen_out_sigmoid = F.sigmoid(gen_out)
            gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
            gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

            return {"bus": bus_out_final, "generator": gen_out_final}
        else:
            return {"bus": bus_out, "generator": gen_out}

__init__(metadata, input_channels, hidden_channels=64, out_channels=2, num_layers=3, num_heads=1, backend='sage', edge_attr_dim=None, **kwargs)

Relational Graph Attention Network (RGAT) model. Implemented as HeteroConv with GATConv (Relational GAT).

Parameters:

    metadata (dict or tuple, required): Metadata containing node types and edge types.
        If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
        If tuple: (node_types, edge_types)
    input_channels (dict, required): Number of input features for each node type.
    hidden_channels (int, default 64): Hidden embedding size; should be divisible by num_heads.
    out_channels (int, default 2): Size of each output sample.
    num_layers (int, default 3): Number of HeteroConv layers.
    num_heads (int, default 1): Number of multi-head-attention heads.
    backend (str, default "sage"): Accepted for interface compatibility; unused (RGAT always
        builds GATConv layers).
    edge_attr_dim (int, optional, default None): Accepted for interface compatibility; unused.
Source code in lumina/model/opf/hetero_model.py
def __init__(self,
             metadata,
             input_channels,
             hidden_channels=64,
             out_channels=2,
             num_layers=3,
             num_heads=1,
             backend="sage",
             edge_attr_dim=None,
             **kwargs):
    r""" Relational Graph Attention Network (RGAT) model.
    Implemented as HeteroConv with GATConv (Relational GAT).

    Args:
        metadata (dict or tuple): Metadata containing node types and edge types.
            If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
            If tuple: (node_types, edge_types)
        input_channels (dict): Number of input features for each node type.
        hidden_channels (int): Hidden embedding size.
        out_channels (int): Size of each output sample. Defaults to 2.
        num_layers (int): Number of layers. Defaults to 3.
        num_heads (int): Number of multi-head-attention heads. Defaults to 1.
        backend (str): Graph convolutional layer backend. Defaults to "sage".
        edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
    """
    super().__init__()

    self.lin_dict = torch.nn.ModuleDict()

    # Handle both old tuple format and new dict format
    if isinstance(metadata, dict):
        node_types = list(metadata['nodes'].keys())
        edge_types = list(metadata['edges'].keys())
    else:
        # Legacy tuple format: (node_types, edge_types)
        node_types = metadata[0]
        edge_types = metadata[1]

    # Validate input_channels
    if not isinstance(input_channels, dict):
        raise ValueError("input_channels must be a dictionary")

    # Input layers for each node type - no more lazy initialization
    for node_type in node_types:
        if node_type not in input_channels:
            raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
        self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

    self.convs = torch.nn.ModuleList()
    for _ in range(num_layers):
        # Use HeteroConv with GATConv to simulate RGAT (Relational GAT)
        # Each edge type gets its own GATConv parameters
        conv = HeteroConv({
            edge_type: GATConv(
                hidden_channels,
                hidden_channels // num_heads,
                heads=num_heads,
                add_self_loops=False
            )
            for edge_type in edge_types
        }, aggr='sum')
        self.convs.append(conv)

    # Output layers for target node types
    self.out_dict = torch.nn.ModuleDict({
        "bus": Linear(hidden_channels, out_channels),
        "generator": Linear(hidden_channels, out_channels),
    })

    self.reset_parameters()

reset_parameters()

Reset parameters of the model.

Source code in lumina/model/opf/hetero_model.py
def reset_parameters(self):
    """Reset parameters of the model."""
    for lin in self.lin_dict.values():
        lin.reset_parameters()
    for conv in self.convs:
        if hasattr(conv, 'reset_parameters'):
            conv.reset_parameters()
    for out in self.out_dict.values():
        out.reset_parameters()

forward(x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs)

Forward pass of the RGAT model.

Parameters:

    x_dict (dict, required): Node features for each node type.
    edge_index_dict (dict, required): Edge indices for each edge type.
    edge_attr_dict (dict, optional, default None): Edge attributes for each edge type.
    minmax_scaling (bool, default False): Whether to apply min-max scaling to outputs.

Returns:

    dict: Output predictions for each target node type.

Source code in lumina/model/opf/hetero_model.py
def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
    """Forward pass of the RGAT model.

    Args:
        x_dict (dict): Node features for each node type.
        edge_index_dict (dict): Edge indices for each edge type.
        edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
        minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

    Returns:
        dict: Output predictions for each target node type.
    """

    if minmax_scaling:
        _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
        _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
        _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
        _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
        _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
        _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

    # Transform input features
    x_dict = {
        node_type: F.relu(self.lin_dict[node_type](x))
        for node_type, x in x_dict.items()
    }

    # Message passing
    for conv in self.convs:
        if edge_attr_dict is not None:
            x_dict = conv(x_dict, edge_index_dict, edge_attr_dict)
        else:
            x_dict = conv(x_dict, edge_index_dict)
        x_dict = {key: F.relu(x) for key, x in x_dict.items()}

    # Final predictions
    bus_out = self.out_dict["bus"](x_dict["bus"])
    gen_out = self.out_dict["generator"](x_dict["generator"])

    if minmax_scaling:
        # Create new tensors instead of modifying in-place
        bus_out_final = bus_out.clone()
        gen_out_final = gen_out.clone()

        # Apply scaling without in-place operations
        bus_out_final[:, 1] = F.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

        gen_out_sigmoid = F.sigmoid(gen_out)
        gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
        gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

        return {"bus": bus_out_final, "generator": gen_out_final}
    else:
        return {"bus": bus_out, "generator": gen_out}

HEAT

Bases: Module

Heterogeneous Edge-Attributed Transformer for OPF graphs.

Uses HEATConv on a homogeneous view of the heterogeneous graph. Node and edge features are projected to hidden_channels before being passed through HEATConv layers that incorporate edge type and edge attribute embeddings.

Input shape per node type: (N_type, input_channels[type]). Output shape: {'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}.
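
The internal homogeneous conversion can be pictured with a toy HeteroData; this sketch uses only standard PyG calls, and the sizes are invented:

import torch
from torch_geometric.data import HeteroData

data = HeteroData()
data['bus'].x = torch.randn(3, 64)          # already projected to hidden size
data['generator'].x = torch.randn(2, 64)
data['generator', 'generator_link', 'bus'].edge_index = torch.tensor([[0, 1],
                                                                      [0, 2]])
data['generator', 'generator_link', 'bus'].edge_attr = torch.zeros(2, 64)

homo = data.to_homogeneous(node_attrs=['x'], edge_attrs=['edge_attr'])
# homo.node_type and homo.edge_type are the integer type vectors HEATConv consumes.
print(homo.x.shape, homo.node_type.tolist())  # torch.Size([5, 64]) [0, 0, 0, 1, 1]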

Source code in lumina/model/opf/hetero_model.py
class HEAT(torch.nn.Module):
    """Heterogeneous Edge-Attributed Transformer for OPF graphs.

    Uses ``HEATConv`` on a homogeneous view of the heterogeneous graph.
    Node and edge features are projected to ``hidden_channels`` before
    being passed through HEATConv layers that incorporate edge type and
    edge attribute embeddings.

    Input shape per node type: ``(N_type, input_channels[type])``.
    Output shape: ``{'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}``.
    """

    def __init__(
            self,
            metadata,
            input_channels,
            hidden_channels=64,
            out_channels=2,
            num_layers=3,
            attention_heads=1,
            edge_type_emb_dim=16,
            edge_attr_emb_dim=16,
            backend="sage",
            edge_attr_dim=None,
            **kwargs):
        """Initialize the HEAT model.

        Notes:
          - Externally, this model keeps the "hetero" forward signature:
              forward(x_dict, edge_index_dict, edge_attr_dict=None, ...)
          - Internally, torch_geometric.nn.HEATConv operates on a *homogeneous* view.
            We therefore build a temporary HeteroData and call to_homogeneous().
          - If edge_attr_dict is not provided by the caller (as in train_opf.py today),
            this model will still run by creating zero edge attributes for all relations.
        """
        super().__init__()

        self.lin_dict = torch.nn.ModuleDict()
        self.edge_lin_dict = torch.nn.ModuleDict()

        # Handle both old tuple format and new dict format
        if isinstance(metadata, dict):
            self.node_types = list(metadata['nodes'].keys())
            self.edge_types = list(metadata['edges'].keys())
            edge_attr_dims = dict(metadata['edges'])
        else:
            # Legacy tuple format: (node_types, edge_types)
            self.node_types = metadata[0]
            self.edge_types = metadata[1]
            edge_attr_dims = {}

        # Validate input_channels
        if not isinstance(input_channels, dict):
            raise ValueError("input_channels must be a dictionary")

        # Node input projections
        for node_type in self.node_types:
            if node_type not in input_channels:
                raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
            self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

        for edge_type in self.edge_types:
            dim = edge_attr_dims.get(edge_type, 0)
            if dim and dim > 0:
                self.edge_lin_dict[str(edge_type)] = Linear(dim, hidden_channels)

        self._heat_edge_dim = hidden_channels

        self.convs = torch.nn.ModuleList()
        for _ in range(num_layers):
            conv = HEATConv(
                in_channels=-1,
                out_channels=hidden_channels,
                num_node_types=len(self.node_types),
                num_edge_types=len(self.edge_types),
                edge_type_emb_dim=edge_type_emb_dim,
                edge_dim=self._heat_edge_dim,
                edge_attr_emb_dim=edge_attr_emb_dim,
                heads=attention_heads,
                concat=False,
            )
            self.convs.append(conv)

        # Output heads (targets)
        self.out_dict = torch.nn.ModuleDict({
            "bus": Linear(hidden_channels, out_channels),
            "generator": Linear(hidden_channels, out_channels),
        })

        self.reset_parameters()

    def reset_parameters(self):
        for lin in self.lin_dict.values():
            lin.reset_parameters()
        for lin in self.edge_lin_dict.values():
            lin.reset_parameters()
        for conv in self.convs:
            if hasattr(conv, 'reset_parameters'):
                conv.reset_parameters()
        for out in self.out_dict.values():
            out.reset_parameters()

    def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
        """Forward pass of the HEAT model.

        Converts the heterogeneous graph to a homogeneous view internally
        for HEATConv message passing, then maps outputs back to node types.

        Args:
            x_dict (dict): Node features for each node type.
            edge_index_dict (dict): Edge indices for each edge type.
            edge_attr_dict (dict, optional): Edge attributes for each edge
                type. Zero-padded if not provided. Defaults to None.
            minmax_scaling (bool): Whether to apply min-max scaling to
                outputs. Defaults to False.

        Returns:
            dict: Output predictions ``{'bus': Tensor, 'generator': Tensor}``.
        """
        if minmax_scaling:
            _vmin = x_dict['bus'][:, 1].clone()
            _vmax = x_dict['bus'][:, 2].clone()
            _pmin = x_dict['generator'][:, 2].clone()
            _pmax = x_dict['generator'][:, 3].clone()
            _qmin = x_dict['generator'][:, 5].clone()
            _qmax = x_dict['generator'][:, 6].clone()

        # Project node features to hidden_channels
        x_dict = {
            node_type: F.relu(self.lin_dict[node_type](x))
            for node_type, x in x_dict.items()
        }

        # Project edge attributes (if provided) to hidden_channels
        projected_edge_attr_dict = {}
        if edge_attr_dict is not None:
            for edge_type, edge_attr in edge_attr_dict.items():
                key = str(edge_type)
                if key in self.edge_lin_dict:
                    projected_edge_attr_dict[edge_type] = F.relu(self.edge_lin_dict[key](edge_attr))

        # HEATConv in PyG operates on a homogeneous graph, so we build a temporary HeteroData and convert it with to_homogeneous().
        hdata = HeteroData()

        for node_type in self.node_types:
            if node_type not in x_dict:
                raise ValueError(f"Missing node type '{node_type}' in x_dict")
            hdata[node_type].x = x_dict[node_type]

        # Ensure every relation exists, and every relation has edge_attr aligned to edge_index
        for edge_type in self.edge_types:
            if edge_type in edge_index_dict:
                ei = edge_index_dict[edge_type]
            else:
                device = hdata[self.node_types[0]].x.device
                ei = torch.empty((2, 0), dtype=torch.long, device=device)

            hdata[edge_type].edge_index = ei
            num_edges = ei.size(1)
            device = ei.device

            if edge_type in projected_edge_attr_dict:
                ea = projected_edge_attr_dict[edge_type]
                if ea.size(0) != num_edges:
                    raise ValueError(
                        f"edge_attr rows ({ea.size(0)}) must match num_edges ({num_edges}) for edge_type={edge_type}"
                    )
                hdata[edge_type].edge_attr = ea
            else:
                # padding for missing edge attrs
                hdata[edge_type].edge_attr = torch.zeros(
                    (num_edges, self._heat_edge_dim),
                    dtype=hdata[self.node_types[0]].x.dtype,
                    device=device,
                )

        homo = hdata.to_homogeneous(node_attrs=['x'], edge_attrs=['edge_attr'])

        x = homo.x
        edge_index = homo.edge_index
        node_type = homo.node_type
        edge_type = homo.edge_type
        edge_attr = homo.edge_attr

        # Message passing (homogeneous)
        for conv in self.convs:
            x = conv(x, edge_index, node_type, edge_type, edge_attr)
            x = F.relu(x)

        node_type_to_id = {nt: i for i, nt in enumerate(self.node_types)}
        bus_x = x[node_type == node_type_to_id["bus"]]
        gen_x = x[node_type == node_type_to_id["generator"]]

        bus_out = self.out_dict["bus"](bus_x)
        gen_out = self.out_dict["generator"](gen_x)

        if minmax_scaling:
            bus_out_final = bus_out.clone()
            gen_out_final = gen_out.clone()

            bus_out_final[:, 1] = torch.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

            gen_out_sigmoid = torch.sigmoid(gen_out)
            gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
            gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin
            return {"bus": bus_out_final, "generator": gen_out_final}

        return {"bus": bus_out, "generator": gen_out}

__init__(metadata, input_channels, hidden_channels=64, out_channels=2, num_layers=3, attention_heads=1, edge_type_emb_dim=16, edge_attr_emb_dim=16, backend='sage', edge_attr_dim=None, **kwargs)

Initialize the HEAT model.

Notes

    - Externally, this model keeps the "hetero" forward signature:
      forward(x_dict, edge_index_dict, edge_attr_dict=None, ...)
    - Internally, torch_geometric.nn.HEATConv operates on a homogeneous view,
      so a temporary HeteroData is built and converted with to_homogeneous().
    - If edge_attr_dict is not provided by the caller (as in train_opf.py today),
      the model will still run by creating zero edge attributes for all relations.
Source code in lumina/model/opf/hetero_model.py
def __init__(
        self,
        metadata,
        input_channels,
        hidden_channels=64,
        out_channels=2,
        num_layers=3,
        attention_heads=1,
        edge_type_emb_dim=16,
        edge_attr_emb_dim=16,
        backend="sage",
        edge_attr_dim=None,
        **kwargs):
    """Initialize the HEAT model.

    Notes:
      - Externally, this model keeps the "hetero" forward signature:
          forward(x_dict, edge_index_dict, edge_attr_dict=None, ...)
      - Internally, torch_geometric.nn.HEATConv operates on a *homogeneous* view.
        We therefore build a temporary HeteroData and call to_homogeneous().
      - If edge_attr_dict is not provided by the caller (as in train_opf.py today),
        this model will still run by creating zero edge attributes for all relations.
    """
    super().__init__()

    self.lin_dict = torch.nn.ModuleDict()
    self.edge_lin_dict = torch.nn.ModuleDict()

    # Handle both old tuple format and new dict format
    if isinstance(metadata, dict):
        self.node_types = list(metadata['nodes'].keys())
        self.edge_types = list(metadata['edges'].keys())
        edge_attr_dims = dict(metadata['edges'])
    else:
        # Legacy tuple format: (node_types, edge_types)
        self.node_types = metadata[0]
        self.edge_types = metadata[1]
        edge_attr_dims = {}

    # Validate input_channels
    if not isinstance(input_channels, dict):
        raise ValueError("input_channels must be a dictionary")

    # Node input projections
    for node_type in self.node_types:
        if node_type not in input_channels:
            raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
        self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

    for edge_type in self.edge_types:
        dim = edge_attr_dims.get(edge_type, 0)
        if dim and dim > 0:
            self.edge_lin_dict[str(edge_type)] = Linear(dim, hidden_channels)

    self._heat_edge_dim = hidden_channels

    self.convs = torch.nn.ModuleList()
    for _ in range(num_layers):
        conv = HEATConv(
            in_channels=-1,
            out_channels=hidden_channels,
            num_node_types=len(self.node_types),
            num_edge_types=len(self.edge_types),
            edge_type_emb_dim=edge_type_emb_dim,
            edge_dim=self._heat_edge_dim,
            edge_attr_emb_dim=edge_attr_emb_dim,
            heads=attention_heads,
            concat=False,
        )
        self.convs.append(conv)

    # Output heads (targets)
    self.out_dict = torch.nn.ModuleDict({
        "bus": Linear(hidden_channels, out_channels),
        "generator": Linear(hidden_channels, out_channels),
    })

    self.reset_parameters()

forward(x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs)

Forward pass of the HEAT model.

Converts the heterogeneous graph to a homogeneous view internally for HEATConv message passing, then maps outputs back to node types.

Parameters:

    x_dict (dict, required): Node features for each node type.
    edge_index_dict (dict, required): Edge indices for each edge type.
    edge_attr_dict (dict, optional, default None): Edge attributes for each edge type.
        Zero-padded if not provided.
    minmax_scaling (bool, default False): Whether to apply min-max scaling to outputs.

Returns:

    dict: Output predictions {'bus': Tensor, 'generator': Tensor}.

Source code in lumina/model/opf/hetero_model.py
def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
    """Forward pass of the HEAT model.

    Converts the heterogeneous graph to a homogeneous view internally
    for HEATConv message passing, then maps outputs back to node types.

    Args:
        x_dict (dict): Node features for each node type.
        edge_index_dict (dict): Edge indices for each edge type.
        edge_attr_dict (dict, optional): Edge attributes for each edge
            type. Zero-padded if not provided. Defaults to None.
        minmax_scaling (bool): Whether to apply min-max scaling to
            outputs. Defaults to False.

    Returns:
        dict: Output predictions ``{'bus': Tensor, 'generator': Tensor}``.
    """
    if minmax_scaling:
        _vmin = x_dict['bus'][:, 1].clone()
        _vmax = x_dict['bus'][:, 2].clone()
        _pmin = x_dict['generator'][:, 2].clone()
        _pmax = x_dict['generator'][:, 3].clone()
        _qmin = x_dict['generator'][:, 5].clone()
        _qmax = x_dict['generator'][:, 6].clone()

    # Project node features to hidden_channels
    x_dict = {
        node_type: F.relu(self.lin_dict[node_type](x))
        for node_type, x in x_dict.items()
    }

    # Project edge attributes (if provided) to hidden_channels
    projected_edge_attr_dict = {}
    if edge_attr_dict is not None:
        for edge_type, edge_attr in edge_attr_dict.items():
            key = str(edge_type)
            if key in self.edge_lin_dict:
                projected_edge_attr_dict[edge_type] = F.relu(self.edge_lin_dict[key](edge_attr))

    # HEATConv in PyG operates on a homogeneous graph, so we build a temporary HeteroData and convert it with to_homogeneous().
    hdata = HeteroData()

    for node_type in self.node_types:
        if node_type not in x_dict:
            raise ValueError(f"Missing node type '{node_type}' in x_dict")
        hdata[node_type].x = x_dict[node_type]

    # Ensure every relation exists, and every relation has edge_attr aligned to edge_index
    for edge_type in self.edge_types:
        if edge_type in edge_index_dict:
            ei = edge_index_dict[edge_type]
        else:
            device = hdata[self.node_types[0]].x.device
            ei = torch.empty((2, 0), dtype=torch.long, device=device)

        hdata[edge_type].edge_index = ei
        num_edges = ei.size(1)
        device = ei.device

        if edge_type in projected_edge_attr_dict:
            ea = projected_edge_attr_dict[edge_type]
            if ea.size(0) != num_edges:
                raise ValueError(
                    f"edge_attr rows ({ea.size(0)}) must match num_edges ({num_edges}) for edge_type={edge_type}"
                )
            hdata[edge_type].edge_attr = ea
        else:
            # padding for missing edge attrs
            hdata[edge_type].edge_attr = torch.zeros(
                (num_edges, self._heat_edge_dim),
                dtype=hdata[self.node_types[0]].x.dtype,
                device=device,
            )

    homo = hdata.to_homogeneous(node_attrs=['x'], edge_attrs=['edge_attr'])

    x = homo.x
    edge_index = homo.edge_index
    node_type = homo.node_type
    edge_type = homo.edge_type
    edge_attr = homo.edge_attr

    # Message passing (homogeneous)
    for conv in self.convs:
        x = conv(x, edge_index, node_type, edge_type, edge_attr)
        x = F.relu(x)

    node_type_to_id = {nt: i for i, nt in enumerate(self.node_types)}
    bus_x = x[node_type == node_type_to_id["bus"]]
    gen_x = x[node_type == node_type_to_id["generator"]]

    bus_out = self.out_dict["bus"](bus_x)
    gen_out = self.out_dict["generator"](gen_x)

    if minmax_scaling:
        bus_out_final = bus_out.clone()
        gen_out_final = gen_out.clone()

        bus_out_final[:, 1] = torch.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

        gen_out_sigmoid = torch.sigmoid(gen_out)
        gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
        gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin
        return {"bus": bus_out_final, "generator": gen_out_final}

    return {"bus": bus_out, "generator": gen_out}

HGT

Bases: Module

Heterogeneous Graph Transformer for OPF graphs.

Uses HGTConv layers with multi-head attention and optional dropout. Each node type and edge type receives type-specific attention weights following the HGT architecture.

Input shape per node type: (N_type, input_channels[type]). Output shape: {'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}.
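
A construction sketch (dimensions and edge set are illustrative). HGTConv consumes the classic PyG metadata tuple, so the dict form is converted to (node_types, edge_types) internally; hidden_channels must be divisible by num_heads:

from lumina.model.opf.hetero_model import HGT

metadata = {'nodes': {'bus': 4, 'generator': 7},   # illustrative dims
            'edges': {('bus', 'ac_line', 'bus'): 9,
                      ('bus', 'generator_link', 'generator'): 0,
                      ('generator', 'generator_link', 'bus'): 0}}

model = HGT(metadata, input_channels={'bus': 4, 'generator': 7},
            hidden_channels=64, num_heads=2, dropout=0.1)  # 64 % 2 == 0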

Source code in lumina/model/opf/hetero_model.py
class HGT(torch.nn.Module):
    """Heterogeneous Graph Transformer for OPF graphs.

    Uses ``HGTConv`` layers with multi-head attention and optional dropout.
    Each node type and edge type receives type-specific attention weights
    following the HGT architecture.

    Input shape per node type: ``(N_type, input_channels[type])``.
    Output shape: ``{'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}``.
    """

    def __init__(self,
                 metadata,
                 input_channels,
                 hidden_channels=64,
                 out_channels=2,
                 num_layers=3,
                 num_heads=1,
                 dropout=0.0,
                 backend="sage",
                 edge_attr_dim=None,
                 **kwargs):
        r""" Heterogeneous Graph Transformer (HGT) model.

        Args:
            metadata (dict or tuple): Metadata containing node types and edge types.
                If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
                If tuple: (node_types, edge_types)
            input_channels (dict): Number of input features for each node type.
            hidden_channels (int): Hidden embedding size.
            out_channels (int): Size of each output sample. Defaults to 2.
            num_layers (int): Number of layers. Defaults to 3.
            num_heads (int): Number of multi-head-attention heads. Defaults to 1.
            dropout (float): Dropout probability applied after each layer. Defaults to 0.0.
            backend (str): Graph convolutional layer backend. Defaults to "sage".
            edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
        """
        super().__init__()

        self.lin_dict = torch.nn.ModuleDict()
        self.dropout = float(dropout)

        # Handle both old tuple format and new dict format
        if isinstance(metadata, dict):
            self.node_types = list(metadata['nodes'].keys())
            self.edge_types = list(metadata['edges'].keys())
        else:
            # Legacy tuple format: (node_types, edge_types)
            self.node_types = metadata[0]
            self.edge_types = metadata[1]

        # Validate input_channels
        if not isinstance(input_channels, dict):
            raise ValueError("input_channels must be a dictionary")

        # Input layers for each node type - no more lazy initialization
        for node_type in self.node_types:
            if node_type not in input_channels:
                raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
            self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

        metadata_tuple = (self.node_types, self.edge_types) if isinstance(metadata, dict) else metadata
        self.convs = torch.nn.ModuleList()
        for _ in range(num_layers):
            conv = HGTConv(hidden_channels, hidden_channels, metadata_tuple, num_heads)
            self.convs.append(conv)

        # Output layers for target node types
        self.out_dict = torch.nn.ModuleDict({
            "bus": Linear(hidden_channels, out_channels),
            "generator": Linear(hidden_channels, out_channels),
        })

        self.reset_parameters()

    def reset_parameters(self):
        """Reset parameters of the model."""
        for lin in self.lin_dict.values():
            lin.reset_parameters()
        for conv in self.convs:
            if hasattr(conv, 'reset_parameters'):
                conv.reset_parameters()
        for out in self.out_dict.values():
            out.reset_parameters()

    def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
        """Forward pass of the HGT model.

        Args:
            x_dict (dict): Node features for each node type.
            edge_index_dict (dict): Edge indices for each edge type.
            edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
            minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

        Returns:
            dict: Output predictions for each target node type.
        """

        if minmax_scaling:
            _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
            _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
            _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
            _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
            _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
            _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

        # Transform input features
        x_dict = {
            node_type: torch.relu(self.lin_dict[node_type](x_dict[node_type]))
            for node_type in self.node_types
        }
        if self.dropout > 0.0:
            x_dict = {
                key: F.dropout(x, p=self.dropout, training=self.training)
                for key, x in x_dict.items()
            }

        # Message passing
        for conv in self.convs:
            x_dict = conv(x_dict, edge_index_dict)
            x_dict = {key: F.relu(x) for key, x in x_dict.items()}
            if self.dropout > 0.0:
                x_dict = {
                    key: F.dropout(x, p=self.dropout, training=self.training)
                    for key, x in x_dict.items()
                }

        # Final predictions
        bus_out = self.out_dict["bus"](x_dict["bus"])
        gen_out = self.out_dict["generator"](x_dict["generator"])

        if minmax_scaling:
            # Create new tensors instead of modifying in-place
            bus_out_final = bus_out.clone()
            gen_out_final = gen_out.clone()

            # Apply scaling without in-place operations
            bus_out_final[:, 1] = F.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

            gen_out_sigmoid = F.sigmoid(gen_out)
            gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
            gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

            return {"bus": bus_out_final, "generator": gen_out_final}
        else:
            return {"bus": bus_out, "generator": gen_out}

__init__(metadata, input_channels, hidden_channels=64, out_channels=2, num_layers=3, num_heads=1, dropout=0.0, backend='sage', edge_attr_dim=None, **kwargs)

Heterogeneous Graph Transformer (HGT) model.

Parameters:

    metadata (dict or tuple, required): Metadata containing node types and edge types.
        If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
        If tuple: (node_types, edge_types)
    input_channels (dict, required): Number of input features for each node type.
    hidden_channels (int, default 64): Hidden embedding size; must be divisible by num_heads.
    out_channels (int, default 2): Size of each output sample.
    num_layers (int, default 3): Number of HGTConv layers.
    num_heads (int, default 1): Number of multi-head-attention heads.
    dropout (float, default 0.0): Dropout probability applied after the input projection and
        after each HGTConv layer.
    backend (str, default "sage"): Accepted for interface compatibility; unused by HGT.
    edge_attr_dim (int, optional, default None): Accepted for interface compatibility; unused by HGT.
Source code in lumina/model/opf/hetero_model.py
def __init__(self,
             metadata,
             input_channels,
             hidden_channels=64,
             out_channels=2,
             num_layers=3,
             num_heads=1,
             dropout=0.0,
             backend="sage",
             edge_attr_dim=None,
             **kwargs):
    r""" Heterogeneous Graph Transformer (HGT) model.

    Args:
        metadata (dict or tuple): Metadata containing node types and edge types.
            If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
            If tuple: (node_types, edge_types)
        input_channels (dict): Number of input features for each node type.
        hidden_channels (int): Hidden embedding size.
        out_channels (int): Size of each output sample. Defaults to 2.
        num_layers (int): Number of layers. Defaults to 3.
        num_heads (int): Number of multi-head-attention heads. Defaults to 1.
        dropout (float): Dropout probability applied after the input transform
            and after each layer. Defaults to 0.0.
        backend (str): Graph convolutional layer backend; accepted for interface
            compatibility but not used by HGT. Defaults to "sage".
        edge_attr_dim (int, optional): Dimension of edge attributes for GAT;
            not used by HGT. Defaults to None.
    """
    super().__init__()

    self.lin_dict = torch.nn.ModuleDict()
    self.dropout = float(dropout)

    # Handle both old tuple format and new dict format
    if isinstance(metadata, dict):
        self.node_types = list(metadata['nodes'].keys())
        self.edge_types = list(metadata['edges'].keys())
    else:
        # Legacy tuple format: (node_types, edge_types)
        self.node_types = metadata[0]
        self.edge_types = metadata[1]

    # Validate input_channels
    if not isinstance(input_channels, dict):
        raise ValueError("input_channels must be a dictionary")

    # Input layers for each node type - no more lazy initialization
    for node_type in self.node_types:
        if node_type not in input_channels:
            raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
        self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

    metadata_tuple = (self.node_types, self.edge_types) if isinstance(metadata, dict) else metadata
    self.convs = torch.nn.ModuleList()
    for _ in range(num_layers):
        conv = HGTConv(hidden_channels, hidden_channels, metadata_tuple, num_heads)
        self.convs.append(conv)

    # Output layers for target node types
    self.out_dict = torch.nn.ModuleDict({
        "bus": Linear(hidden_channels, out_channels),
        "generator": Linear(hidden_channels, out_channels),
    })

    self.reset_parameters()
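
For reference, a metadata dict in the format this constructor accepts might look like the sketch below. The edge types and attribute dimensions are taken from the legacy fallback table used elsewhere in this module; the node feature dimensions are illustrative placeholders.

    # Illustrative metadata in the dict format (node dims are placeholders).
    metadata = {
        'nodes': {'bus': 4, 'generator': 8, 'load': 2, 'shunt': 2},
        'edges': {
            ('bus', 'ac_line', 'bus'): 9,
            ('bus', 'transformer', 'bus'): 11,
            ('generator', 'generator_link', 'bus'): 0,   # 0 -> no edge attributes
            ('bus', 'generator_link', 'generator'): 0,
        },
    }
    input_channels = {'bus': 4, 'generator': 8, 'load': 2, 'shunt': 2}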

reset_parameters()

Reset parameters of the model.

Source code in lumina/model/opf/hetero_model.py
def reset_parameters(self):
    """Reset parameters of the model."""
    for lin in self.lin_dict.values():
        lin.reset_parameters()
    for conv in self.convs:
        if hasattr(conv, 'reset_parameters'):
            conv.reset_parameters()
    for out in self.out_dict.values():
        out.reset_parameters()

forward(x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs)

Forward pass of the HGT model.

Parameters:

    x_dict (dict): Node features for each node type. Required.
    edge_index_dict (dict): Edge indices for each edge type. Required.
    edge_attr_dict (dict, optional): Edge attributes for each edge type. Default: None.
    minmax_scaling (bool): Whether to apply min-max scaling to outputs. Default: False.

Returns:

    dict: Output predictions for each target node type.

Source code in lumina/model/opf/hetero_model.py
def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
    """Forward pass of the HGT model.

    Args:
        x_dict (dict): Node features for each node type.
        edge_index_dict (dict): Edge indices for each edge type.
        edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
        minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

    Returns:
        dict: Output predictions for each target node type.
    """

    if minmax_scaling:
        _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
        _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
        _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
        _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
        _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
        _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

    # Transform input features
    x_dict = {
        node_type: torch.relu(self.lin_dict[node_type](x_dict[node_type]))
        for node_type in self.node_types
    }
    if self.dropout > 0.0:
        x_dict = {
            key: F.dropout(x, p=self.dropout, training=self.training)
            for key, x in x_dict.items()
        }

    # Message passing
    for conv in self.convs:
        x_dict = conv(x_dict, edge_index_dict)
        x_dict = {key: F.relu(x) for key, x in x_dict.items()}
        if self.dropout > 0.0:
            x_dict = {
                key: F.dropout(x, p=self.dropout, training=self.training)
                for key, x in x_dict.items()
            }

    # Final predictions
    bus_out = self.out_dict["bus"](x_dict["bus"])
    gen_out = self.out_dict["generator"](x_dict["generator"])

    if minmax_scaling:
        # Create new tensors instead of modifying in-place
        bus_out_final = bus_out.clone()
        gen_out_final = gen_out.clone()

        # Scale the bounded columns on the clones: sigmoid squashes each raw
        # value into (0, 1), then the affine map places it inside [min, max].
        bus_out_final[:, 1] = torch.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

        gen_out_sigmoid = torch.sigmoid(gen_out)
        gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
        gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

        return {"bus": bus_out_final, "generator": gen_out_final}
    else:
        return {"bus": bus_out, "generator": gen_out}

OPF Homogeneous Models

GNN_basic

Bases: GNNBase

Base homogeneous GNN with NNConv layers and an MLP head.

Provides the default get_layers, forward, get_emb, and loss methods. Subclasses (GAT, GCN, GIN, TRANSFORMER) override get_layers and optionally get_emb to swap the convolution type.

Parameters:

    input_dim (int): Number of input node features. Required.
    output_dim (int): Number of output features per node. Required.
    model_params (dict): Dictionary containing 'edge_dim', 'num_layers', 'hidden_dim', 'dropout', and 'readout' keys. Required.
Source code in lumina/model/opf/homo_model.py
class GNN_basic(GNNBase):
    """Base homogeneous GNN with NNConv layers and an MLP head.

    Provides the default ``get_layers``, ``forward``, ``get_emb``, and
    ``loss`` methods. Subclasses (GAT, GCN, GIN, TRANSFORMER) override
    ``get_layers`` and optionally ``get_emb`` to swap the convolution type.

    Args:
        input_dim (int): Number of input node features.
        output_dim (int): Number of output features per node.
        model_params (dict): Dictionary containing ``'edge_dim'``,
            ``'num_layers'``, ``'hidden_dim'``, ``'dropout'``, and
            ``'readout'`` keys.
    """

    def __init__(self,
                 input_dim,
                 output_dim,
                 model_params,
                 ):
        super(GNN_basic, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.edge_dim = model_params["edge_dim"]
        self.num_layers = model_params["num_layers"]
        self.hidden_dim = model_params["hidden_dim"]
        self.dropout = model_params["dropout"]
        # readout
        self.readout = model_params["readout"]
        # self.readout_layer = GNNPool(self.readout)
        # self.default_num_nodes = model_params["default_num_nodes"]
        self.get_layers()

    def get_layers(self):
        # GNN layers
        self.convs = nn.ModuleList()
        current_dim = self.input_dim
        for layer in range(self.num_layers):
            self.convs.append(NNConv(current_dim, self.hidden_dim))
            current_dim = self.hidden_dim
        # FC layers
        mlp_dim = current_dim * 2 if self.readout == 'cat_max_sum' else current_dim
        self.mlps = nn.Sequential(
            nn.Linear(mlp_dim, mlp_dim),
            nn.ReLU(),
            nn.Linear(mlp_dim, self.output_dim))
        return

    def forward(self, *args, **kwargs):
        r""" Forward pass of GNN model.
        """
        _, _, _, _, batch = self._argsparse(*args, **kwargs)
        # node embedding for GNN
        emb = self.get_emb(*args, **kwargs)
        # logits of ACOPF variables
        self.logits = self.mlps(emb)
        return self.logits

    def loss(self, pred, label):
        r""" Mean Squared Error Loss

        Args:
            pred (torch.Tensor): Predicted values.
            label (torch.Tensor): True values.

        Returns:
            torch.Tensor: Mean Squared Error Loss.
        """
        return F.mse_loss(pred, label)

    def get_emb(self, *args, **kwargs):
        x, edge_index, edge_attr, edge_weight, _ = self._argsparse(*args, **kwargs)

        for layer in self.convs:
            x = layer(x, edge_index, edge_attr * edge_weight[:, None])
            x = F.relu(x)
            x = F.dropout(x, self.dropout, training=self.training)
        return x

    # def get_graph_rep(self, *args, **kwargs):
    #     x, edge_index, edge_attr, edge_weight, batch = self._argsparse(*args, **kwargs)
    #     for layer in self.convs:
    #         x = layer(x, edge_index, edge_attr * edge_weight[:, None])
    #         x = F.relu(x)  # maybe replace the ReLU with LeakyReLU
    #         x = F.dropout(x, self.dropout, training=self.training)
    #     x = self.readout_layer(x, batch)
    #     return x

    def get_pred_label(self, pred):
        return pred.argmax(dim=1)
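
A sketch of the model_params dictionary these homogeneous models read in __init__; the keys come from the constructor above, while the values are placeholders rather than recommended settings.

    model_params = {
        'edge_dim': 9,       # edge-attribute dimension fed to the conv layers
        'num_layers': 3,
        'hidden_dim': 64,
        'dropout': 0.1,
        'readout': 'mean',   # 'cat_max_sum' doubles the MLP input width
    }
    # model = GNN_basic(input_dim=4, output_dim=2, model_params=model_params)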

forward(*args, **kwargs)

Forward pass of GNN model.

Source code in lumina/model/opf/homo_model.py
def forward(self, *args, **kwargs):
    r""" Forward pass of GNN model.
    """
    _, _, _, _, batch = self._argsparse(*args, **kwargs)
    # node embedding for GNN
    emb = self.get_emb(*args, **kwargs)
    # logits of ACOPF variables
    self.logits = self.mlps(emb)
    return self.logits

loss(pred, label)

Mean Squared Error Loss

Parameters:

    pred (torch.Tensor): Predicted values. Required.
    label (torch.Tensor): True values. Required.

Returns:

    torch.Tensor: Mean Squared Error Loss.

Source code in lumina/model/opf/homo_model.py
def loss(self, pred, label):
    r""" Mean Squared Error Loss

    Args:
        pred (torch.Tensor): Predicted values.
        label (torch.Tensor): True values.

    Returns:
        torch.Tensor: Mean Squared Error Loss.
    """
    return F.mse_loss(pred, label)

GAT

Bases: GNN_basic

Homogeneous GNN using GATConv (Graph Attention Network) layers.

Replaces the default NNConv with GATConv and uses LeakyReLU activation in the MLP head.

Parameters:

    input_dim (int): Number of input node features. Required.
    output_dim (int): Number of output features per node. Required.
    model_params (dict): Model hyperparameters (see GNN_basic). Required.
Source code in lumina/model/opf/homo_model.py
class GAT(GNN_basic):
    """Homogeneous GNN using GATConv (Graph Attention Network) layers.

    Replaces the default NNConv with ``GATConv`` and uses ``LeakyReLU``
    activation in the MLP head.

    Args:
        input_dim (int): Number of input node features.
        output_dim (int): Number of output features per node.
        model_params (dict): Model hyperparameters (see ``GNN_basic``).
    """

    def __init__(self, input_dim, output_dim, model_params):
        super().__init__(
            input_dim,
            output_dim,
            model_params,
        )

    def get_layers(self):
        self.convs = nn.ModuleList()
        current_dim = self.input_dim
        for layer in range(self.num_layers):
            self.convs.append(
                GATConv(current_dim, self.hidden_dim, edge_dim=self.edge_dim)
            )
            current_dim = self.hidden_dim
        # FC layers

        mlp_dim = current_dim * 2 if self.readout == 'cat_max_sum' else current_dim
        self.mlps = nn.Sequential(
            nn.Linear(mlp_dim, mlp_dim),
            nn.LeakyReLU(),
            nn.Linear(mlp_dim, self.output_dim))
        return

GCN

Bases: GNN_basic

Homogeneous GNN using GCNConv (Graph Convolutional Network) layers.

Overrides get_layers to use GCNConv and get_emb to skip edge attributes (GCNConv does not use them).

Parameters:

    input_dim (int): Number of input node features. Required.
    output_dim (int): Number of output features per node. Required.
    model_params (dict): Model hyperparameters (see GNN_basic). Required.
Source code in lumina/model/opf/homo_model.py
class GCN(GNN_basic):
    """Homogeneous GNN using GCNConv (Graph Convolutional Network) layers.

    Overrides ``get_layers`` to use ``GCNConv`` and ``get_emb`` to skip
    edge attributes (GCNConv does not use them).

    Args:
        input_dim (int): Number of input node features.
        output_dim (int): Number of output features per node.
        model_params (dict): Model hyperparameters (see ``GNN_basic``).
    """

    def __init__(self,
                 input_dim,
                 output_dim,
                 model_params):
        super().__init__(input_dim,
                         output_dim,
                         model_params)

    def get_layers(self):
        r""" Prepare the layers of GCN model. """
        # GCN layers
        self.convs = nn.ModuleList()
        current_dim = self.input_dim
        for layer in range(self.num_layers):
            self.convs.append(GCNConv(current_dim, self.hidden_dim))
            current_dim = self.hidden_dim

        # MLP layers
        mlp_dim = current_dim * 2 if self.readout == 'cat_max_sum' else current_dim
        self.mlps = nn.Linear(mlp_dim, self.output_dim)
        return

    def get_emb(self, *args, **kwargs):
        r""" Get the node embeddings from GCN model. """
        x, edge_index, _, _, _ = self._argsparse(*args, **kwargs)

        for layer in self.convs:
            x = layer(x, edge_index)
            x = F.relu(x)
            x = F.dropout(x, self.dropout, training=self.training)
        return x

get_layers()

Prepare the layers of GCN model.

Source code in lumina/model/opf/homo_model.py
def get_layers(self):
    r""" Prepare the layers of GCN model. """
    # GCN layers
    self.convs = nn.ModuleList()
    current_dim = self.input_dim
    for layer in range(self.num_layers):
        self.convs.append(GCNConv(current_dim, self.hidden_dim))
        current_dim = self.hidden_dim

    # MLP layers
    mlp_dim = current_dim * 2 if self.readout == 'cat_max_sum' else current_dim
    self.mlps = nn.Linear(mlp_dim, self.output_dim)
    return

get_emb(*args, **kwargs)

Get the node embeddings from GCN model.

Source code in lumina/model/opf/homo_model.py
def get_emb(self, *args, **kwargs):
    r""" Get the node embeddings from GCN model. """
    x, edge_index, _, _, _ = self._argsparse(*args, **kwargs)

    for layer in self.convs:
        x = layer(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, self.dropout, training=self.training)
    return x

GIN

Bases: GNN_basic

Homogeneous GNN using GINEConv (Graph Isomorphism Network) layers.

Uses GINEConv with edge attributes and a two-layer MLP with ReLU/Sigmoid activations inside each convolution.

Parameters:

    input_dim (int): Number of input node features. Required.
    output_dim (int): Number of output features per node. Required.
    model_params (dict): Model hyperparameters (see GNN_basic). Required.
Source code in lumina/model/opf/homo_model.py
class GIN(GNN_basic):
    """Homogeneous GNN using GINEConv (Graph Isomorphism Network) layers.

    Uses ``GINEConv`` with edge attributes and a two-layer MLP with
    ReLU/Sigmoid activations inside each convolution.

    Args:
        input_dim (int): Number of input node features.
        output_dim (int): Number of output features per node.
        model_params (dict): Model hyperparameters (see ``GNN_basic``).
    """

    def __init__(
            self,
            input_dim,
            output_dim,
            model_params,
    ):
        super().__init__(
            input_dim,
            output_dim,
            model_params,
        )

    def get_layers(self):
        self.convs = nn.ModuleList()
        current_dim = self.input_dim
        for layer in range(self.num_layers):
            self.convs.append(
                GINEConv(
                    nn=nn.Sequential(
                        nn.Linear(current_dim, self.hidden_dim),
                        nn.ReLU(),
                        # nn.Linear(current_dim, self.hidden_dim),
                        # nn.PReLU(),
                        nn.Linear(self.hidden_dim, self.hidden_dim),
                        nn.Sigmoid(),
                    ),
                    edge_dim=self.edge_dim,
                )
            )
            current_dim = self.hidden_dim
        # FC layers
        mlp_dim = current_dim * 2 if self.readout == 'cat_max_sum' else current_dim
        self.mlps = nn.Linear(mlp_dim, self.output_dim)
        return

TRANSFORMER

Bases: GNN_basic

Homogeneous GNN using TransformerConv layers.

Applies multi-head graph transformer convolutions (4 heads, no concatenation) with edge attribute support.

Parameters:

    input_dim (int): Number of input node features. Required.
    output_dim (int): Number of output features per node. Required.
    model_params (dict): Model hyperparameters (see GNN_basic). Required.
Source code in lumina/model/opf/homo_model.py
class TRANSFORMER(GNN_basic):  # uppercase
    """Homogeneous GNN using TransformerConv layers.

    Applies multi-head graph transformer convolutions (4 heads, no
    concatenation) with edge attribute support.

    Args:
        input_dim (int): Number of input node features.
        output_dim (int): Number of output features per node.
        model_params (dict): Model hyperparameters (see ``GNN_basic``).
    """

    def __init__(
            self,
            input_dim,
            output_dim,
            model_params,
    ):
        super().__init__(
            input_dim,
            output_dim,
            model_params,
        )

    def get_layers(self):
        self.convs = nn.ModuleList()
        current_dim = self.input_dim
        for layer in range(self.num_layers):
            self.convs.append(
                TransformerConv(current_dim, self.hidden_dim, heads=4, edge_dim=self.edge_dim, concat=False)
            )
            current_dim = self.hidden_dim  # concat=False keeps the width at hidden_dim

        # FC layers
        mlp_dim = current_dim * 2 if self.readout == 'cat_max_sum' else current_dim
        self.mlps = nn.Linear(mlp_dim, self.output_dim)
        return

Base Heterogeneous GNN

HeteroGNN

Bases: Module

Generic Heterogeneous GNN with configurable message-passing backend.

Supports SAGE, GCN, GIN, and GAT backends via HeteroConv. Each edge type gets its own convolution instance. Produces per-node predictions for bus and generator node types.

Input shape per node type: (N_type, input_channels[type]). Output shape: {'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}.

Source code in lumina/model/base/hetero_gnn.py
class HeteroGNN(torch.nn.Module):
    """Generic Heterogeneous GNN with configurable message-passing backend.

    Supports SAGE, GCN, GIN, and GAT backends via ``HeteroConv``. Each edge
    type gets its own convolution instance. Produces per-node predictions for
    ``bus`` and ``generator`` node types.

    Input shape per node type: ``(N_type, input_channels[type])``.
    Output shape: ``{'bus': (N_bus, out_channels), 'generator': (N_gen, out_channels)}``.
    """

    def __init__(
            self,
            metadata,
            input_channels,
            hidden_channels=64,
            out_channels=2,
            num_layers=3,
            backend="sage",
            edge_attr_dim=None,
            **kwargs):
        """ Heterogeneous Graph Neural Network (HeteroGNN) model.

        Args:
            metadata (dict or tuple): Metadata containing node types and edge types.
                If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
                If tuple: (node_types, edge_types)
            input_channels (dict): Number of input features for each node type.
            hidden_channels (int): Hidden embedding size.
            out_channels (int): Size of each output sample. Defaults to 2.
            num_layers (int): Number of layers. Defaults to 3.
            backend (str): Graph convolutional layer backend. Defaults to "sage".
            edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
        """
        super().__init__()
        self.lin_dict = torch.nn.ModuleDict()

        # Handle both old tuple format and new dict format
        if isinstance(metadata, dict):
            node_types = list(metadata['nodes'].keys())
            edge_types = list(metadata['edges'].keys())
        else:
            # Legacy tuple format: (node_types, edge_types)
            node_types = metadata[0]
            edge_types = metadata[1]

        self.backend = backend
        self.edge_attr_support = backend == "gat"

        # Validate input_channels
        if not isinstance(input_channels, dict):
            raise ValueError("input_channels must be a dictionary")

        # Input layers for each node type - no more lazy initialization
        for node_type in node_types:
            if node_type not in input_channels:
                raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
            self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

        # Heterogeneous graph convolutional layers for edges
        self.convs = torch.nn.ModuleList()

        # Get edge attribute dimensions from metadata
        if isinstance(metadata, dict):
            edge_attr_dims = {edge_type: (dim if dim > 0 else None)
                              for edge_type, dim in metadata['edges'].items()}
        else:
            # Fallback to hardcoded dimensions for legacy format
            edge_attr_dims = {
                ('bus', 'ac_line', 'bus'): 9,
                ('bus', 'transformer', 'bus'): 11,
                # Link edges have no attributes
                ('generator', 'generator_link', 'bus'): None,
                ('bus', 'generator_link', 'generator'): None,
                ('load', 'load_link', 'bus'): None,
                ('bus', 'load_link', 'load'): None,
                ('shunt', 'shunt_link', 'bus'): None,
                ('bus', 'shunt_link', 'shunt'): None,
            }

        def get_conv_layer(edge_type):
            if backend == "sage":
                return SAGEConv((hidden_channels, hidden_channels), hidden_channels)
            elif backend == "gcn":
                return GraphConv((hidden_channels, hidden_channels), hidden_channels)
            elif backend == "gin":
                return GINConv(MLP([hidden_channels, hidden_channels]))
            elif backend == "gat":
                edge_dim = edge_attr_dims.get(edge_type, None)
                return GATConv((hidden_channels, hidden_channels),
                               hidden_channels,
                               add_self_loops=False,
                               edge_dim=edge_dim)
            else:
                raise ValueError(f"Unknown backend: {backend}")

        for _ in range(num_layers - 1):
            conv = HeteroConv({
                edge_type: get_conv_layer(edge_type)
                for edge_type in edge_types
            }, aggr='sum')
            self.convs.append(conv)

        # Output layers for target node types, ACOPF variables
        self.out_dict = torch.nn.ModuleDict({
            "bus": Linear(hidden_channels, out_channels),
            "generator": Linear(hidden_channels, out_channels),
        })

        self.reset_parameters()

        # # set param init
        # for lin in self.lin_dict.values():
        #     torch.nn.init.xavier_uniform_(lin.weight)
        # # for conv in self.convs:
        # #     for rel_conv in conv.convs.values():
        # #         torch.nn.init.xavier_normal_(rel_conv.lin_rel.weight)
        # #         torch.nn.init.xavier_normal_(rel_conv.lin_root.weight)
        # for out in self.out_dict.values():
        #     torch.nn.init.xavier_uniform_(out.weight)

    def reset_parameters(self):
        """Reset parameters of the model."""
        for lin in self.lin_dict.values():
            lin.reset_parameters()
        for conv in self.convs:
            for rel_conv in conv.convs.values():
                if hasattr(rel_conv, 'reset_parameters'):
                    rel_conv.reset_parameters()
            conv.reset_parameters()
        for out in self.out_dict.values():
            out.reset_parameters()

    def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
        """Forward pass of the HeteroGNN model.

        Args:
            x_dict (dict): Node features for each node type.
            edge_index_dict (dict): Edge indices for each edge type.
            edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
            minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

        Returns:
            dict: Output predictions for each target node type.
        """

        if minmax_scaling:
            _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
            _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
            _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
            _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
            _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
            _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

        # Transform input features
        x_dict = {
            node_type: F.relu(self.lin_dict[node_type](x))
            for node_type, x in x_dict.items()
        }
        # x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}

        # Message passing
        for conv in self.convs:
            if self.edge_attr_support and edge_attr_dict is not None:
                x_dict = conv(x_dict, edge_index_dict, edge_attr_dict=edge_attr_dict)
                x_dict = {key: F.relu(x) for key, x in x_dict.items()}
                x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}
            else:
                x_dict = conv(x_dict, edge_index_dict)
                x_dict = {key: F.relu(x) for key, x in x_dict.items()}
                x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}

        # Final predictions
        # bus_out: va, vm
        bus_out = self.out_dict["bus"](x_dict["bus"])
        # bus_out = F.dropout(bus_out, p=0.1, training=self.training)
        # gen_out: pg, qg
        gen_out = self.out_dict["generator"](x_dict["generator"])
        # gen_out = F.dropout(gen_out, p=0.1, training=self.training)

        if minmax_scaling:
            # Create new tensors instead of modifying in-place
            bus_out_final = bus_out.clone()
            gen_out_final = gen_out.clone()

            # Scale the bounded columns on the clones: sigmoid squashes each raw
            # value into (0, 1), then the affine map places it inside [min, max].
            bus_out_final[:, 1] = torch.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

            gen_out_sigmoid = torch.sigmoid(gen_out)
            gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
            gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

            return {"bus": bus_out_final, "generator": gen_out_final}
        else:
            return {"bus": bus_out, "generator": gen_out}

__init__(metadata, input_channels, hidden_channels=64, out_channels=2, num_layers=3, backend='sage', edge_attr_dim=None, **kwargs)

Heterogeneous Graph Neural Network (HeteroGNN) model.

Parameters:

    metadata (dict or tuple): Metadata containing node types and edge types.
        If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}.
        If tuple: (node_types, edge_types). Required.
    input_channels (dict): Number of input features for each node type. Required.
    hidden_channels (int): Hidden embedding size. Default: 64.
    out_channels (int): Size of each output sample. Default: 2.
    num_layers (int): Number of layers. Default: 3.
    backend (str): Graph convolutional layer backend. Default: 'sage'.
    edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Default: None.
Source code in lumina/model/base/hetero_gnn.py
def __init__(
        self,
        metadata,
        input_channels,
        hidden_channels=64,
        out_channels=2,
        num_layers=3,
        backend="sage",
        edge_attr_dim=None,
        **kwargs):
    """ Heterogeneous Graph Neural Network (HeteroGNN) model.

    Args:
        metadata (dict or tuple): Metadata containing node types and edge types.
            If dict: {'nodes': {node_type: dim, ...}, 'edges': {edge_type: dim, ...}}
            If tuple: (node_types, edge_types)
        input_channels (dict): Number of input features for each node type.
        hidden_channels (int): Hidden embedding size.
        out_channels (int): Size of each output sample. Defaults to 2.
        num_layers (int): Number of layers. Defaults to 3.
        backend (str): Graph convolutional layer backend. Defaults to "sage".
        edge_attr_dim (int, optional): Dimension of edge attributes for GAT. Defaults to None.
    """
    super().__init__()
    self.lin_dict = torch.nn.ModuleDict()

    # Handle both old tuple format and new dict format
    if isinstance(metadata, dict):
        node_types = list(metadata['nodes'].keys())
        edge_types = list(metadata['edges'].keys())
    else:
        # Legacy tuple format: (node_types, edge_types)
        node_types = metadata[0]
        edge_types = metadata[1]

    self.backend = backend
    self.edge_attr_support = backend == "gat"

    # Validate input_channels
    if not isinstance(input_channels, dict):
        raise ValueError("input_channels must be a dictionary")

    # Input layers for each node type - no more lazy initialization
    for node_type in node_types:
        if node_type not in input_channels:
            raise ValueError(f"input_channels must contain entry for node type '{node_type}'")
        self.lin_dict[node_type] = Linear(input_channels[node_type], hidden_channels)

    # Heterogeneous graph convolutional layers for edges
    self.convs = torch.nn.ModuleList()

    # Get edge attribute dimensions from metadata
    if isinstance(metadata, dict):
        edge_attr_dims = {edge_type: (dim if dim > 0 else None)
                          for edge_type, dim in metadata['edges'].items()}
    else:
        # Fallback to hardcoded dimensions for legacy format
        edge_attr_dims = {
            ('bus', 'ac_line', 'bus'): 9,
            ('bus', 'transformer', 'bus'): 11,
            # Link edges have no attributes
            ('generator', 'generator_link', 'bus'): None,
            ('bus', 'generator_link', 'generator'): None,
            ('load', 'load_link', 'bus'): None,
            ('bus', 'load_link', 'load'): None,
            ('shunt', 'shunt_link', 'bus'): None,
            ('bus', 'shunt_link', 'shunt'): None,
        }

    def get_conv_layer(edge_type):
        if backend == "sage":
            return SAGEConv((hidden_channels, hidden_channels), hidden_channels)
        elif backend == "gcn":
            return GraphConv((hidden_channels, hidden_channels), hidden_channels)
        elif backend == "gin":
            return GINConv(MLP([hidden_channels, hidden_channels]))
        elif backend == "gat":
            edge_dim = edge_attr_dims.get(edge_type, None)
            return GATConv((hidden_channels, hidden_channels),
                           hidden_channels,
                           add_self_loops=False,
                           edge_dim=edge_dim)
        else:
            raise ValueError(f"Unknown backend: {backend}")

    for _ in range(num_layers - 1):
        conv = HeteroConv({
            edge_type: get_conv_layer(edge_type)
            for edge_type in edge_types
        }, aggr='sum')
        self.convs.append(conv)

    # Output layers for target node types, ACOPF variables
    self.out_dict = torch.nn.ModuleDict({
        "bus": Linear(hidden_channels, out_channels),
        "generator": Linear(hidden_channels, out_channels),
    })

    self.reset_parameters()

reset_parameters()

Reset parameters of the model.

Source code in lumina/model/base/hetero_gnn.py
def reset_parameters(self):
    """Reset parameters of the model."""
    for lin in self.lin_dict.values():
        lin.reset_parameters()
    for conv in self.convs:
        for rel_conv in conv.convs.values():
            if hasattr(rel_conv, 'reset_parameters'):
                rel_conv.reset_parameters()
        conv.reset_parameters()
    for out in self.out_dict.values():
        out.reset_parameters()

forward(x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs)

Forward pass of the HeteroGNN model.

Parameters:

    x_dict (dict): Node features for each node type. Required.
    edge_index_dict (dict): Edge indices for each edge type. Required.
    edge_attr_dict (dict, optional): Edge attributes for each edge type. Default: None.
    minmax_scaling (bool): Whether to apply min-max scaling to outputs. Default: False.

Returns:

    dict: Output predictions for each target node type.

Source code in lumina/model/base/hetero_gnn.py
def forward(self, x_dict, edge_index_dict, edge_attr_dict=None, minmax_scaling=False, **kwargs):
    """Forward pass of the HeteroGNN model.

    Args:
        x_dict (dict): Node features for each node type.
        edge_index_dict (dict): Edge indices for each edge type.
        edge_attr_dict (dict, optional): Edge attributes for each edge type. Defaults to None.
        minmax_scaling (bool): Whether to apply min-max scaling to outputs. Defaults to False.

    Returns:
        dict: Output predictions for each target node type.
    """

    if minmax_scaling:
        _vmin = x_dict['bus'][:, 1].clone()  # Original voltage min
        _vmax = x_dict['bus'][:, 2].clone()  # Original voltage max
        _pmin = x_dict['generator'][:, 2].clone()  # Original active power min
        _pmax = x_dict['generator'][:, 3].clone()  # Original active power max
        _qmin = x_dict['generator'][:, 5].clone()  # Original reactive power min
        _qmax = x_dict['generator'][:, 6].clone()  # Original reactive power max

    # Transform input features
    x_dict = {
        node_type: F.relu(self.lin_dict[node_type](x))
        for node_type, x in x_dict.items()
    }
    # x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}

    # Message passing
    for conv in self.convs:
        if self.edge_attr_support and edge_attr_dict is not None:
            x_dict = conv(x_dict, edge_index_dict, edge_attr_dict=edge_attr_dict)
            x_dict = {key: F.relu(x) for key, x in x_dict.items()}
            x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}
        else:
            x_dict = conv(x_dict, edge_index_dict)
            x_dict = {key: F.relu(x) for key, x in x_dict.items()}
            x_dict = {key: F.dropout(x, p=0.1, training=self.training) for key, x in x_dict.items()}

    # Final predictions
    # bus_out: va, vm
    bus_out = self.out_dict["bus"](x_dict["bus"])
    # bus_out = F.dropout(bus_out, p=0.1, training=self.training)
    # gen_out: pg, qg
    gen_out = self.out_dict["generator"](x_dict["generator"])
    # gen_out = F.dropout(gen_out, p=0.1, training=self.training)

    if minmax_scaling:
        # Create new tensors instead of modifying in-place
        bus_out_final = bus_out.clone()
        gen_out_final = gen_out.clone()

        # Scale the bounded columns on the clones: sigmoid squashes each raw
        # value into (0, 1), then the affine map places it inside [min, max].
        bus_out_final[:, 1] = torch.sigmoid(bus_out[:, 1]) * (_vmax - _vmin) + _vmin

        gen_out_sigmoid = torch.sigmoid(gen_out)
        gen_out_final[:, 0] = gen_out_sigmoid[:, 0] * (_pmax - _pmin) + _pmin
        gen_out_final[:, 1] = gen_out_sigmoid[:, 1] * (_qmax - _qmin) + _qmin

        return {"bus": bus_out_final, "generator": gen_out_final}
    else:
        return {"bus": bus_out, "generator": gen_out}

ModelFactory

Factory for instantiating heterogeneous GNN model architectures.

Currently supports 'heterognn', 'hgt', 'heat', and 'rgat'. HGT, HEAT, and RGAT fall back to HeteroGNN with a warning until their dedicated configurations are finalized.

Source code in lumina/model/base/hetero_gnn.py
class ModelFactory:
    """Factory for instantiating heterogeneous GNN model architectures.

    Currently supports ``'heterognn'``, ``'hgt'``, ``'heat'``, and ``'rgat'``.
    HGT, HEAT, and RGAT fall back to HeteroGNN with a warning until their
    dedicated configurations are finalized.
    """

    @staticmethod
    def create_model(model_name, metadata, input_channels, config):
        """Create a model instance by name and configuration.

        Args:
            model_name (str): Model architecture name (case-insensitive).
                One of ``'heterognn'``, ``'hgt'``, ``'heat'``, ``'rgat'``.
            metadata (dict or tuple): Graph metadata describing node and
                edge types.
            input_channels (dict): Mapping of node type to input feature
                dimension.
            config (dict): Full training configuration dict; model
                hyperparameters are read from ``config['models'][name]``.

        Returns:
            torch.nn.Module: An instantiated GNN model.

        Raises:
            ValueError: If ``model_name`` is not recognized.
        """
        model_name = model_name.lower()

        if model_name == 'heterognn':
            model_config = config['models']['HeteroGNN']
            return HeteroGNN(
                metadata=metadata,
                input_channels=input_channels,
                hidden_channels=model_config['hidden_channels'],
                num_layers=model_config['num_layers'],
                backend=model_config.get('backend', 'gcn'),
                dropout=model_config.get('dropout', 0.0),
                out_channels=2  # VM, VA for bus; PG, QG for generator
            )
        elif model_name == 'hgt':
            model_config = config['models'].get('HGT', {})
            # Placeholder for HGT - to be implemented
            print("Warning: HGT model not yet implemented, falling back to HeteroGNN")
            return ModelFactory.create_model('heterognn', metadata, input_channels, config)
        elif model_name == 'heat':
            model_config = config['models'].get('HEAT', {})
            # Placeholder for HEAT - to be implemented
            print("Warning: HEAT model not yet implemented, falling back to HeteroGNN")
            return ModelFactory.create_model('heterognn', metadata, input_channels, config)
        elif model_name == 'rgat':
            model_config = config['models'].get('RGAT', {})
            # Placeholder for RGAT - to be implemented
            print("Warning: RGAT model not yet implemented, falling back to HeteroGNN")
            return ModelFactory.create_model('heterognn', metadata, input_channels, config)
        else:
            raise ValueError(f"Unknown model: {model_name}. Available models: heterognn, hgt, heat, rgat")

    @staticmethod
    def get_available_models():
        """Return the list of supported model architecture names.

        Returns:
            list[str]: Available model names.
        """
        return ['heterognn', 'hgt', 'heat', 'rgat']
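
A hedged usage sketch of the factory. The config layout mirrors the keys read above (config['models']['HeteroGNN'] plus optional backend and dropout); metadata and input_channels are assumed to be prepared as for HeteroGNN.

    config = {
        'models': {
            'HeteroGNN': {
                'hidden_channels': 64,
                'num_layers': 3,
                'backend': 'sage',   # optional; the factory defaults to 'gcn'
                'dropout': 0.0,      # optional
            }
        }
    }
    model = ModelFactory.create_model('heterognn', metadata, input_channels, config)
    print(ModelFactory.get_available_models())  # ['heterognn', 'hgt', 'heat', 'rgat']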

create_model(model_name, metadata, input_channels, config) staticmethod

Create a model instance by name and configuration.

Parameters:

    model_name (str): Model architecture name (case-insensitive). One of 'heterognn', 'hgt', 'heat', 'rgat'. Required.
    metadata (dict or tuple): Graph metadata describing node and edge types. Required.
    input_channels (dict): Mapping of node type to input feature dimension. Required.
    config (dict): Full training configuration dict; model hyperparameters are read from config['models'][name]. Required.

Returns:

    torch.nn.Module: An instantiated GNN model.

Raises:

    ValueError: If model_name is not recognized.

Source code in lumina/model/base/hetero_gnn.py
@staticmethod
def create_model(model_name, metadata, input_channels, config):
    """Create a model instance by name and configuration.

    Args:
        model_name (str): Model architecture name (case-insensitive).
            One of ``'heterognn'``, ``'hgt'``, ``'heat'``, ``'rgat'``.
        metadata (dict or tuple): Graph metadata describing node and
            edge types.
        input_channels (dict): Mapping of node type to input feature
            dimension.
        config (dict): Full training configuration dict; model
            hyperparameters are read from ``config['models'][name]``.

    Returns:
        torch.nn.Module: An instantiated GNN model.

    Raises:
        ValueError: If ``model_name`` is not recognized.
    """
    model_name = model_name.lower()

    if model_name == 'heterognn':
        model_config = config['models']['HeteroGNN']
        return HeteroGNN(
            metadata=metadata,
            input_channels=input_channels,
            hidden_channels=model_config['hidden_channels'],
            num_layers=model_config['num_layers'],
            backend=model_config.get('backend', 'gcn'),
            dropout=model_config.get('dropout', 0.0),
            out_channels=2  # VM, VA for bus; PG, QG for generator
        )
    elif model_name == 'hgt':
        model_config = config['models'].get('HGT', {})
        # Placeholder for HGT - to be implemented
        print("Warning: HGT model not yet implemented, falling back to HeteroGNN")
        return ModelFactory.create_model('heterognn', metadata, input_channels, config)
    elif model_name == 'heat':
        model_config = config['models'].get('HEAT', {})
        # Placeholder for HEAT - to be implemented
        print("Warning: HEAT model not yet implemented, falling back to HeteroGNN")
        return ModelFactory.create_model('heterognn', metadata, input_channels, config)
    elif model_name == 'rgat':
        model_config = config['models'].get('RGAT', {})
        # Placeholder for RGAT - to be implemented
        print("Warning: RGAT model not yet implemented, falling back to HeteroGNN")
        return ModelFactory.create_model('heterognn', metadata, input_channels, config)
    else:
        raise ValueError(f"Unknown model: {model_name}. Available models: heterognn, hgt, heat, rgat")

get_available_models() staticmethod

Return the list of supported model architecture names.

Returns:

    list[str]: Available model names.

Source code in lumina/model/base/hetero_gnn.py
@staticmethod
def get_available_models():
    """Return the list of supported model architecture names.

    Returns:
        list[str]: Available model names.
    """
    return ['heterognn', 'hgt', 'heat', 'rgat']

ACOPFModel

Bases: Module

Placeholder ACOPF model for power systems.

Not yet implemented. Reserved for a future end-to-end ACOPF model that combines graph encoding with constraint-aware decoding.

Source code in lumina/model/base/hetero_gnn.py
class ACOPFModel(torch.nn.Module):
    """Placeholder ACOPF model for power systems.

    Not yet implemented. Reserved for a future end-to-end ACOPF model that
    combines graph encoding with constraint-aware decoding.
    """

    def __init__(self):
        super().__init__()

Base Homogeneous GNN

GNNBase

Bases: Module

Abstract base class for homogeneous GNN models.

Provides _argsparse to flexibly accept graph data as positional arguments, keyword arguments, or a PyG Batch / Data object. Subclasses must define convolutional layers and a forward method.

Source code in lumina/model/base/homo_gnn.py
class GNNBase(nn.Module):
    """Abstract base class for homogeneous GNN models.

    Provides ``_argsparse`` to flexibly accept graph data as positional
    arguments, keyword arguments, or a PyG ``Batch`` / ``Data`` object.
    Subclasses must define convolutional layers and a forward method.
    """

    def __init__(self):
        super(GNNBase, self).__init__()

    def _argsparse(self, *args, **kwargs):
        """Parse the possible input types.
        If the x and edge_index are in args, follow the args.
        In other case, find them in kwargs.
        """
        if args:
            if len(args) == 1:
                data = args[0]
                x = data.x
                edge_index = data.edge_index
                if hasattr(data, "edge_attr"):
                    edge_attr = data.edge_attr
                else:
                    edge_attr = torch.ones(
                        (edge_index.shape[1], self.edge_dim),
                        dtype=torch.float32,
                        device=x.device,
                    )
                if hasattr(data, "batch"):
                    batch = data.batch
                else:
                    batch = torch.zeros(x.shape[0], dtype=torch.int64, device=x.device)
                if hasattr(data, "edge_weight") and data.edge_weight is not None:
                    edge_weight = data.edge_weight
                else:
                    edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)

            elif len(args) == 2:
                x, edge_index = args[0], args[1]
                batch = torch.zeros(x.shape[0], dtype=torch.int64, device=x.device)
                edge_attr = torch.ones(
                    (edge_index.shape[1], self.edge_dim),
                    dtype=torch.float32,
                    device=x.device,
                )
                edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)

            elif len(args) == 3:
                x, edge_index, edge_attr = args[0], args[1], args[2]
                batch = torch.zeros(x.shape[0], dtype=torch.int64, device=x.device)
                edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)

            elif len(args) == 4:
                x, edge_index, edge_attr, batch = args[0], args[1], args[2], args[3]
                edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)
            else:
                raise ValueError(
                    f"forward's args should take 1, 2, 3 or 4 arguments but got {len(args)}"
                )
        else:
            data: Batch = kwargs.get("data")
            if not data:
                x = kwargs.get("x")
                edge_index = kwargs.get("edge_index")
                adj = kwargs.get("adj")
                edge_weight = kwargs.get("edge_weight")
                if "edge_index" not in kwargs:
                    assert (
                        adj is not None
                    ), "forward's args is empty and required adj is not in kwargs"
                    if torch.is_tensor(adj):
                        edge_index, edge_weight = from_adj_to_edge_index_torch(adj)
                    else:
                        edge_index, edge_weight = from_adj_to_edge_index_torch(torch.from_numpy(adj))
                if "adj" not in kwargs:
                    assert (
                        edge_index is not None
                    ), "forward's args is empty and required edge_index is not in kwargs"
                assert (
                    x is not None
                ), "forward's args is empty and required node features x is not in kwargs"
                edge_attr = kwargs.get("edge_attr")
                if "edge_attr" not in kwargs:
                    edge_attr = torch.ones(
                        (edge_index.shape[1], self.edge_dim),
                        dtype=torch.float32,
                        device=x.device,
                    )
                batch = kwargs.get("batch")
                if torch.is_tensor(batch):
                    if batch.numel() == 0:
                        batch = torch.zeros(x.shape[0], dtype=torch.int64, device=x.device)
                else:
                    if not batch:
                        batch = torch.zeros(x.shape[0], dtype=torch.int64, device=x.device)
                if "edge_weight" not in kwargs:
                    edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)

            else:
                x = data.x
                edge_index = data.edge_index
                if hasattr(data, "edge_attr"):
                    edge_attr = data.edge_attr
                    if edge_attr is None:
                        edge_attr = torch.ones(
                            (edge_index.shape[1], self.edge_dim),
                            dtype=torch.float32,
                            device=x.device,
                        )
                else:
                    edge_attr = torch.ones(
                        (edge_index.shape[1], self.edge_dim),
                        dtype=torch.float32,
                        device=x.device,
                    )
                if hasattr(data, "batch"):
                    batch = data.batch
                    if batch is None:
                        batch = torch.zeros(
                            x.shape[0], dtype=torch.int64, device=x.device
                        )
                else:
                    batch = torch.zeros(x.shape[0], dtype=torch.int64, device=x.device)
                if hasattr(data, "edge_weight"):
                    edge_weight = data.edge_weight
                    if edge_weight is None:
                        edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)
                else:
                    edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32, device=x.device)
        return x, edge_index, edge_attr, edge_weight, batch
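
The three call forms _argsparse accepts, sketched with dummy tensors; the model variable stands for any GNNBase subclass and is a placeholder here.

    import torch
    from torch_geometric.data import Data

    x = torch.randn(5, 4)
    edge_index = torch.tensor([[0, 1, 2, 3], [1, 2, 3, 4]])
    edge_attr = torch.randn(4, 9)

    # 1) A single Data/Batch object; missing edge_attr, batch, edge_weight get defaults.
    data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr)
    # out = model(data)

    # 2) Positional tensors: (x, edge_index[, edge_attr[, batch]]).
    # out = model(x, edge_index, edge_attr)

    # 3) Keyword arguments; a dense adjacency matrix may stand in for edge_index.
    # out = model(x=x, adj=torch.ones(5, 5))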

GNNPool

Bases: Module

Graph pooling module that applies a named readout operation.

Parameters:

    readout (str): Name of the readout strategy. One of 'mean', 'sum', 'max', 'identity', or 'cat_max_sum'. Required.
Source code in lumina/model/base/homo_gnn.py
class GNNPool(nn.Module):
    """Graph pooling module that applies a named readout operation.

    Args:
        readout (str): Name of the readout strategy. One of ``'mean'``,
            ``'sum'``, ``'max'``, ``'identity'``, or ``'cat_max_sum'``.
    """

    def __init__(self, readout):
        super().__init__()
        self.readout = get_readout_layers(readout)

    def forward(self, x, batch):
        """Apply the readout operation.

        Args:
            x (torch.Tensor): Node embeddings of shape ``(N, D)``.
            batch (torch.Tensor): Batch assignment vector of shape ``(N,)``.

        Returns:
            torch.Tensor: Pooled representation.
        """
        return self.readout(x, batch)

forward(x, batch)

Apply the readout operation.

Parameters:

    x (torch.Tensor): Node embeddings of shape (N, D). Required.
    batch (torch.Tensor): Batch assignment vector of shape (N,). Required.

Returns:

    torch.Tensor: Pooled representation.

Source code in lumina/model/base/homo_gnn.py
def forward(self, x, batch):
    """Apply the readout operation.

    Args:
        x (torch.Tensor): Node embeddings of shape ``(N, D)``.
        batch (torch.Tensor): Batch assignment vector of shape ``(N,)``.

    Returns:
        torch.Tensor: Pooled representation.
    """
    return self.readout(x, batch)
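
A minimal pooling sketch, assuming the 'mean' readout (get_readout_layers resolves the name internally, so the output shape below is the expected per-graph mean):

    import torch

    pool = GNNPool('mean')
    x = torch.randn(6, 8)                     # 6 nodes, 8-dim embeddings
    batch = torch.tensor([0, 0, 0, 1, 1, 1])  # two graphs of three nodes each
    pooled = pool(x, batch)                   # expected shape: (2, 8), one row per graph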

Loss Functions

OPFLossManager

Bases: Module

Unified loss manager that wraps ACOPFLossFunction for the training loop.

Provides target extraction from both heterogeneous and homogeneous batches, optional y_mask filtering for homogeneous data, and a standardized compute_loss interface returning both the scalar loss and an info dict.

Parameters:

    loss_type (str): Type of loss to use. One of 'mse', 'rmse', 'mae', 'mape', or 'smooth_l1'. Default: 'mse'.
    device (torch.device, optional): Device for computations; falls back to CPU. Default: None.
    **kwargs: Additional arguments forwarded to ACOPFLossFunction (e.g. node_weights, reduction, epsilon, beta).
Source code in lumina/model/opf/losses.py
class OPFLossManager(nn.Module):
    """Unified loss manager that wraps ACOPFLossFunction for the training loop.

    Provides target extraction from both heterogeneous and homogeneous batches,
    optional y_mask filtering for homogeneous data, and a standardized
    ``compute_loss`` interface returning both the scalar loss and an info dict.

    Args:
        loss_type (str): Type of loss to use. One of 'mse', 'rmse', 'mae',
            'mape', or 'smooth_l1'.
        device (torch.device, optional): Device for computations.
            Defaults to CPU.
        **kwargs: Additional arguments forwarded to ``ACOPFLossFunction``
            (e.g. ``node_weights``, ``reduction``, ``epsilon``, ``beta``).
    """

    def __init__(
        self,
        loss_type: str = 'mse',
        device: Optional[torch.device] = None,
        **kwargs
    ):
        super().__init__()

        self.loss_type = loss_type
        self.device = device or torch.device('cpu')
        self.base_loss = ACOPFLossFunction(loss_type=loss_type, **kwargs)

    def compute_loss(
        self,
        predictions: Dict[str, torch.Tensor],
        batch,
        return_info: bool = True,
        **kwargs,
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]:
        """Compute the loss given model predictions and a data batch.

        Extracts targets from the batch, applies y_mask filtering for
        homogeneous batches, and delegates to the underlying ACOPFLossFunction.

        Args:
            predictions (Dict[str, torch.Tensor]): Model outputs keyed by
                node type (e.g. ``{'bus': ..., 'generator': ...}``).
            batch: A PyG ``Batch`` or ``HeteroData`` object containing
                target labels.
            return_info (bool): If True, return ``(loss, info_dict)``;
                otherwise return the scalar loss only. Defaults to True.
            **kwargs: Reserved for future use.

        Returns:
            Union[torch.Tensor, Tuple[torch.Tensor, Dict]]: The scalar loss,
                or a tuple of ``(loss, info_dict)`` when ``return_info=True``.
                The info dict contains per-node-type losses and an
                ``'objective'`` key.
        """
        targets = self._extract_targets(predictions, batch)

        masked_predictions = predictions
        masked_targets = targets
        if self._is_homo_batch(batch) and hasattr(batch, "y_mask"):
            masked_predictions, masked_targets = self._apply_homo_target_mask(
                predictions,
                targets,
                batch,
            )

        results = self.base_loss(masked_predictions, masked_targets)
        loss = results['total_loss']
        results.setdefault('objective', loss)

        if return_info:
            return loss, results
        else:
            return loss

    @staticmethod
    def _is_homo_batch(batch) -> bool:
        return hasattr(batch, 'node_type') and not hasattr(batch, 'node_types')

    def _extract_targets(self, predictions: Dict[str, torch.Tensor], batch) -> Dict[str, torch.Tensor]:
        if self._is_homo_batch(batch):
            targets = {}
            y = getattr(batch, 'y', None)
            node_type = getattr(batch, 'node_type', None)
            if y is None or node_type is None:
                return targets
            node_types = ["bus", "generator", "load", "shunt"]
            for idx, name in enumerate(node_types):
                if name not in predictions:
                    continue
                mask = node_type == idx
                if mask.any():
                    targets[name] = y[mask]
            return targets

        targets = {}
        for node_type in predictions.keys():
            if hasattr(batch[node_type], 'y') and batch[node_type].y is not None:
                targets[node_type] = batch[node_type].y
        return targets

    def _apply_homo_target_mask(self, predictions, targets, batch):
        y_mask = getattr(batch, "y_mask", None)
        node_type = getattr(batch, "node_type", None)
        if y_mask is None or node_type is None or not torch.is_tensor(y_mask):
            return predictions, targets

        if y_mask.ndim > 1:
            y_mask = y_mask.all(dim=-1)
        if y_mask.numel() != node_type.numel():
            return predictions, targets

        node_types = ["bus", "generator", "load", "shunt"]
        masked_predictions = {}
        masked_targets = {}
        for idx, name in enumerate(node_types):
            if name not in predictions or name not in targets:
                continue
            type_mask = node_type == idx
            if not bool(type_mask.any().item()):
                continue
            valid_mask = y_mask[type_mask]
            if not bool(valid_mask.any().item()):
                continue
            masked_predictions[name] = predictions[name][valid_mask]
            masked_targets[name] = targets[name][valid_mask]

        return masked_predictions, masked_targets

    def get_loss_info(self) -> Dict:
        """Return a dictionary describing the current loss configuration.

        Returns:
            Dict: Loss metadata including ``loss_type`` and any additional
                info from the underlying ``ACOPFLossFunction``.
        """
        info = {
            'loss_type': self.loss_type,
        }
        if hasattr(self.base_loss, 'get_loss_info'):
            info.update(self.base_loss.get_loss_info())
        return info

compute_loss(predictions: Dict[str, torch.Tensor], batch, return_info: bool = True, **kwargs) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]

Compute the loss given model predictions and a data batch.

Extracts targets from the batch, applies y_mask filtering for homogeneous batches, and delegates to the underlying ACOPFLossFunction.

Parameters:

    predictions (Dict[str, Tensor]): Model outputs keyed by node type
        (e.g. {'bus': ..., 'generator': ...}). Required.
    batch: A PyG Batch or HeteroData object containing target labels. Required.
    return_info (bool): If True, return (loss, info_dict); otherwise return
        the scalar loss only. Defaults to True.
    **kwargs: Reserved for future use.

Returns:

    Union[torch.Tensor, Tuple[torch.Tensor, Dict]]: The scalar loss, or a
        tuple of (loss, info_dict) when return_info=True. The info dict
        contains per-node-type losses and an 'objective' key.

Source code in lumina/model/opf/losses.py
def compute_loss(
    self,
    predictions: Dict[str, torch.Tensor],
    batch,
    return_info: bool = True,
    **kwargs,
) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]:
    """Compute the loss given model predictions and a data batch.

    Extracts targets from the batch, applies y_mask filtering for
    homogeneous batches, and delegates to the underlying ACOPFLossFunction.

    Args:
        predictions (Dict[str, torch.Tensor]): Model outputs keyed by
            node type (e.g. ``{'bus': ..., 'generator': ...}``).
        batch: A PyG ``Batch`` or ``HeteroData`` object containing
            target labels.
        return_info (bool): If True, return ``(loss, info_dict)``;
            otherwise return the scalar loss only. Defaults to True.
        **kwargs: Reserved for future use.

    Returns:
        Union[torch.Tensor, Tuple[torch.Tensor, Dict]]: The scalar loss,
            or a tuple of ``(loss, info_dict)`` when ``return_info=True``.
            The info dict contains per-node-type losses and an
            ``'objective'`` key.
    """
    targets = self._extract_targets(predictions, batch)

    masked_predictions = predictions
    masked_targets = targets
    if self._is_homo_batch(batch) and hasattr(batch, "y_mask"):
        masked_predictions, masked_targets = self._apply_homo_target_mask(
            predictions,
            targets,
            batch,
        )

    results = self.base_loss(masked_predictions, masked_targets)
    loss = results['total_loss']
    results.setdefault('objective', loss)

    if return_info:
        return loss, results
    else:
        return loss
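
Example: a minimal call-pattern sketch. The tensors are random stand-ins, and the batch is faked with plain containers so the heterogeneous target-extraction path runs without a real PyG Batch; ``loss_module`` is a hypothetical handle for an instance of the wrapper class documented above.

import torch
from types import SimpleNamespace

predictions = {
    'bus': torch.randn(4, 2),        # (va, vm) for 4 buses
    'generator': torch.randn(2, 2),  # (pg, qg) for 2 generators
}
# Stand-in for a HeteroData batch: only `.y` per node type is consumed here.
batch = {
    'bus': SimpleNamespace(y=torch.randn(4, 2)),
    'generator': SimpleNamespace(y=torch.randn(2, 2)),
}

loss, info = loss_module.compute_loss(predictions, batch, return_info=True)
print(float(loss), sorted(info.keys()))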

get_loss_info() -> Dict

Return a dictionary describing the current loss configuration.

Returns:

    Dict: Loss metadata including loss_type and any additional info from
        the underlying ACOPFLossFunction.

Source code in lumina/model/opf/losses.py
def get_loss_info(self) -> Dict:
    """Return a dictionary describing the current loss configuration.

    Returns:
        Dict: Loss metadata including ``loss_type`` and any additional
            info from the underlying ``ACOPFLossFunction``.
    """
    info = {
        'loss_type': self.loss_type,
    }
    if hasattr(self.base_loss, 'get_loss_info'):
        info.update(self.base_loss.get_loss_info())
    return info
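
Example: the shape of the returned metadata when the wrapper delegates to ACOPFLossFunction (values shown are the defaults; ``loss_module`` is again a hypothetical instance handle).

info = loss_module.get_loss_info()
# e.g. {'loss_type': 'mse', 'node_weights': {'bus': 1.0, 'generator': 1.0},
#       'reduction': 'mean', 'epsilon': 1e-08, 'beta': 1.0}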

ACOPFLossFunction

Bases: Module

Simplified loss function class for ACOPF training using PyTorch built-in losses.

Supports:

- MSE (Mean Squared Error) - torch.nn.MSELoss
- RMSE (Root Mean Squared Error) - sqrt of MSE
- MAE (Mean Absolute Error) - torch.nn.L1Loss
- MAPE (Mean Absolute Percentage Error) - custom implementation
- SmoothL1Loss (Huber Loss) - torch.nn.SmoothL1Loss

Parameters:

    loss_type (str): Type of loss function to use: 'mse' (Mean Squared
        Error), 'rmse' (Root Mean Squared Error), 'mae' (Mean Absolute
        Error), 'mape' (Mean Absolute Percentage Error), or 'smooth_l1'
        (Smooth L1 / Huber Loss). Defaults to 'mse'.
    node_weights (dict, optional): Weights for different node types.
        Defaults to {'bus': 1.0, 'generator': 1.0}.
    reduction (str): Reduction method ('mean', 'sum', 'none'). Defaults
        to 'mean'.
    epsilon (float): Small value to avoid division by zero in MAPE.
        Defaults to 1e-8.
    beta (float): Beta parameter for SmoothL1Loss. Defaults to 1.0.
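
For reference, the 'smooth_l1' option delegates to PyTorch's standard SmoothL1Loss, whose per-element value for the residual x = y - y_hat is:

.. math::
    \ell(x) = \begin{cases} 0.5\, x^2 / \beta, & |x| < \beta \\ |x| - 0.5\, \beta, & \text{otherwise} \end{cases}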
Source code in lumina/model/opf/losses.py
class ACOPFLossFunction(nn.Module):
    """
    Simplified loss function class for ACOPF training using PyTorch built-in losses.

    Supports:
    - MSE (Mean Squared Error) - torch.nn.MSELoss
    - RMSE (Root Mean Squared Error) - sqrt of MSE
    - MAE (Mean Absolute Error) - torch.nn.L1Loss
    - MAPE (Mean Absolute Percentage Error) - custom implementation
    - SmoothL1Loss (Huber Loss) - torch.nn.SmoothL1Loss

    Args:
        loss_type (str): Type of loss function to use:
            - 'mse': Mean Squared Error
            - 'rmse': Root Mean Squared Error
            - 'mae': Mean Absolute Error
            - 'mape': Mean Absolute Percentage Error
            - 'smooth_l1': Smooth L1 Loss (Huber Loss)
        node_weights (dict, optional): Weights for different node types.
            Default: {'bus': 1.0, 'generator': 1.0}
        reduction (str): Reduction method ('mean', 'sum', 'none'). Default: 'mean'
        epsilon (float): Small value to avoid division by zero in MAPE. Default: 1e-8
        beta (float): Beta parameter for SmoothL1Loss. Default: 1.0
    """

    def __init__(
        self,
        loss_type: str = 'mse',
        node_weights: Optional[Dict[str, float]] = None,
        reduction: str = 'mean',
        epsilon: float = 1e-8,
        beta: float = 1.0
    ):
        super(ACOPFLossFunction, self).__init__()

        self.loss_type = loss_type
        self.reduction = reduction
        self.epsilon = epsilon
        self.beta = beta

        self.node_weights = node_weights or {'bus': 1.0, 'generator': 1.0}

        valid_types = ['mse', 'rmse', 'mae', 'mape', 'smooth_l1']
        if loss_type not in valid_types:
            raise ValueError(f"Invalid loss_type '{loss_type}'. Must be one of {valid_types}")

        self._init_loss_functions()

    def _init_loss_functions(self):
        if self.loss_type == 'mse':
            self.criterion = nn.MSELoss(reduction=self.reduction)
        elif self.loss_type == 'mae':
            self.criterion = nn.L1Loss(reduction=self.reduction)
        elif self.loss_type == 'smooth_l1':
            self.criterion = nn.SmoothL1Loss(reduction=self.reduction, beta=self.beta)
        elif self.loss_type == 'rmse':
            self.criterion = nn.MSELoss(reduction='none')
        elif self.loss_type == 'mape':
            self.criterion = nn.L1Loss(reduction='none')

    def _compute_single_loss(self, predictions: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        # mse / mae / smooth_l1 map directly onto the configured criterion
        if self.loss_type in ('mse', 'mae', 'smooth_l1'):
            return self.criterion(predictions, targets)
        elif self.loss_type == 'rmse':
            # reduce the squared errors first, then take the root, so the
            # 'mean' reduction matches the documented sqrt(MSE)
            mse = self.criterion(predictions, targets)
            if self.reduction == 'none':
                return torch.sqrt(mse + self.epsilon)
            return torch.sqrt(self._reduce_loss(mse) + self.epsilon)
        elif self.loss_type == 'mape':
            abs_error = torch.abs(predictions - targets)
            abs_target = torch.abs(targets) + self.epsilon
            mape = abs_error / abs_target
            return self._reduce_loss(mape)
        else:
            raise ValueError(f"Unknown loss function: {self.loss_type}")

    def _reduce_loss(self, loss: torch.Tensor) -> torch.Tensor:
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        elif self.reduction == 'none':
            return loss
        else:
            raise ValueError(f"Invalid reduction: {self.reduction}")

    def forward(self, predictions: Dict[str, torch.Tensor],
                targets: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        total_loss = None
        node_losses = {}

        for node_type in predictions.keys():
            if node_type not in targets:
                continue

            pred = predictions[node_type]
            target = targets[node_type]

            if torch.is_tensor(target):
                finite_mask = torch.isfinite(target)
                if finite_mask.ndim > 1:
                    finite_mask = finite_mask.all(dim=-1)
                if finite_mask.ndim == 0:
                    if not bool(finite_mask.item()):
                        continue
                else:
                    if not finite_mask.any():
                        continue
                    pred = pred[finite_mask]
                    target = target[finite_mask]

            node_loss = self._compute_single_loss(pred, target)
            node_weight = self.node_weights.get(node_type, 1.0)

            node_losses[f"{node_type}_loss"] = node_loss
            weighted_loss = node_weight * node_loss
            total_loss = weighted_loss if total_loss is None else total_loss + weighted_loss

        if total_loss is None:
            device = next(iter(predictions.values())).device
            total_loss = torch.tensor(0.0, device=device)

        results = {
            'total_loss': total_loss,
            **node_losses
        }

        return results

    def compute_loss(self, predictions: Dict[str, torch.Tensor], batch) -> Dict[str, torch.Tensor]:
        targets = self._extract_targets(predictions, batch)
        masked_predictions = predictions
        masked_targets = targets
        # y_mask filtering needs a masking helper (provided by a subclass or
        # wrapper); guard so plain instances do not hit an AttributeError.
        if (self._is_homo_batch(batch) and hasattr(batch, "y_mask")
                and hasattr(self, "_apply_homo_target_mask")):
            masked_predictions, masked_targets = self._apply_homo_target_mask(
                predictions,
                targets,
                batch,
            )
        return self.forward(masked_predictions, masked_targets)

    @staticmethod
    def _is_homo_batch(batch) -> bool:
        return hasattr(batch, 'node_type') and not hasattr(batch, 'node_types')

    def _extract_targets(self, predictions: Dict[str, torch.Tensor], batch) -> Dict[str, torch.Tensor]:
        if self._is_homo_batch(batch):
            targets = {}
            y = getattr(batch, 'y', None)
            node_type = getattr(batch, 'node_type', None)
            if y is None or node_type is None:
                return targets
            node_types = ["bus", "generator", "load", "shunt"]
            for idx, name in enumerate(node_types):
                if name not in predictions:
                    continue
                mask = node_type == idx
                if mask.any():
                    targets[name] = y[mask]
            return targets

        targets = {}
        for node_type in predictions.keys():
            if hasattr(batch[node_type], 'y') and batch[node_type].y is not None:
                targets[node_type] = batch[node_type].y
        return targets

    def get_loss_info(self) -> Dict:
        return {
            'loss_type': self.loss_type,
            'node_weights': self.node_weights,
            'reduction': self.reduction,
            'epsilon': self.epsilon,
            'beta': self.beta
        }
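
Example: a runnable sketch on dummy tensors (shapes, node counts, and weights are illustrative only; bus losses are weighted 1.0 and generator losses 2.0 here).

import torch

loss_fn = ACOPFLossFunction(loss_type='mse',
                            node_weights={'bus': 1.0, 'generator': 2.0})

predictions = {'bus': torch.randn(4, 2), 'generator': torch.randn(2, 2)}
targets = {'bus': torch.randn(4, 2), 'generator': torch.randn(2, 2)}

results = loss_fn(predictions, targets)
# total_loss = 1.0 * bus_loss + 2.0 * generator_loss
print(results['total_loss'], results['bus_loss'], results['generator_loss'])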

PhysicsInformedLoss

Bases: ACOPFLossFunction

Physics-informed loss combining standard ML loss with physics constraint penalties.

Extends ACOPFLossFunction by adding a penalty term computed from power system constraint violations (e.g. power flow, line limits). The total loss is ML_loss + physics_weight * physics_penalty.

Parameters:

    base_loss_config (dict, optional): Configuration dict forwarded to
        ACOPFLossFunction (must contain at least 'loss_type'). Defaults
        to {'loss_type': 'mse'}.
    physics_weight (float): Scalar weight for the physics penalty term.
        Defaults to 1.0.
    constraint_types (list, optional): Which constraint families to
        penalize. Defaults to ['power_flow', 'line_limits'].
    penalty_method (str): Penalty formulation. One of 'quadratic',
        'absolute', or 'log_barrier'. Defaults to 'quadratic'.
    **kwargs: Additional keyword arguments forwarded to ACOPFLossFunction.
Source code in lumina/model/opf/losses.py
class PhysicsInformedLoss(ACOPFLossFunction):
    """Physics-informed loss combining standard ML loss with physics constraint penalties.

    Extends ``ACOPFLossFunction`` by adding a penalty term computed from
    power system constraint violations (e.g. power flow, line limits).
    The total loss is ``ML_loss + physics_weight * physics_penalty``.

    Args:
        base_loss_config (dict, optional): Configuration dict forwarded to
            ``ACOPFLossFunction`` (must contain at least ``'loss_type'``).
            Defaults to ``{'loss_type': 'mse'}``.
        physics_weight (float): Scalar weight for the physics penalty term.
            Defaults to 1.0.
        constraint_types (list, optional): Which constraint families to
            penalize, e.g. ``['power_flow', 'line_limits']``.
        penalty_method (str): Penalty formulation. One of ``'quadratic'``,
            ``'absolute'``, or ``'log_barrier'``. Defaults to ``'quadratic'``.
        **kwargs: Additional keyword arguments forwarded to
            ``ACOPFLossFunction``.
    """

    def __init__(
        self,
        base_loss_config: Optional[Dict] = None,
        physics_weight: float = 1.0,
        constraint_types: Optional[List[str]] = None,
        penalty_method: str = 'quadratic',
        **kwargs
    ):
        base_config = base_loss_config or {'loss_type': 'mse'}
        super().__init__(**base_config, **kwargs)

        self.physics_weight = physics_weight
        self.constraint_types = constraint_types or ['power_flow', 'line_limits']
        self.penalty_method = penalty_method
        self.constraint_computer = None

    def set_constraint_computer(self, constraint_computer):
        """Attach a constraint violation computer for physics penalty evaluation.

        Args:
            constraint_computer: Object with a ``compute_violations`` method
                that accepts predictions and returns a dict of violation
                tensors keyed by constraint type.
        """
        self.constraint_computer = constraint_computer

    def _compute_physics_penalty(self, predictions: Dict[str, torch.Tensor]) -> torch.Tensor:
        if self.constraint_computer is None:
            return torch.tensor(0.0, device=next(iter(predictions.values())).device)

        violations = self.constraint_computer.compute_violations(predictions)

        total_penalty = torch.zeros((), device=next(iter(predictions.values())).device)
        for constraint_type in self.constraint_types:
            if constraint_type in violations:
                violation = violations[constraint_type]

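                # 'quadratic' and 'absolute' penalize any nonzero violation;
                # 'log_barrier' assumes violation < 0 when the constraint is
                # satisfied, and the clamp keeps log() in its domain.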
                if self.penalty_method == 'quadratic':
                    penalty = torch.mean(violation ** 2)
                elif self.penalty_method == 'absolute':
                    penalty = torch.mean(torch.abs(violation))
                elif self.penalty_method == 'log_barrier':
                    penalty = -torch.mean(torch.log(torch.clamp(-violation, min=self.epsilon)))
                else:
                    raise ValueError(f"Unknown penalty method: {self.penalty_method}")

                total_penalty += penalty

        return total_penalty

    def forward(self, predictions: Dict[str, torch.Tensor],
                targets: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        results = super().forward(predictions, targets)
        ml_loss = results['total_loss']
        physics_penalty = self._compute_physics_penalty(predictions)
        results.update({
            'total_loss': ml_loss + self.physics_weight * physics_penalty,
            'ml_loss': ml_loss,
            'physics_penalty': physics_penalty
        })
        return results

set_constraint_computer(constraint_computer)

Attach a constraint violation computer for physics penalty evaluation.

Parameters:

    constraint_computer: Object with a compute_violations method that
        accepts predictions and returns a dict of violation tensors keyed
        by constraint type. Required.
Source code in lumina/model/opf/losses.py
def set_constraint_computer(self, constraint_computer):
    """Attach a constraint violation computer for physics penalty evaluation.

    Args:
        constraint_computer: Object with a ``compute_violations`` method
            that accepts predictions and returns a dict of violation
            tensors keyed by constraint type.
    """
    self.constraint_computer = constraint_computer
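
Example: a hedged sketch of wiring a constraint computer into PhysicsInformedLoss. ToyConstraintComputer is invented for illustration; a real implementation would evaluate power-flow residuals and line limits from the grid data.

import torch

class ToyConstraintComputer:
    """Invented stand-in returning one violation tensor per constraint family."""

    def compute_violations(self, predictions):
        pg = predictions['generator'][:, 0]
        # pretend active power above 1.0 p.u. violates a line limit
        return {'line_limits': torch.relu(pg - 1.0)}

loss_fn = PhysicsInformedLoss(base_loss_config={'loss_type': 'mse'},
                              physics_weight=10.0,
                              constraint_types=['line_limits'])
loss_fn.set_constraint_computer(ToyConstraintComputer())

predictions = {'generator': torch.randn(2, 2)}
targets = {'generator': torch.randn(2, 2)}
results = loss_fn(predictions, targets)
print(results['ml_loss'], results['physics_penalty'], results['total_loss'])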

RMSELoss

Bases: Module

Root Mean Squared Error Loss.

.. math:: RMSE = \sqrt{\frac{1}{N} \sum_{i=1}^{N} (y_i - \hat{y}_i)^2}

Parameters:

    reduction (str): Reduction method to apply to the output: 'mean',
        'sum', or 'none'. Defaults to 'mean'.
    epsilon (float): Small value to avoid sqrt(0). Defaults to 1e-8.
Source code in lumina/model/opf/losses.py
class RMSELoss(nn.Module):
    """Root Mean Squared Error Loss.

    .. math::
        RMSE = \\sqrt{\\frac{1}{N} \\sum_{i=1}^{N} (y_i - \\hat{y}_i)^2}

    Args:
        reduction (str): Reduction method to apply to the output: 'mean', 'sum',
            or 'none'. Default: 'mean'
        epsilon (float): Small value to avoid sqrt(0). Default: 1e-8
    """

    def __init__(self, reduction: str = 'mean', epsilon: float = 1e-8):
        super(RMSELoss, self).__init__()
        self.reduction = reduction
        self.epsilon = epsilon
        self.mse_loss = nn.MSELoss(reduction='none')

    def forward(self, predictions: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        # reduce the squared errors first, then take the root, so the 'mean'
        # reduction yields the documented sqrt(MSE) rather than a mean of
        # per-element roots
        mse = self.mse_loss(predictions, targets)

        if self.reduction == 'mean':
            return torch.sqrt(mse.mean() + self.epsilon)
        elif self.reduction == 'sum':
            return torch.sqrt(mse.sum() + self.epsilon)
        elif self.reduction == 'none':
            return torch.sqrt(mse + self.epsilon)
        else:
            raise ValueError(f"Invalid reduction: {self.reduction}")
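
Example: a quick numerical check that the 'mean' reduction matches the closed-form square root of the mean squared error, up to the epsilon stabilizer (tensor values are random stand-ins).

import torch

rmse = RMSELoss(reduction='mean')
pred, target = torch.randn(8), torch.randn(8)
expected = torch.sqrt(((pred - target) ** 2).mean() + 1e-8)
print(torch.allclose(rmse(pred, target), expected))  # True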

MAPELoss

Bases: Module

Mean Absolute Percentage Error Loss.

.. math:: MAPE = \frac{1}{N} \sum_{i=1}^{N} \left| \frac{y_i - \hat{y}_i}{y_i} \right|

Parameters:

    reduction (str): Reduction method to apply to the output: 'mean',
        'sum', or 'none'. Defaults to 'mean'.
    epsilon (float): Small value to avoid division by zero. Defaults to 1e-8.
Source code in lumina/model/opf/losses.py
class MAPELoss(nn.Module):
    """Mean Absolute Percentage Error Loss.

    .. math::
        MAPE = \\frac{1}{N} \\sum_{i=1}^{N} \\left| \\frac{y_i - \\hat{y}_i}{y_i} \\right|

    Args:
        reduction (str): Reduction method to apply to the output: 'mean', 'sum',
            or 'none'. Default: 'mean'
        epsilon (float): Small value to avoid division by zero. Default: 1e-8
    """

    def __init__(self, reduction: str = 'mean', epsilon: float = 1e-8):
        super(MAPELoss, self).__init__()
        self.reduction = reduction
        self.epsilon = epsilon

    def forward(self, predictions: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        abs_error = torch.abs(predictions - targets)
        abs_target = torch.abs(targets) + self.epsilon
        mape = abs_error / abs_target

        if self.reduction == 'mean':
            return mape.mean()
        elif self.reduction == 'sum':
            return mape.sum()
        elif self.reduction == 'none':
            return mape
        else:
            raise ValueError(f"Invalid reduction: {self.reduction}")
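
Example: a tiny worked case (values invented). A 10% error on the first element and a 5% error on the second average to a MAPE of 0.075.

import torch

mape = MAPELoss(reduction='mean')
pred = torch.tensor([110.0, 95.0])
target = torch.tensor([100.0, 100.0])
print(mape(pred, target))  # ~0.075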