# Source code for graph_tool.inference.reconstruction

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# graph_tool -- a general graph manipulation python module
#
# Copyright (C) 2006-2024 Tiago de Paula Peixoto <tiago@skewed.de>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from .. import _prop, _get_rng, Vector_int32_t

from .. spectral import adjacency
from .. generation import complete_graph, generate_triadic_closure, \
    remove_parallel_edges, graph_union
from .. dynamics import IsingGlauberState, NormalState, IsingBPState, \
    NormalBPState, LinearNormalState, LVState

from . uncertain_blockmodel import *
from . util import pmap

from abc import ABC
import inspect
import numpy as np
import numbers

import scipy.stats

import warnings

def get_bisect_args(**kwargs):
    """Build a ``libinference.bisect_args`` object from keyword arguments.

    Parameters
    ----------
    **kwargs : ``dict``
        Attribute values to set on the ``bisect_args`` object. Every keyword
        must name an existing attribute of ``libinference.bisect_args``.

    Returns
    -------
    args : ``libinference.bisect_args``
        The populated parameter object.

    Raises
    ------
    ValueError
        If a keyword does not correspond to a ``bisect_args`` attribute, or
        the attribute cannot be assigned.
    """
    args = libinference.bisect_args()
    for k, v in kwargs.items():
        # Reject unknown options up front; assigning an unknown name to the
        # pybind object would otherwise create a useless python-side attribute.
        if not hasattr(args, k):
            raise ValueError(f"invalid bisect_args: {str(k)}, {str(v)}") \
                from AttributeError(k)
        try:
            setattr(args, k, v)
        except AttributeError as e:
            # e.g. read-only attribute on the C++-backed object
            raise ValueError(f"invalid bisect_args: {str(k)}, {str(v)}") from e
    return args

@entropy_state_signature
class DynamicsBlockStateBase(UncertainBaseState):
    def __init__(self, s, g=None, directed=False, t=[], active=None, x=1,
                 x_range=(-np.inf, np.inf), theta=0,
                 theta_range=(-np.inf, np.inf), xdelta=1e-8, xdelta_min=1e-8,
                 tdelta=1e-8, tdelta_min=1e-8, disable_xdist=False,
                 disable_tdist=False, nested=True, state_args={}, bstate=None,
                 self_loops=False, max_m=1 << 16, entropy_args={}, **kwargs):
        r"""Base state for network reconstruction based on dynamical or
        behavioral data, using the stochastic block model as a prior.

        This class is not meant to be instantiated directly, only indirectly
        via one of its subclasses. Nevertheless, its parameters are inherited,
        and are documented as follows.

        Parameters
        ----------
        s : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap`
            Time series or independent samples used for reconstruction. If an
            array, it is a ``(N,M)`` data matrix with ``M`` samples for all
            ``N`` nodes. If ``g`` is provided, this can instead be (a list of)
            :class:`~graph_tool.VertexPropertyMap` with type ``vector<int>``
            or ``vector<double>``, depending on whether the model has discrete
            or continuous state values. A single property map means a single
            time series. If ``t`` is given, each property map should contain
            only the states for the points in time given there.
        g : :class:`~graph_tool.Graph` (optional, default: ``None``)
            Initial graph state. If not provided, an empty graph will be
            assumed.
        directed : ``boolean`` or ``None`` (optional, default: ``None``)
            If ``True``, the inferred graph will be directed; if ``False``,
            undirected. If ``None``, the directionality is taken from ``g``
            when provided, otherwise ``directed = False`` is assumed.
        t : ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap` (optional, default: ``[]``)
            If nonempty, compressed representation of ``s``: property maps of
            type ``vector<int>`` with the points in time where each node's
            state changes. A single property map means a single time series.
        active : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap` (optional, default: ``None``)
            Binary values specifying whether nodes are "active" at specific
            times/samples. An inactive node is removed from the likelihood
            computation, its value replaced by a standard "inactive" state.
        x : :class:`~graph_tool.EdgePropertyMap` or ``float`` (optional, default: ``1.``)
            Initial value of the edge weights; a ``float`` means the same
            weight for all edges.
        x_range : pair of ``float``\ s (optional, default: ``(-np.inf, np.inf)``)
            Allowed range of edge weights.
        theta : :class:`~graph_tool.EdgePropertyMap` or ``float`` (optional, default: ``0.``)
            Initial value of the node parameters (a.k.a. node bias); a
            ``float`` means the same bias for all nodes.
        theta_range : pair of ``float``\ s (optional, default: ``(-np.inf, np.inf)``)
            Allowed range of the node parameters.
        xdelta, xdelta_min : ``float`` (optional, default: ``1e-8``)
            Initial and minimal values for the edge weight precision.
        tdelta, tdelta_min : ``float`` (optional, default: ``1e-8``)
            Initial and minimal values for the node bias precision.
        disable_xdist : ``boolean`` (optional, default: ``False``)
            If ``True``, the quantization of the edge weights is turned off,
            and :math:`L_1` regularization is used instead.
        disable_tdist : ``boolean`` (optional, default: ``False``)
            If ``True``, the quantization of the node parameters is turned
            off, and :math:`L_1` regularization is used instead.
        nested : ``boolean`` (optional, default: ``True``)
            If ``True``, a :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState`
            will be used, otherwise
            :class:`~graph_tool.inference.blockmodel.BlockState`.
        state_args : ``dict`` (optional, default: ``{}``)
            Arguments to be passed to the block state constructor.
        bstate : :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` or :class:`~graph_tool.inference.blockmodel.BlockState` (optional, default: ``None``)
            If passed, this will be used to initialize the block state
            directly.
        self_loops : bool (optional, default: ``False``)
            If ``True``, the inferred graph may contain self-loops.
        max_m : ``int`` (optional, default: ``1 << 16``)
            Maximum latent edge multiplicity allowed.
        entropy_args : ``dict`` (optional, default: ``{}``)
            Override default arguments for the
            :meth:`~DynamicsBlockStateBase.entropy()` method and related
            operations.
        """
        if g is None:
            if directed is None:
                directed = False
            g = Graph(s.shape[0], directed=directed)
        elif directed is None:
            directed = g.is_directed()
        if g.is_directed() != directed:
            g = GraphView(g, directed=directed)
        UncertainBaseState.__init__(self, g, nested=nested,
                                    state_args=state_args, bstate=bstate,
                                    self_loops=self_loops,
                                    entropy_args=entropy_args)
        # Raw data matrices are converted to vertex property maps; the
        # "discrete" keyword decides the value type.
        if isinstance(s, np.ndarray):
            if kwargs.pop("discrete", False):
                s = g.new_vp("vector<int>", vals=s)
            else:
                s = g.new_vp("vector<double>", vals=s)
        if active is None:
            active = []
        if isinstance(active, np.ndarray):
            active = g.new_vp("vector<int>", vals=active)
        # Normalize single property maps to one-element lists (single series).
        if isinstance(s, PropertyMap):
            s = [s]
        if isinstance(t, PropertyMap):
            t = [t]
        if isinstance(active, PropertyMap):
            active = [active]
        self.s = [self.u.own_property(x) for x in s]
        self.t = [self.u.own_property(x) for x in t]
        self.active = [self.u.own_property(x) for x in active]
        # Edge weights: always stored internally as a "double" edge map.
        if x is None:
            x = self.u.new_ep("double")
        elif isinstance(x, EdgePropertyMap):
            x = self.u.copy_property(x, g=x.get_graph(), value_type="double")
        else:
            x = self.u.new_ep("double", val=x)
        self.x = x
        self.xmin_bound = x_range[0]
        self.xmax_bound = x_range[1]
        # Node biases: always stored internally as a "double" vertex map.
        if theta is None:
            theta = self.u.new_vp("double")
        elif isinstance(theta, VertexPropertyMap):
            theta = self.u.copy_property(theta, g=theta.get_graph(),
                                         value_type="double")
        else:
            theta = self.u.new_vp("double", val=theta)
        self.theta = theta
        self.tmin_bound = theta_range[0]
        self.tmax_bound = theta_range[1]
        self.xdelta = xdelta
        self.tdelta = tdelta
        self.xdelta_min = xdelta_min
        self.tdelta_min = tdelta_min
        self.disable_xdist = disable_xdist
        self.disable_tdist = disable_tdist
        if disable_xdist:
            self.update_entropy_args(xdist=False,
                                     sbm=entropy_args.get("sbm", False))
        if disable_tdist:
            self.update_entropy_args(tdist=False)
        # Remaining keyword arguments become model parameters; property maps
        # (or lists thereof) are re-owned by the graph first.
        kwargs = kwargs.copy()
        for k in kwargs.keys():
            v = kwargs[k]
            if isinstance(v, PropertyMap):
                kwargs[k] = g.own_property(v)
            elif (isinstance(v, collections.abc.Iterable) and len(v) > 0 and
                  isinstance(v[0], PropertyMap)):
                kwargs[k] = [g.own_property(x) for x in v]
        self.rg = kwargs.pop("rg", None)
        self.elist = kwargs.pop("elist", None)
        self.ecandidates = kwargs.pop("ecandidates",
                                      np.zeros((0, 2), dtype="int64"))
        self.params = kwargs
        self.os = [ns._get_any() for ns in self.s]
        self.ot = [nt._get_any() for nt in self.t]
        self.oactive = [nt._get_any() for nt in self.active]
        self.max_m = max_m
        self._state = libinference.make_dynamics_state(self.bstate._state,
                                                       self)
        self._dstate = self._make_state()
        self._state.set_dstate(self._dstate)
        self.theta_first = True
[docs] def set_params(self, params): r"""Sets the model parameters via the dictionary ``params``.""" self.params = dict(self.params, **params) self._state.set_params(self.params)
[docs] def get_params(self, params): r"""Gets the model parameters via the dictionary ``params``.""" return dict(self.params)
    def __getstate__(self):
        # Pickle support: extend the base-class state with all
        # reconstruction-specific attributes and the model parameters.
        state = UncertainBaseState.__getstate__(self)
        return dict(state, s=self.s, t=self.t, active=self.active, x=self.x,
                    x_range=(self.xmin_bound, self.xmax_bound),
                    theta=self.theta,
                    theta_range=(self.tmin_bound, self.tmax_bound),
                    xdelta=self.xdelta, tdelta=self.tdelta,
                    disable_xdist=self.disable_xdist,
                    disable_tdist=self.disable_tdist, rg=self.rg,
                    elist=self.elist, ecandidates=self.ecandidates,
                    max_m=self.max_m, **self.params)

    def __repr__(self):
        return "<%s object with %s, %d edge categories and %d node categories, at 0x%x>" % \
            (self.__class__.__name__,
             self.nbstate if self.nbstate is not None else self.bstate,
             len(self.get_xvals()), len(self.get_tvals()), id(self))

    def _gen_eargs(self, args):
        # Wrap the base-class entropy arguments in the dynamics-specific
        # C++ struct.
        uea = UncertainBaseState._gen_eargs(self, args)
        return libinference.dentropy_args(uea)

    @copy_state_wrap
    def _entropy(self, latent_edges=True, density=False, aE=1, sbm=True,
                 xdist=True, tdist=True, xdist_uniform=False,
                 tdist_uniform=False, xl1=1, tl1=1, alpha=1, normal=False,
                 mu=0, sigma=1, active=True, **kwargs):
        r"""Return the description length, i.e. the negative joint
        log-likelihood.

        Parameters
        ----------
        latent_edges : ``boolean`` (optional, default: ``True``)
            If ``True``, the adjacency term of the description length will be
            included.
        density : ``boolean`` (optional, default: ``False``)
            If ``True``, a geometric prior for the total number of edges will
            be included.
        aE : ``double`` (optional, default: ``1``)
            If ``density=True``, the expected number of edges according to the
            geometric prior.
        sbm : ``boolean`` (optional, default: ``True``)
            If ``True``, the SBM description length will be included.
        xdist : ``boolean`` (optional, default: ``True``)
            If ``True``, the quantized edge weight distribution description
            length will be included.
        xdist_uniform : ``boolean`` (optional, default: ``False``)
            If ``True``, a uniform prior for the edge weight distribution will
            be used.
        tdist : ``boolean`` (optional, default: ``True``)
            If ``True``, the quantized node parameter distribution description
            length will be included.
        tdist_uniform : ``boolean`` (optional, default: ``False``)
            If ``True``, a uniform prior for the node parameter distribution
            will be used.
        xl1 : ``float`` (optional, default: ``1``)
            The :math:`\lambda` parameter for :math:`L_1` regularization of
            the edge weights if ``xdist == False``, or the Laplace hyperprior
            for the discrete categories if ``xdist == True``.
        tl1 : ``float`` (optional, default: ``1``)
            The :math:`\lambda` parameter for :math:`L_1` regularization of
            the node parameters if ``tdist == False``, or the Laplace
            hyperprior for the discrete categories if ``tdist == True``.
        normal : ``boolean`` (optional, default: ``False``)
            If ``True``, a normal distribution will be used for the weight
            priors.
        mu : ``double`` (optional, default: ``0``)
            If ``normal == True``, the mean of the normal distribution.
        sigma : ``double`` (optional, default: ``1``)
            If ``normal == True``, the standard deviation of the normal
            distribution.
        active : ``boolean`` (optional, default: ``True``)
            If ``True``, the contribution of the active/inactive node states
            will be added to the description length.

        Notes
        -----
        The "entropy" of the state is the negative log-likelihood of the
        generative model for the data :math:`\boldsymbol S`, that includes the
        inferred weighted adjacency matrix :math:`\boldsymbol{X}`, the node
        parameters :math:`\boldsymbol{\theta}`, and the SBM node partition
        :math:`\boldsymbol{b},` given by

        .. math::

           \begin{aligned}
           \Sigma(\boldsymbol{S},\boldsymbol{X},\boldsymbol{\theta}|\lambda_x,\lambda_{\theta},\Delta)
           = &- \ln P(\boldsymbol{S}|\boldsymbol{X},\boldsymbol{\theta})\\
             &- \ln P(\boldsymbol{X}|\boldsymbol{A},\lambda_x, \Delta)\\
             &- \ln P(\boldsymbol{A},\boldsymbol{b})\\
             &- \ln P(\boldsymbol{\theta}, \lambda_{\theta}, \Delta).
           \end{aligned}

        The term :math:`P(\boldsymbol{S}|\boldsymbol{X},\boldsymbol{\theta})`
        is given by the particular generative model being used and
        :math:`P(\boldsymbol{A},\boldsymbol{b})` by the SBM. The weight
        distribution is given by the quantized model

        .. math::

           P(\boldsymbol X|\boldsymbol A,\lambda_x,\Delta) =
           \frac{\prod_{k}m_{k}!\times
                 \mathrm{e}^{-\lambda_x \sum_k |z_k|}
                 (\mathrm{e}^{\lambda\Delta} - 1)^{K}}
                {E!{E-1 \choose K-1}2^{K}\max(E,1)}

        where :math:`\boldsymbol z` are the :math:`K` discrete weight
        categories, and analogously

        .. math::

           P(\boldsymbol\theta|\lambda_{\theta},\Delta)
           =\frac{\prod_{k}n_{k}!\times
                  \mathrm{e}^{-\lambda \sum_k |u_k|}
                  \sinh(\lambda_{\theta}\Delta)^{K_{\theta}-\mathbb{1}_{0\in\boldsymbol u}}
                  (1-\mathrm{e}^{-\lambda_{\theta}\Delta})^{\mathbb{1}_{0\in\boldsymbol u}}}
                 {N!{N-1 \choose K_{\theta}-1}N},

        is the node parameter quantized distribution. For more details see
        [peixoto-network-2024]_.

        References
        ----------
        .. [peixoto-network-2024] Tiago P. Peixoto, "Network reconstruction
           via the minimum description length principle", :arxiv:`2405.01015`
        .. [peixoto-scalable-2024] Tiago P. Peixoto, "Scalable network
           reconstruction in subquadratic time", :arxiv:`2401.01404`
        """
        # locals() captures all keyword parameters above for the C++ side.
        eargs = self._get_entropy_args(locals())
        S = self._state.entropy(eargs)
        if sbm:
            if self.nbstate is None:
                S += self.bstate.entropy(**kwargs)
            else:
                S += self.nbstate.entropy(**kwargs)
        return S

    def _get_elist(self, k, elist_args={}, beta=np.inf):
        # Lazily (re)build the candidate edge list for parameter k; reuse the
        # cached list when it was built with the same k.
        if self.elist is not None and self.elist[0] == k:
            return
        if np.isinf(beta) or self.elist is None:
            elist_args = elist_args.copy()
            elist, w = self.get_candidate_edges(k, **elist_args)
            self.elist = (k, elist, w)
    def collect_candidates(self, u=None):
        r"""Store current edges into the list of candidates for MCMC. If
        :class:`~graph_tool.Graph` ``u`` is provided, its edges will be added
        instead."""
        if u is None:
            # Collect both the current latent edges and, if present, the
            # cached candidate edge list.
            self.collect_candidates(GraphView(self.u, directed=False))
            if self.elist is not None:
                self.collect_candidates(self.elist[1])
        else:
            if not isinstance(u, Graph):
                # `u` is an edge list; turn it into a graph padded to the
                # full number of vertices.
                u = Graph(u, directed=False)
                u.add_vertex(self.u.num_vertices() - u.num_vertices())
            # Merge `u` into the stored candidates and deduplicate.
            cg = Graph(self.ecandidates, directed=False)
            cg.add_vertex(self.u.num_vertices() - cg.num_vertices())
            graph_union(cg, u, intersection=u.vertex_index, include=True)
            cg.set_fast_edge_removal()
            remove_parallel_edges(cg)
            self.ecandidates = cg.get_edges()
[docs] def clear_candidates(self): r"""Clear candidate edges for MCMC.""" self.ecandidates = np.zeros((0, 2), dtype="int64")
    def mcmc_sweep(self, beta=1, niter=1, k=1, keep_elist=False, edge=True,
                   edge_swap=True, edge_multiflip=True, theta=True,
                   theta_multiflip=True, sbm=True, xvals=True, tvals=True,
                   verbose=False, elist_args={}, edge_mcmc_args={},
                   edge_swap_mcmc_args={}, edge_multiflip_mcmc_args={},
                   xvals_mcmc_args={}, theta_mcmc_args={},
                   theta_multiflip_mcmc_args={}, tvals_mcmc_args={},
                   sbm_mcmc_args={}, **kwargs):
        r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
        sampling MCMC to sample latent edges and network partitions. If
        ``beta`` is ``np.inf``, the algorithm switches to a greedy heuristic
        based on iterated nearest-neighbor searches.

        Parameters
        ----------
        beta : ``float`` (optional, default: ``1``)
            Inverse temperature parameter.
        niter : ``int`` (optional, default: ``1``)
            Number of sweeps.
        k : ``int`` (optional, default: ``1``)
            :math:`\kappa` parameter to be passed to
            :meth:`~.DynamicsBlockStateBase.get_candidate_edges`. Ignored if
            ``beta`` is not ``np.inf``.
        elist_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.get_candidate_edges`. Ignored if
            ``beta`` is not ``np.inf``.
        keep_elist : ``boolean`` (optional, default: ``False``)
            If ``True``, the candidate edge list from last call will be
            re-used (if it exists).
        edge : ``boolean`` (optional, default: ``True``)
            Whether to call :meth:`~.DynamicsBlockStateBase.edge_mcmc_sweep`.
        edge_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.edge_mcmc_sweep`.
        edge_swap : ``boolean`` (optional, default: ``True``)
            Whether to call :meth:`~.DynamicsBlockStateBase.swap_mcmc_sweep`.
        edge_swap_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.swap_mcmc_sweep`.
        edge_multiflip : ``boolean`` (optional, default: ``True``)
            Whether to call
            :meth:`~.DynamicsBlockStateBase.edge_multiflip_mcmc_sweep`.
        edge_multiflip_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.edge_multiflip_mcmc_sweep`.
        theta : ``boolean`` (optional, default: ``True``)
            Whether to call :meth:`~.DynamicsBlockStateBase.theta_mcmc_sweep`.
        theta_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.theta_mcmc_sweep`.
        theta_multiflip : ``boolean`` (optional, default: ``True``)
            Whether to call
            :meth:`~.DynamicsBlockStateBase.theta_multiflip_mcmc_sweep`.
        theta_multiflip_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.theta_multiflip_mcmc_sweep`.
        sbm : ``boolean`` (optional, default: ``True``)
            Whether to call :meth:`~.DynamicsBlockStateBase.sbm_mcmc_sweep`.
        sbm_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.sbm_mcmc_sweep`.
        xvals : ``boolean`` (optional, default: ``True``)
            Whether to call :meth:`~.DynamicsBlockStateBase.xvals_sweep`.
        xvals_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.xvals_sweep`.
        tvals : ``boolean`` (optional, default: ``True``)
            Whether to call :meth:`~.DynamicsBlockStateBase.tvals_sweep`.
        tvals_mcmc_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.tvals_sweep`.
        verbose : ``boolean`` (optional, default: ``False``)
            If ``verbose == True``, detailed information will be displayed.
        **kwargs : ``dict`` (optional, default: ``{}``)
            Remaining keyword parameters will be passed to all individual
            MCMC functions.

        Returns
        -------
        dS : ``float``
            Entropy difference after the sweeps.
        nmoves : ``int``
            Number of variables moved.
        """
        # NOTE: `ret` is accumulated as lazily-chained generator expressions;
        # each zip() below consumes the previous one, and the final tuple()
        # call forces the whole chain exactly once.
        ret = (0, 0, 0)

        def theta_mcmc():
            nonlocal ret
            if self.tmax_bound != self.tmin_bound:
                if theta:
                    if verbose:
                        print("theta_mcmc_sweep:")
                    tret = self.theta_mcmc_sweep(
                        **dict(dict(beta=beta, niter=niter, **kwargs),
                               **theta_mcmc_args))
                    ret = (sum(x) for x in zip(ret, tret))
                if not self.disable_tdist:
                    if theta_multiflip:
                        if verbose:
                            print("theta_multiflip_mcmc_sweep:")
                        tret = self.theta_multiflip_mcmc_sweep(
                            **dict(dict(beta=beta, niter=10 * niter, **kwargs),
                                   **theta_multiflip_mcmc_args))
                        ret = (sum(x) for x in zip(ret, tret))
                    if tvals:
                        if verbose:
                            print("tvals_sweep:")
                        eret = self.tvals_sweep(
                            **dict(dict(beta=beta, niter=10 * niter, **kwargs),
                                   **tvals_mcmc_args))
                        ret = (sum(x) for x in zip(ret, eret))

        # Node parameters may be sampled before or after the edges, depending
        # on self.theta_first.
        if self.theta_first:
            theta_mcmc()
        if edge:
            if verbose:
                print("edge_mcmc_sweep:")
            edge_mcmc_args = dict(edge_mcmc_args, k=k, keep_elist=keep_elist,
                                  elist_args=elist_args)
            eret = self.edge_mcmc_sweep(
                **dict(dict(beta=beta, niter=niter, **kwargs),
                       **edge_mcmc_args))
            ret = (sum(x) for x in zip(ret, eret))
            # The candidate list was built above; avoid rebuilding it.
            keep_elist = True
        if edge_swap:
            if verbose:
                print("swap_mcmc_sweep:")
            eret = self.swap_mcmc_sweep(
                **dict(dict(beta=beta, niter=niter, **kwargs),
                       **edge_swap_mcmc_args))
            ret = (sum(x) for x in zip(ret, eret))
            keep_elist = True
        if self.xmax_bound != self.xmin_bound and not self.disable_xdist:
            if edge_multiflip:
                if verbose:
                    print("edge_multiflip_sweep:")
                eret = self.edge_multiflip_mcmc_sweep(
                    **dict(dict(beta=beta, niter=10 * niter, **kwargs),
                           **edge_multiflip_mcmc_args))
                ret = (sum(x) for x in zip(ret, eret))
            if xvals:
                if verbose:
                    print("xvals_sweep:")
                eret = self.xvals_sweep(
                    **dict(dict(beta=beta, niter=10 * niter, **kwargs),
                           **xvals_mcmc_args))
                ret = (sum(x) for x in zip(ret, eret))
        if not self.theta_first:
            theta_mcmc()
        if sbm:
            if verbose:
                print("sbm_mcmc_sweep:")
            # Strip entropy arguments the SBM state does not understand.
            kwargs = kwargs.copy()
            eargs = kwargs.get("entropy_args", {}).copy()
            for k in list(eargs.keys()):
                if k not in self.bstate._entropy_args:
                    eargs.pop(k, None)
            kwargs["entropy_args"] = eargs
            sret = self.sbm_mcmc_sweep(
                **dict(dict(beta=beta, niter=10 * niter, **kwargs),
                       **sbm_mcmc_args))
            ret = (sum(x) for x in zip(ret, sret))
        if np.isinf(beta):
            self.collect_candidates()
        return tuple(ret)
    @mcmc_sweep_wrap
    def edge_mcmc_sweep(self, beta=1., niter=1, k=1, elist_args={},
                        keep_elist=False, pold=1, pnew=1, pxu=.1, pm=1,
                        premove=1, pself=0.1, puniform=1, pedge=1, pnearby=1,
                        d=2, pcandidates=1, bisect_args={}, binary=False,
                        deterministic=False, sequential=True, parallel=True,
                        verbose=False, entropy_args={}, **kwargs):
        r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
        sampling MCMC to sample latent edges. If ``beta`` is ``np.inf``, the
        algorithm switches to a greedy heuristic based on iterated
        nearest-neighbor searches.

        Parameters
        ----------
        beta : ``float`` (optional, default: ``1.``)
            Inverse temperature parameter.
        niter : ``int`` (optional, default: ``1``)
            Number of sweeps.
        k : ``int`` (optional, default: ``1``)
            :math:`\kappa` parameter to be passed to
            :meth:`~.DynamicsBlockStateBase.get_candidate_edges`. Ignored if
            ``beta`` is not ``np.inf``.
        elist_args : ``dict`` (optional, default: ``{}``)
            Parameters passed to
            :meth:`~.DynamicsBlockStateBase.get_candidate_edges`. Ignored if
            ``beta`` is not ``np.inf``.
        keep_elist : ``boolean`` (optional, default: ``False``)
            If ``True``, the candidate edge list from last call will be
            re-used (if it exists).
        pold : ``float`` (optional, default: ``1``)
            Relative probability of proposing a new edge weight from existing
            categories.
        pnew : ``float`` (optional, default: ``1``)
            Relative probability of proposing a new edge weight from a new
            category.
        pxu : ``float`` (optional, default: ``.1``)
            Probability of choosing from an existing category uniformly at
            random (instead of doing a bisection search).
        pm : ``float`` (optional, default: ``1``)
            Relative probability of doing edge multiplicity updates.
        premove : ``float`` (optional, default: ``1``)
            Relative probability of removing edges.
        pself : ``float`` (optional, default: ``.1``)
            Relative probability to search for self-loops as candidate node
            "pairs".
        puniform : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs uniformly
            at random.
        pedge : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs among the
            current edges.
        pnearby : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs among the
            neighborhood of current nodes up to distance ``d``.
        d : ``int`` (optional, default: ``2``)
            Maximum distance used to search for candidate node pairs.
        pcandidates : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs among the
            currently stored list of candidate edges (obtained with
            :meth:`~.DynamicsBlockStateBase.collect_candidates`).
        bisect_args : ``dict`` (optional, default: ``{}``)
            Parameters for the bisection search used to optimize/sample edge
            weights. See :meth:`~.DynamicsBlockStateBase.bisect_x` for
            documentation.
        binary : ``boolean`` (optional, default: ``False``)
            If ``True``, the latent graph will be assumed to be a simple
            graph, otherwise a multigraph.
        deterministic : ``boolean`` (optional, default: ``False``)
            If ``True``, the order of edge updates will be deterministic,
            otherwise uniformly at random.
        sequential : ``boolean`` (optional, default: ``True``)
            If ``True``, a sweep will visit every edge candidate once,
            otherwise individual updates will be chosen at random.
        parallel : ``boolean`` (optional, default: ``True``)
            If ``True``, the updates are performed in parallel, using locks on
            edge candidates incident on the same node.
        verbose : ``boolean`` (optional, default: ``False``)
            If ``verbose == True``, detailed information will be displayed.
        entropy_args : ``dict`` (optional, default: ``{}``)
            Entropy arguments, with the same meaning and defaults as in
            :meth:`~.DynamicsBlockStateBase.entropy`.

        Returns
        -------
        dS : ``float``
            Entropy difference after the sweeps.
        nmoves : ``int``
            Number of variables moved.
        """
        if not keep_elist and np.isinf(beta):
            self.elist = None
        elist_args = dict(elist_args, entropy_args=entropy_args)
        entropy_args = self._get_entropy_args(entropy_args)
        bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
                                                  max_bound=self.xmax_bound,
                                                  reversible=not np.isinf(beta)),
                                             **bisect_args))
        # The locals below (ecandidates, elist, state, ...) are picked up by
        # DictState(locals()) and handed to the C++ sweep; do not rename them.
        ecandidates = self.ecandidates
        self._get_elist(k, elist_args, beta)
        elist = self.elist[1]
        state = self._state
        if np.isinf(beta):
            # Greedy mode: disable uniform category choice.
            pxu = 0
        mcmc_state = DictState(dict(kwargs, **locals()))
        if len(kwargs) > 0:
            raise ValueError("unrecognized keyword arguments: " +
                             str(list(kwargs.keys())))
        if parallel:
            return self._state.pseudo_mcmc_sweep(mcmc_state, _get_rng())
        else:
            return self._state.mcmc_sweep(mcmc_state, _get_rng())
    @mcmc_sweep_wrap
    def swap_mcmc_sweep(self, beta=1, niter=1, preplace=1, pswap=1, pself=0.1,
                        puniform=1, pedge=1, pnearby=1, d=2, pcandidates=1,
                        parallel=True, verbose=False, entropy_args={},
                        **kwargs):
        r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
        sampling MCMC to swap edge endpoints.

        Parameters
        ----------
        beta : ``float`` (optional, default: ``1``)
            Inverse temperature parameter.
        niter : ``int`` (optional, default: ``1``)
            Number of sweeps.
        preplace : ``float`` (optional, default: ``1``)
            Relative probability of swapping the weights between an edge and
            another node pair incident on one of the same two endpoints.
        pswap : ``float`` (optional, default: ``1``)
            Relative probability of swapping the endpoints of two selected
            edges or node pairs.
        pself : ``float`` (optional, default: ``.1``)
            Relative probability to search for self-loops as candidate node
            "pairs".
        puniform : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs uniformly
            at random.
        pedge : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs among the
            current edges.
        pnearby : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs among the
            neighborhood of current nodes up to distance ``d``.
        d : ``int`` (optional, default: ``2``)
            Maximum distance used to search for candidate node pairs.
        pcandidates : ``float`` (optional, default: ``1``)
            Relative probability to search for candidate node pairs among the
            currently stored list of candidate edges (obtained with
            :meth:`~.DynamicsBlockStateBase.collect_candidates`).
        parallel : ``boolean`` (optional, default: ``True``)
            If ``True``, the updates are performed in parallel, using locks on
            edge candidates incident on the same node.
        verbose : ``boolean`` (optional, default: ``False``)
            If ``verbose == True``, detailed information will be displayed.
        entropy_args : ``dict`` (optional, default: ``{}``)
            Entropy arguments, with the same meaning and defaults as in
            :meth:`~.DynamicsBlockStateBase.entropy`.

        Returns
        -------
        dS : ``float``
            Entropy difference after the sweeps.
        nmoves : ``int``
            Number of variables moved.
        """
        entropy_args = self._get_entropy_args(entropy_args)
        # These locals are captured by DictState(locals()) for the C++ sweep;
        # do not rename them.
        ecandidates = self.ecandidates
        state = self._state
        if parallel:
            sequential = True
        mcmc_state = DictState(dict(kwargs, **locals()))
        if len(kwargs) > 0:
            raise ValueError("unrecognized keyword arguments: " +
                             str(list(kwargs.keys())))
        if parallel:
            return self._state.pseudo_swap_mcmc_sweep(mcmc_state, _get_rng())
        else:
            return self._state.swap_mcmc_sweep(mcmc_state, _get_rng())
    @mcmc_sweep_wrap
    def edge_multiflip_mcmc_sweep(self, beta=1., niter=1, pmerge=1, psplit=1,
                                  pmergesplit=1, gibbs_sweeps=5, c=.1,
                                  bisect_args={}, accept_stats=None,
                                  verbose=False, entropy_args={}, **kwargs):
        r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
        merge-split MCMC to sample discrete edge weight categories.

        Parameters
        ----------
        beta : ``float`` (optional, default: ``1.``)
            Inverse temperature parameter.
        niter : ``int`` (optional, default: ``1``)
            Number of sweeps.
        pmerge : ``float`` (optional, default: ``1``)
            Relative probability of merging two discrete categories.
        psplit : ``float`` (optional, default: ``1``)
            Relative probability of splitting two discrete categories.
        pmergesplit : ``float`` (optional, default: ``1``)
            Relative probability of simultaneously merging and splitting two
            discrete categories.
        gibbs_sweeps : ``int`` (optional, default: ``5``)
            Number of Gibbs sweeps performed to achieve a split proposal.
        c : ``double`` (optional, default: ``.1``)
            Probability of choosing a category uniformly at random to perform
            a merge, otherwise an adjacent one is chosen.
        bisect_args : ``dict`` (optional, default: ``{}``)
            Parameters for the bisection search. See
            :meth:`~.DynamicsBlockStateBase.bisect_x` for documentation.
        accept_stats : ``dict`` (optional, default: ``None``)
            If provided, the dictionary will be updated with acceptance
            statistics.
        verbose : ``boolean`` (optional, default: ``False``)
            If ``verbose == True``, detailed information will be displayed.
        entropy_args : ``dict`` (optional, default: ``{}``)
            Entropy arguments, with the same meaning and defaults as in
            :meth:`~.DynamicsBlockStateBase.entropy`.

        Returns
        -------
        dS : ``float``
            Entropy difference after the sweeps.
        nmoves : ``int``
            Number of variables moved.
        """
        kwargs = dict(**kwargs)
        E = self.u.num_edges()
        if E == 0:
            # Nothing to sample.
            return (0, 0, 0)
        # Normalize the number of iterations per edge.
        niter /= E
        gibbs_sweeps = max((gibbs_sweeps, 1))
        # Proposal/acceptance counters filled in by the C++ side.
        nproposal = Vector_size_t(4)
        nacceptance = Vector_size_t(4)
        force_move = kwargs.pop("force_move", False)
        bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
                                                  max_bound=self.xmax_bound,
                                                  reversible=not np.isinf(beta)),
                                             **bisect_args))
        # DictState(locals()) captures all locals above; do not rename them.
        mcmc_state = DictState(locals())
        mcmc_state.state = self._state
        mcmc_state.entropy_args = self._get_entropy_args(entropy_args)
        if len(kwargs) > 0:
            raise ValueError("unrecognized keyword arguments: " +
                             str(list(kwargs.keys())))
        ret = self._state.multiflip_mcmc_sweep(mcmc_state, _get_rng())
        if accept_stats is not None:
            for key in ["proposal", "acceptance"]:
                if key not in accept_stats:
                    accept_stats[key] = np.zeros(len(nproposal),
                                                 dtype="uint64")
            accept_stats["proposal"] += nproposal.a
            accept_stats["acceptance"] += nacceptance.a
        return ret
[docs] @mcmc_sweep_wrap def xvals_sweep(self, beta=1., niter=100, bisect_args={}, min_size=1, entropy_args={}, verbose=False): r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection merge-split MCMC to sample the edge weight category values, based on a bisection search. Parameters ---------- beta : ``float`` (optional, default: ``1.``) Inverse temperature parameter. niter : ``int`` (optional, default: ``100``) Maximum number of categories to update. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search use to optimize/sample edge weights. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. min_size : ``int`` (optional, default: ``1``) Minimum size of edge categories that will be updated. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. verbose : ``boolean`` (optional, default: ``False``) If ``verbose == True``, detailed information will be displayed. Returns ------- dS : ``float`` Entropy difference after the sweeps. nmoves : ``int`` Number of variables moved. """ xvals = self.get_xvals() r = min(niter/len(xvals), 1) if len(xvals) > 0 else 1 niter = max(1, int(round(niter/len(xvals)))) if len(xvals) > 0 else 1 ea = self._get_entropy_args(entropy_args) ba = get_bisect_args(**dict(dict(min_bound=self.xmin_bound, max_bound=self.xmax_bound, reversible=not np.isinf(beta)), **bisect_args)) ret = (0, 0) for i in range(niter): eret = self._state.xvals_sweep(beta, r, min_size, ea, ba, _get_rng()) ret = tuple(sum(x) for x in zip(ret, eret)) return ret
[docs] def get_x(self): """Return latent edge weights.""" return self.x.copy()
[docs] def get_xvals(self): """Return latent edge weight categories.""" return self._state.get_xvals()
[docs] def get_xhist(self): """Return histogram (i.e. counts) of edge weight categories.""" xvals = self.get_xvals() if len(xvals) == 0: return np.array([], dtype="int") xvals = np.append(xvals, np.nextafter(xvals[-1], np.inf)) return np.histogram(self.get_x().fa, xvals)[0]
    @mcmc_sweep_wrap
    def theta_mcmc_sweep(self, beta=1., niter=1, pold=1, pnew=1, ptu=.1,
                         bisect_args={}, deterministic=False, sequential=True,
                         parallel=True, verbose=False, entropy_args={},
                         **kwargs):
        r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
        sampling MCMC to sample node parameters.

        Parameters
        ----------
        beta : ``float`` (optional, default: ``1.``)
            Inverse temperature parameter.
        niter : ``int`` (optional, default: ``1``)
            Number of sweeps.
        pold : ``float`` (optional, default: ``1``)
            Relative probability of proposing a new node value from existing
            categories.
        pnew : ``float`` (optional, default: ``1``)
            Relative probability of proposing a new node value from a new
            categories.
        ptu : ``float`` (optional, default: ``.1``)
            Probability of choosing from an existing category uniformly at
            random (instead of doing a bisection search).
        bisect_args : ``dict`` (optional, default: ``{}``)
            Parameters for the bisection search used to optimize/sample edge
            weights. See :meth:`~.DynamicsBlockStateBase.bisect_x` for
            documentation.
        deterministic : ``boolean`` (optional, default: ``False``)
            If ``True``, the order of node updates will be deterministic,
            otherwise uniformly at random.
        sequential : ``boolean`` (optional, default: ``True``)
            If ``True``, a sweep will visit every node once, otherwise
            individual updates will be chosen at random.
        parallel : ``boolean`` (optional, default: ``True``)
            If ``True``, the updates are performed in parallel.
        verbose : ``boolean`` (optional, default: ``False``)
            If ``verbose == True``, detailed information will be displayed.
        entropy_args : ``dict`` (optional, default: ``{}``)
            Entropy arguments, with the same meaning and defaults as in
            :meth:`~.DynamicsBlockStateBase.entropy`.

        Returns
        -------
        dS : ``float``
            Entropy difference after the sweeps.
        nmoves : ``int``
            Number of variables moved.
        """
        entropy_args = self._get_entropy_args(entropy_args)
        bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
                                                  max_bound=self.tmax_bound,
                                                  reversible=not np.isinf(beta)),
                                             **bisect_args))
        state = self._state
        # NOTE: locals() is captured *by name* here; renaming any local above
        # would change the parameters passed to the C++ sweep.
        mcmc_state = DictState(dict(kwargs, **locals()))
        if len(kwargs) > 0:
            raise ValueError("unrecognized keyword arguments: " +
                             str(list(kwargs.keys())))
        if parallel:
            # parallel updates use the pseudo-likelihood sweep
            return self._state.pseudo_mcmc_theta_sweep(mcmc_state, _get_rng())
        else:
            return self._state.mcmc_theta_sweep(mcmc_state, _get_rng())
    @mcmc_sweep_wrap
    def theta_multiflip_mcmc_sweep(self, beta=1., niter=1, pmerge=1, psplit=1,
                                   pmergesplit=1, gibbs_sweeps=5, c=.1,
                                   bisect_args={}, accept_stats=None,
                                   verbose=False, entropy_args={}, **kwargs):
        r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
        merge-split MCMC to sample discrete node value categories.

        Parameters
        ----------
        beta : ``float`` (optional, default: ``1.``)
            Inverse temperature parameter.
        niter : ``int`` (optional, default: ``1``)
            Number of sweeps.
        pmerge : ``float`` (optional, default: ``1``)
            Relative probability of merging two discrete categories.
        psplit : ``float`` (optional, default: ``1``)
            Relative probability of splitting two discrete categories.
        pmergesplit : ``float`` (optional, default: ``1``)
            Relative probability of simultaneously merging and splitting two
            discrete categories.
        gibbs_sweeps : ``int`` (optional, default: ``5``)
            Number of Gibbs sweeps performed to achieve a split proposal.
        c : ``double`` (optional, default: ``.1``)
            Probability of choosing a category uniformly at random to perform
            a merge, otherwise an adjacent one is chosen.
        bisect_args : ``dict`` (optional, default: ``{}``)
            Parameter for the bisection search. See
            :meth:`~.DynamicsBlockStateBase.bisect_x` for documentation.
        accept_stats : ``dict`` (optional, default: ``None``)
            If provided, the dictionary will be updated with acceptance
            statistics.
        verbose : ``boolean`` (optional, default: ``False``)
            If ``verbose == True``, detailed information will be displayed.
        entropy_args : ``dict`` (optional, default: ``{}``)
            Entropy arguments, with the same meaning and defaults as in
            :meth:`~.DynamicsBlockStateBase.entropy`.

        Returns
        -------
        dS : ``float``
            Entropy difference after the sweeps.
        nmoves : ``int``
            Number of variables moved.
        """
        # normalize the sweep count by the number of nodes
        niter /= self.u.num_vertices()
        gibbs_sweeps = max((gibbs_sweeps, 1))
        # per-move-type proposal/acceptance counters, filled by the C++ side
        nproposal = Vector_size_t(4)
        nacceptance = Vector_size_t(4)
        force_move = kwargs.pop("force_move", False)
        bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
                                                  max_bound=self.tmax_bound,
                                                  reversible=not np.isinf(beta)),
                                             **bisect_args))
        # NOTE: DictState(locals()) captures every local variable *by name*;
        # renaming any local above would change the parameters seen by the
        # C++ sweep.
        mcmc_state = DictState(locals())
        mcmc_state.entropy_args = self._get_entropy_args(entropy_args)
        mcmc_state.state = self._state
        if len(kwargs) > 0:
            raise ValueError("unrecognized keyword arguments: " +
                             str(list(kwargs.keys())))
        ret = self._state.multiflip_mcmc_theta_sweep(mcmc_state, _get_rng())
        if accept_stats is not None:
            for key in ["proposal", "acceptance"]:
                if key not in accept_stats:
                    accept_stats[key] = np.zeros(len(nproposal),
                                                 dtype="uint64")
            accept_stats["proposal"] += nproposal.a
            accept_stats["acceptance"] += nacceptance.a
        return ret
[docs] @mcmc_sweep_wrap def tvals_sweep(self, beta=1., niter=100, min_size=1, bisect_args={}, entropy_args={}): r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection merge-split MCMC to sample the node bias category values, based on a bisection search. Parameters ---------- beta : ``float`` (optional, default: ``1.``) Inverse temperature parameter. niter : ``int`` (optional, default: ``100``) Maximum number of categories to update. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search use to optimize/sample node biases. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. min_size : ``int`` (optional, default: ``1``) Minimum size of node categories that will be updated. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. verbose : ``boolean`` (optional, default: ``False``) If ``verbose == True``, detailed information will be displayed. Returns ------- dS : ``float`` Entropy difference after the sweeps. nmoves : ``int`` Number of variables moved. """ tvals = self.get_tvals() r = min(niter/len(tvals), 1) if len(tvals) > 0 else 1 niter = max(1, int(round(niter/len(tvals)))) if len(tvals) > 0 else 1 ea = self._get_entropy_args(entropy_args) ba = get_bisect_args(**dict(dict(min_bound=self.tmin_bound, max_bound=self.tmax_bound, reversible=not np.isinf(beta)), **bisect_args)) ret = (0, 0) for i in range(niter): eret = self._state.tvals_sweep(beta, r, min_size, ea, ba, _get_rng()) ret = tuple(sum(x) for x in zip(ret, eret)) return ret
[docs] def get_theta(self): """Return latent node values.""" return self.theta.copy()
[docs] def get_tvals(self): """Return latent node categories.""" return self._state.get_tvals()
[docs] def get_thist(self): """Return histogram of node categories.""" tvals = self.get_tvals() tvals = np.append(tvals, np.nextafter(tvals[-1], np.inf)) return np.histogram(self.get_theta().fa, tvals)[0]
[docs] def get_edge_prob(self, u, v, x, entropy_args={}, epsilon=1e-8): r"""Return conditional posterior log-probability of edge :math:`(u,v)`.""" ea = self._get_entropy_args(entropy_args) return self._state.get_edge_prob(u, v, x, ea, epsilon)
[docs] def get_edges_prob(self, elist, entropy_args={}, epsilon=1e-8): r"""Return conditional posterior log-probability of an edge list, with shape :math:`(E,2)`.""" ea = self._get_entropy_args(entropy_args) elist = np.asarray(elist, dtype="double") probs = np.zeros(elist.shape[0]) self._state.get_edges_prob(elist, probs, ea, epsilon) return probs
[docs] def edge_cov(self, u, v, toffset=True, pearson=False): r"""Return the covariance (or Pearson correlation if ``pearson == True``) between nodes :math:`u` and :math:`v`, according to their time-series. """ return self._state.node_cov(u, v, toffset, pearson)
[docs] def edge_TE(self, u, v): r"""Return the transfer entropy between nodes :math:`u` and :math:`v`, according to their time-series. """ return self._state.node_TE(u, v)
[docs] def edge_MI(self, u, v): r"""Return the mutual information between nodes :math:`u` and :math:`v`, according to their time-series. """ return self._state.node_MI(u, v)
[docs] def get_candidate_edges(self, k=1, r=1, max_rk="k", epsilon=0.01, c_stop=False, maxiter=0, knn=False, gradient=None, h=2e-3, allow_edges=False, include_edges=True, use_hint=True, nrandom=0, keep_all=False, exact=False, return_graph=False, keep_iter=False, entropy_args={}, bisect_args={}, verbose=False): r"""Return the :math:`\lfloor\kappa N\rceil` best edge candidates according to a stochastic second neighbor search. Parameters ---------- k : ``float`` (optional, default: ``1``) :math:`\kappa` parameter. r : ``float`` (optional, default: ``1``) Fraction of second neighbors to consider during the search. max_rk : ``float`` (optional, default: ``"k"``) Maximum number of second-neighbors to be considered per iteration. A string value ``"k"`` means that this will match the number of first neighbors. epsilon : ``float`` (optional, default: ``.01``) Convergence criterion. c_stop : ``boolean`` (optional, default: ``False``) If ``True``, the clustering coefficient will be used for the convergence criterion. maxiter : ``int`` (optional, default: ``0``) Maximum number of iterations allowed (``0`` means unlimited). knn : ``boolean`` (optional, default: ``False``) If ``True``, the KNN graph will be returned. gradient : ``boolean`` (optional, default: ``None``) Whether to use the gradient to rank edges. If ``None``, it defaults to ``True`` is the number of edge categories is empty. h : ``float`` (optional, default: ``1e-8``) Step length used to compute the gradient with central finite difference. allow_edges : ``boolean`` (optional, default: ``False``) Permit currently present edges to be included in the search. use_hint : ``boolean`` (optional, default: ``True``) Use current edges as a hint during the search. nrandom : ``int`` (optional, default: ``0``) Add this many random entries to the list. keep_all : ``boolean`` (optional, default: ``False``) Keep all entries seen during the search, not only the best. 
exact : ``boolean`` (optional, default: ``False``) If ``True`` an exact quadratic algorithm will be used. return_graph : ``boolean`` (optional, default: ``False``) If ``True`` the result will be returned as graph and a property map. keep_iter : ``boolean`` (optional, default: ``False``) If ``True`` the results contain the iteration at which an entry has been found. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search use to optimize/sample edge weights. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. Returns ------- elist : :class:``~numpy.ndarray`` of shape ``(E, 2)`` Best entries. a : :class:``~numpy.ndarray`` Edge scores. """ g = Graph(self.g.num_vertices(), fast_edge_removal=True) w = g.new_ep("double") ei = g.new_ep("int32_t") N = g.num_vertices() if knn: M = k else: M = k * self.g.num_vertices() M = max((int(round(M)), 1)) if max_rk is None: max_rk = g.num_vertices() elif max_rk == "k": max_rk = k ea = self._get_entropy_args(entropy_args) if gradient is None: gradient = len(self.get_xvals()) == 0 bisect_args = get_bisect_args(**dict(dict(maxiter=10, tol=1e-4, min_bound=self.xmin_bound, max_bound=self.xmax_bound), **bisect_args)) n_tot = self._state.get_candidate_edges(g._Graph__graph, M, r, max_rk, epsilon, c_stop, maxiter, _prop("e", g, w), _prop("e", g, ei), keep_iter, ea, exact, knn, keep_all, gradient, h, bisect_args, allow_edges, include_edges, use_hint, nrandom, verbose, _get_rng()) if return_graph: if not knn and not self.u.is_directed(): g.set_directed(False) if keep_iter: return g, w, ei, n_tot else: return g, w, n_tot elist = g.get_edges([w]) idx = elist[:,2].argsort() a = elist[idx,2] elist = numpy.array((elist[idx,0], elist[idx,1]), dtype="int").T return elist, a
[docs] def virtual_remove_edge(self, u, v, dm=1, entropy_args={}): """Return the difference in description length if edge :math:`(u, v)` with multiplicity ``dm`` would be removed. """ entropy_args = self._get_entropy_args(entropy_args) return self._state.remove_edge_dS(int(u), int(v), dm, entropy_args)
[docs] def virtual_add_edge(self, u, v, x, dm=1, entropy_args={}): """Return the difference in description length if edge :math:`(u, v)` would be added with multiplicity ``dm`` and weight ``x``. """ entropy_args = self._get_entropy_args(entropy_args) return self._state.add_edge_dS(int(u), int(v), dm, x, entropy_args)
[docs] def virtual_update_edge(self, u, v, nx, entropy_args={}): """Return the difference in description length if edge :math:`(u, v)` would take a new weight ``nx``. """ entropy_args = self._get_entropy_args(entropy_args) return self._state.update_edge_dS(int(u), int(v), nx, entropy_args)
[docs] def virtual_update_node(self, v, nt, entropy_args={}): """Return the difference in description length if node ``v`` would take a new value ``nt``. """ entropy_args = self._get_entropy_args(entropy_args) return self._state.update_node_dS(int(v), nt, entropy_args)
[docs] def remove_edge(self, u, v, dm=1): r"""Remove edge :math:`(u, v)` with multiplicity ``dm``.""" return self._state.remove_edge(int(u), int(v), dm)
[docs] def add_edge(self, u, v, x, dm=1): r"""Add edge :math:`(u, v)` with multiplicity ``dm`` and weight ``x``.""" return self._state.add_edge(int(u), int(v), dm, x)
[docs] def update_edge(self, u, v, nx): r"""update edge :math:`(u, v)` with weight ``nx``.""" return self._state.update_edge(int(u), int(v), nx)
[docs] def update_node(self, v, nt): r"""update node :math:`(u, v)` with value ``nt``.""" return self._state.update_node(int(v), nt)
    def bisect_x(self, u, v, entropy_args={}, bisect_args={}, fb=False,
                 ret_sampler=False):
        r"""Perform a bisection search to find the best weight value for edge
        :math:`(u, v)`.

        Parameters
        ----------
        u : ``int`` or :class:`~graph_tool.Vertex`
            Source
        v : ``int`` or :class:`~graph_tool.Vertex`
            Target
        entropy_args : ``dict`` (optional, default: ``{}``)
            Entropy arguments, with the same meaning and defaults as in
            :meth:`~.DynamicsBlockStateBase.entropy`.
        bisect_args : ``dict`` (optional, default: ``{}``)
            Parameters for the bisection search used to optimize/sample edge
            weights. The recognized parameters and their default values are
            as follows:

            maxiter : ``int`` (default: ``0``)
                Maximum number of iterations for bisection search (``0``
                means unlimited).
            tol : ``float`` (default: ``2e-3``)
                Relative tolerance for bisection search.
            ftol : ``float`` (default: ``100``)
                Absolute tolerance used to extend the search range.
            nmax_extend : ``int`` (default: ``8``)
                Maximum number of min/max range extensions.
            min_bound : ``float`` (default: ``-np.inf``)
                Minimum bound for bisection search.
            max_bound : ``float`` (default: ``np.inf``)
                Maximum bound for bisection search.
            min_init : ``float`` (default: ``-np.inf``)
                Initial minimum bound for bisection search.
            max_init : ``float`` (default: ``np.inf``)
                Initial maximum bound for bisection search.
            reversible : ``boolean`` (default: ``True``)
                Perform search in a manner that is usable for a reversible
                Markov chain.
        fb : ``boolean`` (optional, default: ``False``)
            Perform a Fibonacci (a.k.a. golden ratio) search among the
            current edge categories, instead of a bisection search among all
            possible values.
        ret_sampler : ``boolean`` (optional, default: ``False``)
            If ``True``, a ``BisectionSampler`` object will be returned as
            well (for debugging purposes).
        """
        entropy_args = self._get_entropy_args(entropy_args)
        bisect_args = get_bisect_args(**bisect_args)
        ret = self._state.bisect_x(int(u), int(v), entropy_args, bisect_args,
                                   fb, _get_rng())
        if ret_sampler:
            return ret
        # otherwise return only the best weight value found
        return ret[0]
[docs] def bisect_t(self, v, entropy_args={}, bisect_args={}, fb=False, ret_sampler=False): r"""Perform a bisection search to find the best bias value for node ``v``. Parameters ---------- v : ``int`` or :class:`~graph_tool.Vertex` Vertex to consider. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. fb : ``boolean`` (optional, default: ``False``) Perform a Fibonacci (a.k.a. golden ratio) search among the current node categories, instead of a bisection search among all possible values. ret_sampler : ``boolean`` (optional, default: ``False``) If ``True``, a ``BisectionSampler`` object will be returned as well (for debugging purposes). """ entropy_args = self._get_entropy_args(entropy_args) bisect_args = get_bisect_args(**bisect_args) ret = self._state.bisect_t(int(v), entropy_args, bisect_args, fb, _get_rng()) if ret_sampler: return ret return ret[0]
[docs] @mcmc_sweep_wrap def xdelta_mcmc_sweep(self, beta=1., niter=1, step=10, pold=.5, pxu=.1, intra_sweeps=10, verbose=False, entropy_args={}, bisect_args={}, **kwargs): r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection MCMC to sample the precision parameter of the edge categories. Parameters ---------- beta : ``float`` (optional, default: ``1.``) Inverse temperature parameter. niter : ``int`` (optional, default: ``1``) Number of sweeps. step : ``float`` (optional, default: ``10``) Multiplicative move step size. pold : ``float`` (optional, default: ``1``) Relative probability of proposing a new edge weight from existing categories. pxu : ``float`` (optional, default: ``.1``) Probability of choosing from an existing category uniformly at random (instead of doing a bisection search). intra_sweeps : ``int`` (optional, default: ``10``) Number of Metropolis-Hastings sweeps performed to achieve a staging proposal. verbose : ``boolean`` (optional, default: ``False``) If ``verbose == True``, detailed information will be displayed. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. Returns ------- dS : ``float`` Entropy difference after the sweeps. nmoves : ``int`` Number of variables moved. """ entropy_args = self._get_entropy_args(entropy_args) bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound, max_bound=self.xmax_bound), **bisect_args)) state = self._state mcmc_state = DictState(dict(kwargs, **locals())) if len(kwargs) > 0: raise ValueError("unrecognized keyword arguments: " + str(list(kwargs.keys()))) try: ret = self._state.mcmc_sweep_xdelta(mcmc_state, _get_rng()) finally: self.xdelta = self._state.get_xdelta() return ret
[docs] @mcmc_sweep_wrap def tdelta_mcmc_sweep(self, beta=1., niter=1, step=10, pold=.5, ptu=.1, intra_sweeps=10, verbose=False, entropy_args={}, bisect_args={}, **kwargs): r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection MCMC to sample the precision parameter of the node categories. Parameters ---------- beta : ``float`` (optional, default: ``1.``) Inverse temperature parameter. niter : ``int`` (optional, default: ``1``) Number of sweeps. step : ``float`` (optional, default: ``10``) Multiplicative move step size. pold : ``float`` (optional, default: ``1``) Relative probability of proposing a new edge weight from existing categories. ptu : ``float`` (optional, default: ``.1``) Probability of choosing from an existing category uniformly at random (instead of doing a bisection search). intra_sweeps : ``int`` (optional, default: ``10``) Number of Metropolis-Hastings sweeps performed to achieve a staging proposal. verbose : ``boolean`` (optional, default: ``False``) If ``verbose == True``, detailed information will be displayed. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. Returns ------- dS : ``float`` Entropy difference after the sweeps. nmoves : ``int`` Number of variables moved. """ entropy_args = self._get_entropy_args(entropy_args) bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound, max_bound=self.tmax_bound), **bisect_args)) state = self._state mcmc_state = DictState(dict(kwargs, **locals())) if len(kwargs) > 0: raise ValueError("unrecognized keyword arguments: " + str(list(kwargs.keys()))) try: ret = self._state.mcmc_sweep_tdelta(mcmc_state, _get_rng()) finally: self.tdelta = self._state.get_tdelta() return ret
[docs] def sample_x(self, u, v, beta=1, entropy_args={}, bisect_args={}, ret_sampler=False): r"""Sample a proposed weight value for edge :math:`(u, v)`. Parameters ---------- u : ``int`` or :class:`~graph_tool.Vertex` Source node. v : ``int`` or :class:`~graph_tool.Vertex` Target node. beta : ``float`` (optional, default: ``1``) Inverse temperature. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. ret_sampler : ``boolean`` (optional, default: ``False``) If ``True``, a ``BisectionSampler`` object will be returned as well (for debugging purposes). """ entropy_args = self._get_entropy_args(entropy_args) bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound, max_bound=self.xmax_bound), **bisect_args)) ret = self._state.sample_x(int(u), int(v), beta, entropy_args, bisect_args, _get_rng()) if ret_sampler: return ret return ret[0]
[docs] def sample_t(self, v, beta=1, entropy_args={}, bisect_args={}): r"""Sample a value for node ``v``. Parameters ---------- v : ``int`` or :class:`~graph_tool.Vertex` Node to be considered. beta : ``float`` (optional, default: ``1``) Inverse temperature. entropy_args : ``dict`` (optional, default: ``{}``) Entropy arguments, with the same meaning and defaults as in :meth:`~.DynamicsBlockStateBase.entropy`. bisect_args : ``dict`` (optional, default: ``{}``) Parameter for the bisection search. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation. ret_sampler : ``boolean`` (optional, default: ``False``) If ``True``, a ``BisectionSampler`` object will be returned as well (for debugging purposes). """ entropy_args = self._get_entropy_args(entropy_args) bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound, max_bound=self.tmax_bound), **bisect_args)) ret = self._state.sample_t(int(v), beta, entropy_args, bisect_args, _get_rng()) if ret_sampler: return ret return ret[0]
[docs] def set_xdelta(self, delta): """Set edge weight precision parameter.""" self._state.set_xdelta(delta) self.xdelta = delta
[docs] def set_tdelta(self, delta): """Set node bias precision parameter.""" self._state.set_tdelta(delta) self.tdelta = delta
    def collect_marginal(self, g=None):
        r"""Collect marginal inferred network during MCMC runs.

        Parameters
        ----------
        g : :class:`~graph_tool.Graph` (optional, default: ``None``)
            Previous marginal graph.

        Returns
        -------
        g : :class:`~graph_tool.Graph`
            New marginal graph, with internal edge
            :class:`~graph_tool.EdgePropertyMap` ``"eprob"``, ``"x"``, and
            ``"xdev"`` containing the marginal probabilities, mean weight,
            and weight standard deviation, respectively, for each edge, and
            :class:`~graph_tool.VertexPropertyMap` ``"t"`` and ``"tdev"``,
            for the mean and standard deviation, respectively, for the node
            biases.

        Notes
        -----
        The posterior marginal probability of an edge :math:`(i,j)` is
        defined as

        .. math::

           \pi_{ij} = \sum_{\boldsymbol x}{\boldsymbol 1}_{x_{ij}>0}P(\boldsymbol x|\boldsymbol D)

        where :math:`P(\boldsymbol x|\boldsymbol D)` is the posterior
        probability of the edge weights :math:`\boldsymbol x` given the data.

        Likewise, the marginal mean :math:`\left<x_{ij}\right>` and standard
        deviation :math:`\sigma_{ij}` for the edge weights are given by

        .. math::

           \begin{aligned}
           \left<x_{ij}\right> &= \sum_{\boldsymbol x}x_{ij}P(\boldsymbol x|\boldsymbol D)\\
           \sigma_{ij}^2 &= \sum_{\boldsymbol x}(x_{ij} - \left<x_{ij}\right>)^2 P(\boldsymbol x|\boldsymbol D).
           \end{aligned}

        The mean and standard deviation for the node biases are entirely
        analogous.
        """
        if g is None:
            # first call: create an empty marginal graph with accumulators
            g = Graph(directed=self.g.is_directed())
            g.add_vertex(self.g.num_vertices())
            g.gp.count = g.new_gp("int", 0)  # number of samples collected
            g.ep.count = g.new_ep("int")     # per-edge sample counts
            g.ep.eprob = g.new_ep("double")  # marginal edge probabilities
        if "x" not in g.ep:
            # running sums used to compute mean / stddev of edge weights
            g.ep.xsum = g.new_ep("double")
            g.ep.x2sum = g.new_ep("double")
            g.ep.x = g.new_ep("double")
            g.ep.xdev = g.new_ep("double")
        if "t" not in g.vp:
            # running sums used to compute mean / stddev of node biases
            g.vp.tsum = g.new_vp("double")
            g.vp.t2sum = g.new_vp("double")
            g.vp.t = g.new_vp("double")
            g.vp.tdev = g.new_vp("double")
        u = self.get_graph()
        x = self.get_x()
        try:
            # accumulate edge counts and weight sums from the current sample
            libinference.collect_xmarginal(g._Graph__graph,
                                           u._Graph__graph,
                                           _prop("e", u, x),
                                           _prop("e", g, g.ep.count),
                                           _prop("e", g, g.ep.xsum),
                                           _prop("e", g, g.ep.x2sum))
        finally:
            # update the derived statistics even if the C++ call raises
            g.gp.count += 1
            g.ep.eprob.fa = g.ep.count.fa / g.gp.count
            g.ep.x.fa = g.ep.xsum.fa / g.ep.count.fa
            g.ep.xdev.fa = np.sqrt(np.abs(g.ep.x2sum.fa / g.ep.count.fa -
                                          g.ep.x.fa ** 2))
            g.vp.tsum.fa += self.theta.fa
            g.vp.t2sum.fa += self.theta.fa ** 2
            g.vp.t.fa = g.vp.tsum.fa / g.gp.count
            g.vp.tdev.fa = np.sqrt(np.abs(g.vp.t2sum.fa / g.gp.count -
                                          g.vp.t.fa ** 2))
        return g
class BPBlockStateBase(DynamicsBlockStateBase, ABC):
    def __init__(self, *args, **kwargs):
        r"""Base state for network reconstruction where the respective model
        can be used with belief-propagation (BP)."""
        DynamicsBlockStateBase.__init__(self, *args, **kwargs)

    def get_S_bp(self, **kwargs):
        """Get negative model likelihood according to BP.

        Remaining keyword arguments are passed to the BP state's
        ``converge()`` method.
        """
        bstate = self.get_bp_state(converge=False)
        bstate.converge(**kwargs)
        S = 0
        # accumulate -log P over every sample / time series
        for s in self.s:
            S -= bstate.log_prob(s)
        return S

    def get_elist_grad(self, h=2e-3, entropy_args={}):
        """Get edge list gradient, computed via finite difference with step
        length ``h``."""
        entropy_args = self._get_entropy_args(entropy_args)
        return [self._state.edge_diff(u, v, h, entropy_args)
                for u, v in self.elist[1]]

    def get_node_grad(self, h=2e-3, entropy_args={}):
        """Get node gradient, computed via finite difference with step length
        ``h``."""
        entropy_args = self._get_entropy_args(entropy_args)
        return [self._state.node_diff(v, h, entropy_args)
                for v in self.u.vertices()]

    def get_S(self):
        """Get negative model likelihood according to pseudo-likelihood."""
        # disable all prior/regularization terms so only the data likelihood
        # remains
        return DynamicsBlockStateBase.entropy(self, sbm=False, tdist=False,
                                              xdist=False, density=False,
                                              xl1=0, tl1=0, alpha=1,
                                              test=False)

    @abstractmethod
    def get_bp_state(self, **kwargs):
        """Return the equivalent BP state (must be implemented by
        subclasses)."""
        # BUG fix: the original signature was missing `self`, which would
        # raise a TypeError if this base implementation were ever reached
        # through a method call.
        pass
class EpidemicsBlockState(DynamicsBlockStateBase):
    def __init__(self, s, g=None, exposed=False, theta=-1,
                 x_range=(-np.inf, 1e-8), theta_range=(-np.inf, 1e-8),
                 **kwargs):
        r"""Inference state for network reconstruction based on epidemic
        dynamics, using the stochastic block model as a prior.

        Parameters
        ----------
        s : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap`
            Time series used for reconstruction. If the type is
            :class:`~numpy.ndarray`, it should correspond to a ``(N,M)`` data
            matrix with ``M`` samples for all ``N`` nodes. A value of ``1``
            means infected and ``0`` susceptible. Other values are allowed
            (e.g. for recovered), but their actual value is unimportant for
            reconstruction. If the parameter ``g`` is provided, this can be
            optionally a list of of :class:`~graph_tool.VertexPropertyMap`
            objects, where each entry in this list must be a
            :class:`~graph_tool.VertexPropertyMap` with type ``vector<int>``.
            If a single property map is given, then a single time series is
            assumed. If the parameter ``t`` below is given, each property map
            value for a given node should contain only the states for the
            same points in time given by that parameter.
        g : :class:`~graph_tool.Graph` (optional, default: ``None``)
            Initial graph state. If not provided, an empty graph will be
            assumed.
        exposed : ``boolean`` (optional, default: ``False``)
            If ``True``, the data is supposed to come from a SEI, SEIR, etc.
            model, where a susceptible node (valued ``0``) first transits to
            an exposed state (valued ``-1``) upon transmission, before
            transiting to the infective state (valued ``1``).
        **kwargs : (optional)
            Remaining parameters to be passed to
            :class:`~graph_tool.inference.DynamicsBlockStateBase`

        References
        ----------
        .. [peixoto-network-2019] Tiago P. Peixoto, "Network reconstruction
           and community detection from dynamics", Phys. Rev. Lett. 123
           128301 (2019), :doi:`10.1103/PhysRevLett.123.128301`,
           :arxiv:`1903.10833`
        .. [peixoto-network-2024] Tiago P. Peixoto, "Network reconstruction
           via the minimum description length principle", :arxiv:`2405.01015`
        .. [peixoto-scalable-2024] Tiago P. Peixoto, "Scalable network
           reconstruction in subquadratic time", :arxiv:`2401.01404`
        """
        # BUG fix: the original call also passed `directed=directed`, but no
        # such name exists in this scope (NameError), and `directed` is
        # already supplied below via the keyword defaults (which would make
        # it a duplicate keyword in any case).
        DynamicsBlockStateBase.__init__(self, s, g=g, exposed=exposed,
                                        theta=theta, x_range=x_range,
                                        theta_range=theta_range,
                                        discrete=True,
                                        **dict(dict(directed=True), **kwargs))

    def _make_state(self):
        # Assemble the C++ state implementing the epidemic likelihood.
        return libinference.make_epidemics_state(self._state, self.ot,
                                                 self.os, self.oactive,
                                                 self.params)

    def get_t(self):
        """Return the latent edge transmission probabilities."""
        x = self.get_x()
        # Edge weights store log(1 - p); invert that transformation. Use
        # np.exp, since a bare `exp` is not imported at the top of this
        # module.
        x.fa = 1 - np.exp(x.fa)
        return x

    def _mcmc_sweep(self, mcmc_state):
        # Dispatch to the specialized C++ MCMC sweep for epidemic dynamics.
        return libinference.mcmc_epidemics_sweep(mcmc_state, self._state,
                                                 _get_rng())
class IsingBlockStateBase(ABC):
    # Abstract base class: concrete subclasses provide __init__ (and the
    # backend state); this class only documents the shared interface and
    # supplies get_dyn_state().
    # NOTE(review): `abstractmethod` is not among this file's visible imports
    # (only `from abc import ABC` is shown); presumably it is provided via a
    # star import — confirm.
    @abstractmethod
    def __init__(self, s, g=None, has_zero=False, **kwargs):
        r"""Base state for network reconstruction based on the Ising model,
        using the stochastic block model as a prior.

        This class is not supposed to be instantiated directly. Instead one of
        its specialized subclasses must be used, which have the same
        signature: :class:`IsingGlauberBlockState`,
        :class:`PseudoIsingBlockState`, :class:`CIsingGlauberBlockState`,
        :class:`PseudoCIsingBlockState`.

        Parameters
        ----------
        s : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap`
            Time series or independent samples used for reconstruction. If the
            type is :class:`~numpy.ndarray`, it should correspond to a
            ``(N,M)`` data matrix with ``M`` samples for all ``N`` nodes. The
            values must correspond to Ising states: ``-1`` or ``+1``. If the
            parameter ``g`` is provided, this can be optionally a list of
            :class:`~graph_tool.VertexPropertyMap` objects, where each entry
            in this list must be a :class:`~graph_tool.VertexPropertyMap` with
            type ``vector<int>``. If a single property map is given, then a
            single time series is assumed. If the parameter ``t`` below is
            given, each property map value for a given node should contain
            only the states for the same points in time given by that
            parameter.
        g : :class:`~graph_tool.Graph` (optional, default: ``None``)
            Initial graph state. If not provided, an empty graph will be
            assumed.
        has_zero : bool (optional, default: ``False``)
            If ``True``, the three-state "Ising" model with values
            ``{-1,0,1}`` is used.
        **kwargs : (optional)
            Remaining parameters to be passed to
            :class:`~graph_tool.inference.DynamicsBlockStateBase`

        References
        ----------
        .. [ising-model] https://en.wikipedia.org/wiki/Ising_model
        .. [peixoto-network-2019] Tiago P. Peixoto, "Network reconstruction
           and community detection from dynamics", Phys. Rev. Lett. 123 128301
           (2019), :doi:`10.1103/PhysRevLett.123.128301`, :arxiv:`1903.10833`
        .. [peixoto-network-2024] Tiago P. Peixoto, "Network reconstruction
           via the minimum description length principle", :arxiv:`2405.01015`
        .. [peixoto-scalable-2024] Tiago P. Peixoto, "Scalable network
           reconstruction in subquadratic time", :arxiv:`2401.01404`
        """
        pass

    def get_dyn_state(self, s=None):
        """Return an :class:`~graph_tool.dynamics.IsingGlauberState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        # self.u (graph), self.x (couplings) and self.theta (fields) are
        # provided by the concrete subclass via DynamicsBlockStateBase.
        return IsingGlauberState(self.u, w=self.x, h=self.theta, s=s)
[docs] class IsingGlauberBlockState(DynamicsBlockStateBase, IsingBlockStateBase): def __init__(self, s, g=None, has_zero=False, **kwargs): r"""State for network reconstruction based on the Glauber dynamics of the Ising model, using the stochastic block model as a prior. See documentation for :class:`IsingBlockStateBase` for details. Notes ----- This is a dynamical model with a transition likelihood for node :math:`i` to state :math:`s_i(t+1) \in \{-1,+1\}` given by: .. math:: P(s_i(t+1)|\boldsymbol s(t), \boldsymbol A, \boldsymbol x, \boldsymbol \theta) = \frac{\exp(s_i(t+1)\sum_jA_{ij}x_{ij}s_j(t) + \theta_is_i(t+1))} {2\cosh(\sum_jA_{ij}x_{ij}s_j(t) + \theta_i)}. """ DynamicsBlockStateBase.__init__(self, s, g=g, has_zero=has_zero, discrete=True, **dict(dict(directed=True), **kwargs)) def _make_state(self): return libinference.make_ising_glauber_state(self._state, self.ot, self.os, self.oactive, self.params)
class CIsingGlauberBlockState(DynamicsBlockStateBase, IsingBlockStateBase):
    def __init__(self, s, g=None, has_zero=False, **kwargs):
        r"""State for network reconstruction based on the Glauber dynamics of
        the continuous Ising model, using the stochastic block model as a
        prior.

        See documentation for :class:`IsingBlockStateBase` for details. Note
        that in this case the ``s`` parameter should contain property maps of
        type ``vector<double>``, with values in the range :math:`[-1,1]`.

        Notes
        -----
        This is a dynamical model with a transition likelihood for node
        :math:`i` to state :math:`s_i(t+1) \in [-1,+1]` given by

        .. math::

           P(s_i(t+1)|\boldsymbol s(t), \boldsymbol A, \boldsymbol x,
                      \boldsymbol \theta) =
           \frac{\exp(s_i(t+1)\sum_jA_{ij}w_{ij}s_j(t) + h_is_i(t+1))}
                {Z(\sum_jA_{ij}w_{ij}s_j(t) + h_i)},

        with :math:`Z(x) = 2\sinh(x)/x`.
        """
        # Continuous state values: unlike IsingGlauberBlockState, no
        # ``discrete=True`` is passed. ``directed`` defaults to True, but a
        # caller-supplied value in kwargs overrides it.
        DynamicsBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
                                        **dict(dict(directed=True), **kwargs))

    def get_dyn_state(self, s=None):
        """Return a :class:`~graph_tool.dynamics.CIsingGlauberState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        # BUG FIX: ``CIsingGlauberState`` is not among this module's top-level
        # imports from ``graph_tool.dynamics``, so referencing it here raised
        # a NameError; import it locally instead.
        from .. dynamics import CIsingGlauberState
        return CIsingGlauberState(self.u, w=self.x, h=self.theta, s=s)

    def _make_state(self):
        # Construct the C++ continuous-Ising Glauber state.
        return libinference.make_cising_glauber_state(self._state, self.ot,
                                                      self.os, self.oactive,
                                                      self.params)
class PseudoIsingBlockState(IsingBlockStateBase, BPBlockStateBase):
    def __init__(self, s, g=None, has_zero=False, **kwargs):
        r"""State for network reconstruction based on the equilibrium
        configurations of the Ising model, using the pseudolikelihood
        approximation and the stochastic block model as a prior.

        See documentation for :class:`IsingBlockStateBase` for details.

        Notes
        -----
        This is an equilibrium model where the states :math:`\boldsymbol s`
        are sampled with probability

        .. math::

           P(\boldsymbol s | \boldsymbol A, \boldsymbol x, \boldsymbol \theta)
           = \frac{\exp(\sum_jA_{ij}x_{ij}s_is_j + \sum_i\theta_is_i)}
                  {Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)},

        where :math:`Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)` is
        an intractable normalization constant. Instead of computing this
        likelihood exactly, this model makes use of the pseudo-likelihood
        approximation [pseudo]_:

        .. math::

           P(\boldsymbol s | \boldsymbol A, \boldsymbol x, \boldsymbol \theta)
           = \prod_i\frac{\exp(s_i\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_is_i)}
                         {2\cosh(\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_i)}.

        References
        ----------
        .. [pseudo] https://en.wikipedia.org/wiki/Pseudolikelihood
        """
        # ``directed`` and ``discrete`` are forced below, so they are removed
        # from the user-supplied kwargs to avoid duplicate-keyword errors.
        # NOTE(review): `dmask` is not defined in this file's visible imports;
        # presumably provided by `from .uncertain_blockmodel import *` —
        # confirm.
        BPBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
                                  directed=False, discrete=True,
                                  **dmask(kwargs, ["directed", "discrete"]))

    def get_bp_state(self, **kwargs):
        """Return an :class:`~graph_tool.dynamics.IsingBPState` instance
        corresponding to the inferred model."""
        return IsingBPState(self.u, x=self.x, theta=self.theta,
                            has_zero=self.params.get("has_zero"),
                            **kwargs)

    def _make_state(self):
        # Construct the C++ pseudo-likelihood Ising state.
        return libinference.make_pseudo_ising_state(self._state, self.ot,
                                                    self.os, self.oactive,
                                                    self.params)
class PseudoCIsingBlockState(DynamicsBlockStateBase, IsingBlockStateBase):
    def __init__(self, s, g=None, has_zero=False, **kwargs):
        r"""State for network reconstruction based on the equilibrium
        configurations of the continuous Ising model, using the
        pseudolikelihood approximation and the stochastic block model as a
        prior.

        See documentation for :class:`IsingBlockStateBase` for details. Note
        that in this case the ``s`` parameter should contain property maps of
        type ``vector<double>``, with values in the range :math:`[-1,1]`.

        Notes
        -----
        This is an equilibrium model where the states :math:`\boldsymbol s`
        are sampled with probability

        .. math::

           P(\boldsymbol s | \boldsymbol A, \boldsymbol x, \boldsymbol \theta)
           = \frac{\exp(\sum_jA_{ij}x_{ij}s_is_j + \sum_i\theta_is_i)}
                  {Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)},

        where :math:`Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)` is
        an intractable normalization constant. Instead of computing this
        likelihood exactly, this model makes use of the pseudo-likelihood
        approximation [pseudo]_:

        .. math::

           P(\boldsymbol s | \boldsymbol A, \boldsymbol x, \boldsymbol \theta)
           = \prod_i\frac{\exp(s_i\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_is_i)}
                         {Z(\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_i)},

        with :math:`Z(x) = 2\sinh(x)/x`.

        References
        ----------
        .. [pseudo] https://en.wikipedia.org/wiki/Pseudolikelihood
        """
        # Equilibrium model on an undirected graph; continuous states, so no
        # ``discrete=True``.
        DynamicsBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
                                        directed=False, **kwargs)

    def _make_state(self):
        # Construct the C++ pseudo-likelihood continuous-Ising state.
        return libinference.make_pseudo_cising_state(self._state, self.ot,
                                                     self.os, self.oactive,
                                                     self.params)

    def get_dyn_state(self, s=None):
        """Return a :class:`~graph_tool.dynamics.CIsingGlauberState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        # BUG FIX: ``CIsingGlauberState`` is not among this module's top-level
        # imports from ``graph_tool.dynamics``, so referencing it here raised
        # a NameError; import it locally instead.
        from .. dynamics import CIsingGlauberState
        return CIsingGlauberState(self.u, w=self.x, h=self.theta, s=s)
def zero_mean(s, g): if isinstance(s, np.ndarray): s = s.copy() for i in range(s.shape[0]): s[i,:] -= s[i,:].mean() else: if isinstance(s, list): s = [sx.copy("vector<double>") for sx in s] else: s = [s.copy("vector<double>")] for sx in s: for v in g.vertices(): sx[v].a -= sx[v].a.mean() return s
class PseudoNormalBlockState(BPBlockStateBase):
    def __init__(self, s, g=None, fix_mean=True, positive=True, pslack=1e-6,
                 **kwargs):
        r"""State for network reconstruction based on the multivariate normal
        distribution, using the Pseudolikelihood approximation and the
        stochastic block model as a prior.

        ``fix_mean == True`` means that ``s`` will be changed to become
        zero-mean.

        ``positive == True`` ensures that the result is positive-semidefinite,
        according to slack given by ``pslack``.

        See documentation for :class:`DynamicsBlockStateBase` for more
        details.
        """
        # Center the data per node before fitting.
        if fix_mean:
            s = zero_mean(s, g)
        BPBlockStateBase.__init__(self, s, g=g, positive=positive,
                                  pslack=pslack, directed=False, **kwargs)
        # NOTE(review): presumably controls the sweep order (theta vs. x) in
        # the base class — confirm.
        self.theta_first = False

    def _make_state(self):
        # Construct the C++ pseudo-likelihood normal state.
        return libinference.make_pseudo_normal_state(self._state, self.ot,
                                                     self.os, self.oactive,
                                                     self.params)

    def get_dcov(self):
        """Return data covariance matrix."""
        N = self.g.num_vertices()
        S = np.zeros((N, N))
        # Total number of samples across all time series.
        M = 0
        for m in range(len(self.s)):
            M += len(self.s[m][0])
        for i in range(N):
            for j in range(N):
                # NOTE(review): only self.s[0] is used here, although M above
                # sums the lengths of *all* samples m — this looks like it
                # should accumulate over every self.s[m]; confirm whether
                # multiple samples are actually supported by this method.
                S[i,j] += np.dot(self.s[0][i].a, self.s[0][j].a)
        S /= M
        return S

    def get_precision(self):
        """Return precision matrix."""
        theta = self.theta.copy()
        w = self.x.copy()
        # Use absolute couplings as edge weights for the degree computation.
        w.fa = abs(w.fa)
        pslack = self.params["pslack"]
        # Shift theta down where needed so the resulting matrix stays
        # positive-semidefinite within the given slack (cf.
        # get_theta_shifted(), which does the same vectorized).
        for v in self.u.vertices():
            k = v.out_degree(w)
            theta[v] = min(theta[v], -log(k)/2-pslack) if k > 0 else theta[v]
        W = adjacency(self.u, self.x).tolil()
        # Diagonal entries: 1/exp(2*theta_v).
        for v in self.u.vertices():
            W[int(v), int(v)] = 1/exp(theta[v] * 2)
        return W

    def get_theta_shifted(self):
        """Return shifted node values to ensure positive semi-definiteness."""
        h = self.theta.copy()
        w = self.x.copy()
        w.fa = np.abs(w.fa)
        # Weighted out-degree using absolute couplings.
        k = self.u.degree_property_map("out", w)
        idx = k.a > 0
        pslack = self.params["pslack"]
        # Cap theta at -log(k)/2 - pslack wherever the node has neighbors.
        h.a[idx] = np.minimum(-np.log(k.a[idx])/2 - pslack, h.a[idx])
        return h

    def log_P_exact(self):
        """Return exact log-likelihood."""
        S = self.get_dcov()
        W = self.get_precision().todense()
        N = W.shape[0]
        # Log-determinant of the precision matrix (sign assumed positive).
        D = np.linalg.slogdet(W)[1]
        M = 0
        for m in range(len(self.s)):
            M += len(self.s[m][0])
        # Multivariate normal log-likelihood of M samples under precision W.
        return float(-M*(S@W).trace()/2 + M/2*D - M*N*np.log(2*np.pi)/2)

    def log_Z_exact(self):
        """Return exact log-likelihood normalization constant."""
        W = self.get_precision().todense()
        N = W.shape[0]
        D = np.linalg.slogdet(W)[1]
        return -float(D/2 - N * np.log(2 * np.pi)/2)

    def get_dyn_state(self, s=None):
        """Return an :class:`~graph_tool.dynamics.NormalState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        h = self.get_theta_shifted()
        # NormalState expects exp(theta) for its h parameter.
        h.a = np.exp(h.a)
        return NormalState(self.u, w=self.x, h=h, s=s)

    def get_bp_state(self, **kwargs):
        """Return an :class:`~graph_tool.dynamics.NormalBPState` instance
        corresponding to the inferred model."""
        theta = self.get_theta_shifted()
        # NormalBPState expects exp(-2*theta) for its theta parameter.
        theta.fa = np.exp(-2 * theta.fa)
        return NormalBPState(self.u, x=self.x, theta=theta, **kwargs)
class NormalGlauberBlockState(DynamicsBlockStateBase):
    def __init__(self, s, g=None, self_loops=False, fix_mean=True, **kwargs):
        r"""State for network reconstruction based on the dynamical
        multivariate normal distribution, using the Pseudolikelihood
        approximation and the stochastic block model as a prior.

        ``fix_mean == True`` means that ``s`` will be changed to become
        zero-mean.

        ``self_loops == True`` means self-loops will be allowed in the
        reconstruction.

        See documentation for :class:`DynamicsBlockStateBase` for more
        details.
        """
        # Center the data per node before fitting (same as
        # PseudoNormalBlockState).
        if fix_mean:
            s = zero_mean(s, g)
        DynamicsBlockStateBase.__init__(self, s, g=g, self_loops=self_loops,
                                        **kwargs)
        # NOTE(review): presumably controls the sweep order (theta vs. x) in
        # the base class — confirm.
        self.theta_first = False

    def _make_state(self):
        # Construct the C++ normal-Glauber state.
        return libinference.make_normal_glauber_state(self._state, self.ot,
                                                      self.os, self.oactive,
                                                      self.params)

    def get_dyn_state(self, s=None):
        """Return an :class:`~graph_tool.dynamics.NormalState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        # NOTE(review): get_theta_shifted() is not defined in this class; it
        # is defined on PseudoNormalBlockState, so here it must come from a
        # base class — confirm it is actually reachable.
        h = self.get_theta_shifted()
        h.a = np.exp(h.a)
        return NormalState(self.u, w=self.x, h=h, s=s)
class LinearNormalBlockState(DynamicsBlockStateBase):
    def __init__(self, s, g=None, self_loops=True, **kwargs):
        r"""State for network reconstruction based on a linear dynamical
        model.

        ``self_loops == True`` means self-loops will be allowed in the
        reconstruction.

        See documentation for :class:`DynamicsBlockStateBase` for more
        details.
        """
        # Directed by default; a caller-supplied "directed" keyword wins.
        options = {"directed": True}
        options.update(kwargs)
        DynamicsBlockStateBase.__init__(self, s, g=g, self_loops=self_loops,
                                        **options)
        self.theta_first = False

    def _make_state(self):
        # Build the C++ linear-normal reconstruction state.
        return libinference.make_linear_normal_state(self._state, self.ot,
                                                     self.os, self.oactive,
                                                     self.params)

    def get_dyn_state(self, s=None):
        """Return a :class:`~graph_tool.dynamics.LinearNormalState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        # The latent theta values are log-sigmas; exponentiate for the model.
        sigma = self.theta.copy()
        sigma.a = np.exp(sigma.a)
        return LinearNormalState(self.u, w=self.x, sigma=sigma, s=s)
class LVBlockState(DynamicsBlockStateBase):
    def __init__(self, s, g=None, self_loops=True, sigma=1, **kwargs):
        r"""State for network reconstruction based on Lotka-Volterra dynamics,
        using the stochastic block model as a prior.

        ``self_loops == True`` means self-loops will be allowed in the
        reconstruction; ``sigma`` is forwarded to the underlying model (see
        :class:`~graph_tool.dynamics.LVState`).

        See documentation for :class:`DynamicsBlockStateBase` for more
        details.
        """
        # Directed by default; a caller-supplied "directed" keyword wins.
        options = {"directed": True}
        options.update(kwargs)
        DynamicsBlockStateBase.__init__(self, s, g=g, self_loops=self_loops,
                                        sigma=sigma, **options)
        self.theta_first = False

    def _make_state(self):
        # Build the C++ Lotka-Volterra reconstruction state.
        return libinference.make_lotka_volterra_state(self._state, self.ot,
                                                      self.os, self.oactive,
                                                      self.params)

    def get_dyn_state(self, s=None):
        """Return an :class:`~graph_tool.dynamics.LVState` instance
        corresponding to the inferred model, optionally with initial state
        given by ``s``."""
        growth = self.theta.copy()
        return LVState(self.u, w=self.x, r=growth,
                       sigma=self.params["sigma"], s=s)
def pinit(val, g, ptype="v", vtype="double"): if isinstance(val, PropertyMap): if val.value_type() == vtype: return g.own_property(val) else: return g.own_property(val.copy(value_type=vtype)) if ptype == "v": return g.new_vp(vtype, val=val) if ptype == "e": return g.new_ep(vtype, val=val) class TestBlockState(DynamicsBlockStateBase): def __init__(self, g, p=.9, mu=1, sigma=.1, p_default=1e-6, mu_default=.1, sigma_default=.01, mu_v=0, sigma_v=1, g_init=None, **kwargs): self.g_ref = g if g_init is None: g_init = Graph(g.num_vertices(), directed=g.is_directed()) self.p = pinit(p, g, "e") self.mu = pinit(mu, g, "e") self.sigma = pinit(sigma, g, "e") self.p_default = p_default self.mu_default = mu_default self.sigma_default = sigma_default self.mu_v = pinit(mu_v, g, "v") self.sigma_v = pinit(sigma_v, g, "v") s = g.new_vp("int") kwargs.pop("s", None) DynamicsBlockStateBase.__init__(self, s, g=g_init, directed=g.is_directed(), **kwargs) def __getstate__(self): return dict(DynamicsBlockStateBase.__getstate__(self), g=self.g_ref, p=self.p, mu=self.mu, sigma=self.sigma, p_default=self.p_default, mu_default=self.mu_default, sigma_default=self.sigma_default, mu_v=self.mu_v, sigma_v=self.sigma_v) def _make_state(self): return libinference.make_test_state(self.g_ref._Graph__graph, self._state, vars(self))