Source code for pyhgf.utils.to_pandas
# Author: Nicolas Legrand <nicolas.legrand@cas.au.dk>
from typing import TYPE_CHECKING
import jax.numpy as jnp
import pandas as pd
from pyhgf.math import binary_surprise, gaussian_surprise
if TYPE_CHECKING:
from pyhgf.model import Network
[docs]
def to_pandas(network: "Network") -> pd.DataFrame:
"""Export the nodes trajectories and surprise as a Pandas data frame.
Returns
-------
trajectories_df :
Pandas data frame with the time series of sufficient statistics and the
surprise of each node in the structure.
"""
n_nodes = len(network.edges)
# get time and time steps from the first input node
trajectories_df = pd.DataFrame(
{
"time_steps": network.node_trajectories[-1]["time_step"],
"time": jnp.cumsum(network.node_trajectories[-1]["time_step"]),
}
)
# loop over continuous and binary state nodes and store sufficient statistics
# ---------------------------------------------------------------------------
states_indexes = [i for i in range(n_nodes) if network.edges[i].node_type in [1, 2]]
df = pd.DataFrame(
dict(
[
(f"x_{i}_{var}", network.node_trajectories[i][var])
for i in states_indexes
for var in network.node_trajectories[i].keys()
if (("mean" in var) or ("precision" in var))
]
)
)
trajectories_df = pd.concat([trajectories_df, df], axis=1)
# loop over exponential family state nodes and store sufficient statistics
# ------------------------------------------------------------------------
ef_indexes = [i for i in range(n_nodes) if network.edges[i].node_type == 3]
for i in ef_indexes:
for var in ["nus", "xis", "mean"]:
if network.node_trajectories[i][var].ndim == 1:
trajectories_df = pd.concat(
[
trajectories_df,
pd.DataFrame(
dict([(f"x_{i}_{var}", network.node_trajectories[i][var])])
),
],
axis=1,
)
else:
for ii in range(network.node_trajectories[i][var].shape[1]):
trajectories_df = pd.concat(
[
trajectories_df,
pd.DataFrame(
dict(
[
(
f"x_{i}_{var}_{ii}",
network.node_trajectories[i][var][:, ii],
)
]
)
),
],
axis=1,
)
# add surprise from binary state nodes
binary_indexes = [i for i in range(n_nodes) if network.edges[i].node_type == 1]
for bin_idx in binary_indexes:
surprise = binary_surprise(
x=network.node_trajectories[bin_idx]["mean"],
expected_mean=network.node_trajectories[bin_idx]["expected_mean"],
)
trajectories_df[f"x_{bin_idx}_surprise"] = surprise
# add surprise from continuous state nodes
continuous_indexes = [i for i in range(n_nodes) if network.edges[i].node_type == 2]
for con_idx in continuous_indexes:
surprise = gaussian_surprise(
x=network.node_trajectories[con_idx]["mean"],
expected_mean=network.node_trajectories[con_idx]["expected_mean"],
expected_precision=network.node_trajectories[con_idx]["expected_precision"],
)
trajectories_df[f"x_{con_idx}_surprise"] = surprise
# compute the global surprise over all node
trajectories_df["total_surprise"] = trajectories_df.iloc[
:, trajectories_df.columns.str.contains("_surprise")
].sum(axis=1, min_count=1)
return trajectories_df