#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# graph_tool -- a general graph manipulation python module
#
# Copyright (C) 2006-2025 Tiago de Paula Peixoto <tiago@skewed.de>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .. import _prop, _get_rng, Vector_int32_t, group_vector_property
from .. spectral import adjacency
from .. generation import complete_graph, generate_triadic_closure, \
remove_parallel_edges, graph_union, graph_merge
from .. dynamics import IsingGlauberState, NormalState, IsingBPState, \
NormalBPState, LinearNormalState, LVState, PottsGlauberState
from . uncertain_blockmodel import *
from . util import pmap
from abc import ABC
from collections.abc import Iterable
import inspect
import numpy as np
import numbers
import scipy.stats
import warnings
def get_bisect_args(**kwargs):
args = libinference.bisect_args()
for k, v in kwargs.items():
try:
if not hasattr(args, k):
raise AttributeError(k)
setattr(args, k, v)
except AttributeError as e:
raise ValueError(f"invalid bisect_args: {str(k)}, {str(v)}") from e
return args
[docs]
@entropy_state_signature
class DynamicsBlockStateBase(UncertainBaseState):
def __init__(self, s, g=None, directed=False, t=[], active=None, x=1,
x_range=(-np.inf, np.inf), theta=0,
theta_range=(-np.inf, np.inf), xdelta=1e-8, xdelta_min=1e-8,
tdelta=1e-8, tdelta_min=1e-8, disable_xdist=False,
disable_tdist=False, nested=True, state_args={}, bstate=None,
self_loops=False, max_m=1 << 16, entropy_args={}, **kwargs):
r"""Base state for network reconstruction based on dynamical or
behavioral data, using the stochastic block model as a prior. This class
is not meant to be instantiated directly, only indirectly via one of its
subclasses. Nevertheless, its paramteres are inherited, and are
documented as follows.
Parameters
----------
s : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap`
Time series or independent samples used for reconstruction.
If the type is :class:`~numpy.ndarray`, it should correspond to a
``(N,M)`` data matrix with ``M`` samples for all ``N`` nodes.
If the parameter ``g`` is provided, this can be optionally a list of
of :class:`~graph_tool.VertexPropertyMap` objects, where each entry
in this list must be a :class:`~graph_tool.VertexPropertyMap` with
type ``vector<int>`` or ``vector<double>``, depending on wether the
model has discrete or continuous state values. If a single property
map is given, then a single time series is assumed.
If the parameter ``t`` below is given, each property map value for a
given node should contain only the states for the same points in
time given by that parameter.
g : :class:`~graph_tool.Graph` (optional, default: ``None``)
Initial graph state. If not provided, an empty graph will be assumed.
directed : ``boolean`` or ``None`` (optional, default: ``None``)
If ``True``, the inferred graph will be directed, or if ``False`` it
will be undirected. If this option is ``None``, the directionality
will be the same as the parameter ``g``, if it is provided,
otherwise ``directed = False`` will be assumed.
t : ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap` (optional, default: ``[]``)
If nonempty, this allows for a compressed representation of the
time-series parameter ``s``, corresponding only to points in time
where the state of each node changes. Each entry in this list
must be a :class:`~graph_tool.VertexPropertyMap` with type
``vector<int>`` containing the points in time where the state of
each node change. The corresponding state of the nodes at these
times are given by parameter ``s``. If a single property map is
given, then a single time series is assumed.
active : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap` (optional, default: ``None``)
If provided, this should contain binary valies specifing wether
nodes are "active" or not at specific times/samples. An "active"
node contributes normally to the likelihood, while an inactive one
is removed from the likelihood computation, with its value being
replaced by a standard "inactive" state (the value of ``s`` for that
node/sample is ignored).
If the parameter ``g`` is provided, this can be optionally a list of
of :class:`~graph_tool.VertexPropertyMap` objects, where each entry
in this list must be a :class:`~graph_tool.VertexPropertyMap` with
type ``vector<int>``, with values interpreted in the same way as
above. If a single property map is given, then a single time series
is assumed.
x : :class:`~graph_tool.EdgePropertyMap` or ``float`` (optional, default: ``1.``)
Initial value of the edge weights. If a ``float`` is given, the edge
weights will be assumed to be the same for all edges.
x_range : pair of ``float``s (optional, default: ``(-np.inf, np.inf)``)
Determines the allowed range of edge weights.
theta : :class:`~graph_tool.EdgePropertyMap` or ``float`` (optional, default: ``0.``)
Initial value of the node parameters (a.k.a. node bias). If a
``float`` is given, the node biases will be assumed to be the same
for all edges.
theta_range : pair of ``float``s (optional, default: ``(-np.inf, np.inf)``)
Determines the allowed range of the node parameters.
xdelta : ``float`` (optional, default: ``1e-8``)
Initial value for the edge weight precision parameter.
xdelta_min : ``float`` (optional, default: ``1e-8``)
Minimal value for the edge weight precision parameter.
tdelta : ``float`` (optional, default: ``1e-8``)
Initial value for the node bias precision parameter.
tdelta_min : ``float`` (optional, default: ``1e-8``)
Minimal value for the node bias precision parameter.
disable_xdist : ``boolean`` (optional, default: ``False``)
If ``True`` the, quantization of the edge weights will be turned off,
and :math:`L_1` regularization will be used instead.
disable_tdist : ``boolean`` (optional, default: ``False``)
If ``True`` the, quantization of the node parameters will be turned off,
and :math:`L_1` regularization will be used instead.
nested : ``boolean`` (optional, default: ``True``)
If ``True``, a :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState`
will be used, otherwise
:class:`~graph_tool.inference.blockmodel.BlockState`.
state_args : ``dict`` (optional, default: ``{}``)
Arguments to be passed to
:class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` or
:class:`~graph_tool.inference.blockmodel.BlockState`.
bstate : :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` or :class:`~graph_tool.inference.blockmodel.BlockState` (optional, default: ``None``)
If passed, this will be used to initialize the block state
directly.
self_loops : bool (optional, default: ``False``)
If ``True``, it is assumed that the inferred graph can contain
self-loops.
max_m : ``int`` (optional, default: ``1 << 16``)
Maximum latent edge multiplicity allowed.
entropy_args : ``dict`` (optional, default: ``{}``)
Override default arguments for
:meth:`~DynamicsBlockStateBase.entropy()` method and releated
operations.
"""
if g is None:
if directed is None:
directed = False
g = Graph(s.shape[0], directed=directed)
else:
directed = g.is_directed()
UncertainBaseState.__init__(self, g, nested=nested,
state_args=state_args, bstate=bstate,
self_loops=self_loops,
entropy_args=entropy_args)
if isinstance(s, np.ndarray):
if kwargs.pop("discrete", False):
s = g.new_vp("vector<int>", vals=s)
else:
s = g.new_vp("vector<double>", vals=s)
if active is None:
active = []
if isinstance(active, np.ndarray):
active = g.new_vp("vector<int>", vals=active)
if isinstance(s, PropertyMap):
s = [s]
if isinstance(t, PropertyMap):
t = [t]
if isinstance(active, PropertyMap):
active = [active]
self.s = [self.u.own_property(x) for x in s]
self.t = [self.u.own_property(x) for x in t]
self.active = [self.u.own_property(x) for x in active]
if x is None:
x = self.u.new_ep("double")
elif isinstance(x, EdgePropertyMap):
x = self.u.copy_property(x, g=x.get_graph(), value_type="double")
else:
x = self.u.new_ep("double", val=x)
self.x = x
self.xmin_bound = x_range[0]
self.xmax_bound = x_range[1]
if theta is None:
theta = self.u.new_vp("double")
elif isinstance(theta, VertexPropertyMap):
theta = self.u.copy_property(theta, g=theta.get_graph(),
value_type="double")
else:
theta = self.u.new_vp("double", val=theta)
self.theta = theta
self.tmin_bound = theta_range[0]
self.tmax_bound = theta_range[1]
self.xdelta = xdelta
self.tdelta = tdelta
self.xdelta_min = xdelta_min
self.tdelta_min = tdelta_min
self.disable_xdist = disable_xdist
self.disable_tdist = disable_tdist
if disable_xdist:
self.update_entropy_args(xdist=False,
sbm=entropy_args.get("sbm", False))
if disable_tdist:
self.update_entropy_args(tdist=False)
kwargs = kwargs.copy()
for k in kwargs.keys():
v = kwargs[k]
if isinstance(v, PropertyMap):
kwargs[k] = g.own_property(v)
elif (isinstance(v, Iterable) and len(v) > 0 and
isinstance(v[0], PropertyMap)):
kwargs[k] = [g.own_property(x) for x in v]
self.rg = kwargs.pop("rg", None)
self.elist = kwargs.pop("elist", None)
self.ecandidates = kwargs.pop("ecandidates", np.zeros((0, 2), dtype="int64"))
self.params = kwargs
self.os = [ns._get_any() for ns in self.s]
self.ot = [nt._get_any() for nt in self.t]
self.oactive = [nt._get_any() for nt in self.active]
self.max_m = max_m
self._state = libinference.make_dynamics_state(self.bstate._state, self)
self._dstate = self._make_state()
self._state.set_dstate(self._dstate)
self.theta_first = True
[docs]
def set_params(self, params):
r"""Sets the model parameters via the dictionary ``params``."""
self.params = dict(self.params, **params)
self._state.set_params(self.params)
[docs]
def get_params(self, params):
r"""Gets the model parameters via the dictionary ``params``."""
return dict(self.params)
def __getstate__(self):
state = UncertainBaseState.__getstate__(self)
return dict(state, s=self.s, t=self.t,
active=self.active, x=self.x,
x_range=(self.xmin_bound, self.xmax_bound),
theta=self.theta,
theta_range=(self.tmin_bound, self.tmax_bound),
xdelta=self.xdelta,
tdelta=self.tdelta,
disable_xdist=self.disable_xdist,
disable_tdist=self.disable_tdist,
rg=self.rg, elist=self.elist,
ecandidates=self.ecandidates,
max_m=self.max_m,
**self.params)
def __repr__(self):
return "<%s object with %s, %d edge categories and %d node categories, at 0x%x>" % \
(self.__class__.__name__,
self.nbstate if self.nbstate is not None else self.bstate,
len(self.get_xvals()), len(self.get_tvals()),
id(self))
def _gen_eargs(self, args):
uea = UncertainBaseState._gen_eargs(self, args)
return libinference.dentropy_args(uea)
@copy_state_wrap
def _entropy(self, latent_edges=True, density=False, aE=1, sbm=True,
xdist=True, tdist=True, xdist_uniform=False,
tdist_uniform=False, xl1=1, tl1=1, alpha=1, normal=False, mu=0,
sigma=1, active=True, **kwargs):
r"""Return the description length, i.e. the negative joint
log-likelihood.
Parameters
----------
latent_edges : ``boolean`` (optional, default: ``True``)
If ``True``, the adjacency term of the description length will be
included.
density : ``boolean`` (optional, default: ``False``)
If ``True``, a geometric prior for the total number of edges will be
included.
aE : ``double`` (optional, default: ``1``)
If ``density=True``, this will correspond to the expected number of
edges according to the geometric prior.
sbm : ``boolean`` (optional, default: ``True``)
If ``True``, SBM description length will be included.
xdist : ``boolean`` (optional, default: ``True``)
If ``True``, the quantized edge weight distribution description
length will be included.
xdist_uniform : ``boolean`` (optional, default: ``False``)
If ``True``, a uniform prior for the edge weight distribution will
be used.
tdist : ``boolean`` (optional, default: ``True``)
If ``True``, the quantized node parameter distribution description
length will be included.
tdist_uniform : ``boolean`` (optional, default: ``False``)
If ``True``, a uniform prior for the node parameter distribution will
be used.
xl1 : ``float`` (optional, default: ``1``)
Specifies the :math:`\lambda` parameter for :math:`L_1`
regularization for the edge weights if ``xdist == False``, or the
Laplace hyperprior for the discrete categories if ``xdist == True``.
tl1 : ``float`` (optional, default: ``1``)
Specifies the :math:`\lambda` parameter for :math:`L_1`
regularization for the node paraemters if ``tdist == False``, or the
Laplace hyperprior for the discrete categories if ``tdist == True``.
normal : ``boolean`` (optional, default: ``False``)
If ``True``, a normal distribution will be used for the weight
priors.
mu : ``double`` (optional, default: ``0``)
If ``normal == True``, this will be the mean of the normal distribution.
sigma : ``double`` (optional, default: ``1``)
If ``normal == True``, this will be the standard deviation of the
normal distribution.
active : ``boolean`` (optional, default: ``True``)
If ``True``, the contribution of the active/inactive node states
will be added to the description length.
Notes
-----
The "entropy" of the state is the negative log-likelihood of the
generative model for the data :math:`\boldsymbol S`, that includes the
inferred weighted adjacency matrix :math:`\boldsymbol{X}`, the node
parameters :math:`\boldsymbol{\theta}`, and the SBM node partition
:math:`\boldsymbol{b},` given by
.. math::
\begin{aligned}
\Sigma(\boldsymbol{S},\boldsymbol{X},\boldsymbol{\theta}|\lambda_x,\lambda_{\theta},\Delta)
= &- \ln P(\boldsymbol{S}|\boldsymbol{X},\boldsymbol{\theta})\\
&- \ln P(\boldsymbol{X}|\boldsymbol{A},\lambda_x, \Delta)\\
&- \ln P(\boldsymbol{A},\boldsymbol{b})\\
&- \ln P(\boldsymbol{\theta}, \lambda_{\theta}, \Delta).
\end{aligned}
The term :math:`P(\boldsymbol{S}|\boldsymbol{X},\boldsymbol{\theta})` is
given by the particular generative model being used and
:math:`P(\boldsymbol{A},\boldsymbol{b})` by the SBM. The weight
ditribution is given by the quantized model
.. math::
P(\boldsymbol X|\boldsymbol A,\lambda_x,\Delta) =
\frac{\prod_{k}m_{k}!\times \mathrm{e}^{-\lambda_x \sum_k |z_k|}(\mathrm{e}^{\lambda\Delta} - 1)^{K}}
{E!{E-1 \choose K-1}2^{K}\max(E,1)}
where :math:`\boldsymbol z` are the :math:`K` discrete weight
categories, and analogously
.. math::
P(\boldsymbol\theta|\lambda_{\theta},\Delta)
=\frac{\prod_{k}n_{k}!\times
\mathrm{e}^{-\lambda \sum_k |u_k|}
\sinh(\lambda_{\theta}\Delta)^{K_{\theta}-\mathbb{1}_{0\in\boldsymbol u}}
(1-\mathrm{e}^{-\lambda_{\theta}\Delta})^{\mathbb{1}_{0\in\boldsymbol u}}}
{N!{N-1 \choose K_{\theta}-1}N},
is the node parameter quantized distribution. For more details see
[peixoto-network-2024]_.
References
----------
.. [peixoto-network-2024] Tiago P. Peixoto, "Network reconstruction via
the minimum description length principle", :arxiv:`2405.01015`
.. [peixoto-scalable-2024] Tiago P. Peixoto, "Scalable network
reconstruction in subquadratic time", :arxiv:`2401.01404`
"""
eargs = self._get_entropy_args(locals())
S = self._state.entropy(eargs)
if sbm:
if self.nbstate is None:
S += self.bstate.entropy(**kwargs)
else:
S += self.nbstate.entropy(**kwargs)
return S
def _get_elist(self, k, elist_args={}, beta=np.inf):
if self.elist is not None and self.elist[0] == k:
return
if np.isinf(beta) or self.elist is None:
elist_args = elist_args.copy()
elist, w = self.get_candidate_edges(k, **elist_args)
self.elist = (k, elist, w)
[docs]
def collect_candidates(self, u=None):
r"""Store current edges into the list of candidates for MCMC. If
:class:`~graph_tool.Graph` ``u`` is provided, its edges will be added
instead."""
if u is None:
self.collect_candidates(GraphView(self.u, directed=False))
if self.elist is not None:
self.collect_candidates(self.elist[1])
else:
if not isinstance(u, Graph):
u = Graph(u, directed=False)
u.add_vertex(self.u.num_vertices() - u.num_vertices())
cg = Graph(self.ecandidates, directed=False)
cg.add_vertex(self.u.num_vertices() - cg.num_vertices())
graph_union(cg, u, in_place=True)
cg.set_fast_edge_removal()
remove_parallel_edges(cg)
self.ecandidates = cg.get_edges()
[docs]
def clear_candidates(self):
r"""Clear candidate edges for MCMC."""
self.ecandidates = np.zeros((0, 2), dtype="int64")
[docs]
def mcmc_sweep(self, beta=1, niter=1, k=1, keep_elist=False, edge=1,
edge_swap=1, edge_multiflip=None, theta=1,
theta_multiflip=1, sbm=1, xvals=1, tvals=1,
verbose=False, elist_args={}, edge_mcmc_args={},
edge_swap_mcmc_args={}, edge_multiflip_mcmc_args={},
xvals_mcmc_args={}, theta_mcmc_args={},
theta_multiflip_mcmc_args={}, tvals_mcmc_args={},
sbm_mcmc_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to sample latent edges and network partitions.
If ``beta`` is ``np.inf``, the algorithm switches to a greedy heuristic
based on iterated nearest-neighbor searches.
Parameters
----------
beta : ``float`` (optional, default: ``1``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
k : ``int`` (optional, default: ``1``)
:math:`\kappa` parameter to be passed to :meth:`~.DynamicsBlockStateBase.get_candidate_edges`.
This parameter is ignored if ``beta`` is not ``np.inf``.
elist_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.get_candidate_edges`.
This parameter is ignored if ``beta`` is not ``np.inf``.
keep_elist : ``boolean`` (optional, default: ``False``)
If ``True``, the candidate edge list from last call will be re-used
(if it exists).
edge : ``float`` (optional, default: ``1``)
Probability with which :meth:`~.DynamicsBlockStateBase.edge_mcmc_sweep` will be called.
edge_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.edge_mcmc_sweep`.
edge_swap : ``float`` (optional, default: ``1.``)
Probability with which :meth:`~.DynamicsBlockStateBase.swap_mcmc_sweep` will be called.
edge_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.swap_mcmc_sweep`.
edge_multiflip : ``float`` (optional, default: ``1 if np.isinf(beta) else .1``)
Probability with which :meth:`~.DynamicsBlockStateBase.edge_multiflip_mcmc_sweep` will be called.
edge_multiflip_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.edge_multiflip_mcmc_sweep`.
theta : ``float`` (optional, default: ``1.``)
Probability with which :meth:`~.DynamicsBlockStateBase.theta_mcmc_sweep` will be called.
theta_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.theta_mcmc_sweep`.
theta_multiflip : ``float`` (optional, default: ``1.``)
Probability with which :meth:`~.DynamicsBlockStateBase.theta_multiflip_mcmc_sweep` will be called.
theta_multiflip_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.theta_multiflip_mcmc_sweep`.
sbm : ``float`` (optional, default: ``1.``)
Probability with which :meth:`~.DynamicsBlockStateBase.sbm_mcmc_sweep` will be called.
sbm_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.sbm_mcmc_sweep`.
xvals : ``float`` (optional, default: ``1.``)
Probability with which :meth:`~.DynamicsBlockStateBase.xvals_sweep` will be called.
xvals_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.xvals_sweep`.
tvals : ``float`` (optional, default: ``1.``)
Probability with which :meth:`~.DynamicsBlockStateBase.tvals_sweep` will be called.
tvals_mcmc_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.tvals_sweep`.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
**kwargs : ``dict`` (optional, default: ``{}``)
Remaining keyword parameters will be passed to all individual MCMC
functions.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
ret = (0, 0, 0)
if edge_multiflip is None:
edge_multiflip = 1 if np.isinf(beta) else .1
def theta_mcmc():
nonlocal ret
if self.tmax_bound != self.tmin_bound:
if np.random.random() < theta:
if verbose:
print("theta_mcmc_sweep:")
tret = self.theta_mcmc_sweep(**dict(dict(beta=beta, niter=niter,
**kwargs), **theta_mcmc_args))
ret = (sum(x) for x in zip(ret, tret))
if not self.disable_tdist:
if np.random.random() < theta_multiflip:
if verbose:
print("theta_multiflip_mcmc_sweep:")
tret = self.theta_multiflip_mcmc_sweep(**dict(dict(beta=beta,
niter=10 * niter,
**kwargs),
**theta_multiflip_mcmc_args))
ret = (sum(x) for x in zip(ret, tret))
if np.random.random() < tvals:
if verbose:
print("tvals_sweep:")
eret = self.tvals_sweep(**dict(dict(beta=beta,
niter=10 * niter,
**kwargs),
**tvals_mcmc_args))
ret = (sum(x) for x in zip(ret, eret))
if self.theta_first:
theta_mcmc()
if np.random.random() < edge:
if verbose:
print("edge_mcmc_sweep:")
edge_mcmc_args = dict(edge_mcmc_args, k=k, keep_elist=keep_elist,
elist_args=elist_args)
eret = self.edge_mcmc_sweep(**dict(dict(beta=beta, niter=niter,
**kwargs), **edge_mcmc_args))
ret = (sum(x) for x in zip(ret, eret))
keep_elist = True
if np.random.random() < edge_swap:
if verbose:
print("swap_mcmc_sweep:")
eret = self.swap_mcmc_sweep(**dict(dict(beta=beta, niter=niter,
**kwargs), **edge_swap_mcmc_args))
ret = (sum(x) for x in zip(ret, eret))
keep_elist = True
if self.xmax_bound != self.xmin_bound and not self.disable_xdist:
if np.random.random() < edge_multiflip:
if verbose:
print("edge_multiflip_sweep:")
eret = self.edge_multiflip_mcmc_sweep(**dict(dict(beta=beta,
niter=10 * niter,
**kwargs),
**edge_multiflip_mcmc_args))
ret = (sum(x) for x in zip(ret, eret))
if np.random.random() < xvals:
if verbose:
print("xvals_sweep:")
eret = self.xvals_sweep(**dict(dict(beta=beta, niter=10 * niter,
**kwargs),
**xvals_mcmc_args))
ret = (sum(x) for x in zip(ret, eret))
if not self.theta_first:
theta_mcmc()
if np.random.random() < sbm:
if verbose:
print("sbm_mcmc_sweep:")
kwargs = kwargs.copy()
eargs = kwargs.get("entropy_args", {}).copy()
for k in list(eargs.keys()):
if k not in self.bstate._entropy_args:
eargs.pop(k, None)
kwargs["entropy_args"] = eargs
sret = self.sbm_mcmc_sweep(**dict(dict(beta=beta, niter=10 * niter,
**kwargs), **sbm_mcmc_args))
ret = (sum(x) for x in zip(ret, sret))
if np.isinf(beta):
self.collect_candidates()
return tuple(ret)
[docs]
@mcmc_sweep_wrap
def edge_mcmc_sweep(self, beta=1., niter=1, m=1, k=1, elist_args={},
keep_elist=False, pold=1, pnew=1, pxu=.1, pm=1,
premove=1, pself=0.1, puniform=1, pedge=1, pnearby=1,
d=2, pcandidates=1, bisect_args={}, binary=False,
deterministic=False, sequential=True, parallel=True,
pseudo=False, verbose=False, entropy_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to sample latent edges.
If ``beta`` is ``np.inf``, the algorithm switches to a greedy heuristic
based on iterated nearest-neighbor searches.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
m : ``double`` (optional, default: ``1``)
Multiplicative factor to the number of nodes yielding the number of
attempted moves at every sweep. This parameter is ignored if
``beta`` is ``np.inf``.
k : ``int`` (optional, default: ``1``)
:math:`\kappa` parameter to be passed to :meth:`~.DynamicsBlockStateBase.get_candidate_edges`.
This parameter is ignored if ``beta`` is not ``np.inf``.
elist_args : ``dict`` (optional, default: ``{}``)
Paramters to pass to call :meth:`~.DynamicsBlockStateBase.get_candidate_edges`.
This parameter is ignored if ``beta`` is not ``np.inf``.
keep_elist : ``boolean`` (optional, default: ``False``)
If ``True``, the candidate edge list from last call will be re-used
(if it exists).
pold : ``float`` (optional, default: ``1``)
Relative probability of proposing a new edge weight from existing
categories.
pnew : ``float`` (optional, default: ``1``)
Relative probability of proposing a new edge weight from a new
categories.
pxu : ``float`` (optional, default: ``.1``)
Probability of choosing from an existing category uniformly at
random (instead of doing a bisection search).
pm : ``float`` (optional, default: ``1``)
Relative probability of doing edge multiplicity updates.
premove : ``float`` (optional, default: ``1``)
Relative probability of removing edges.
pself : ``float`` (optional, default: ``.1``)
Relative probability to search for self-loops as candidate node "pairs".
puniform : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs uniformly at
random.
pedge : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs among the
current edges.
pnearby : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs among the
neighborhood of current nodes up to distance ``d``.
d : ``int`` (optional, default: ``2``)
Maximum distance used to search for candidate node pairs.
pcandidates : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs among
currently stored list of candidate edges (obtained with
:meth:`~.DynamicsBlockStateBase.collect_candidates`).
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search use to optimize/sample edge
weights. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for
documentation.
binary : ``boolean`` (optional, default: ``False``)
If ``True``, the latent graph will be assumed to be a simple graph,
otherwise a multigraph.
deterministic : ``boolean`` (optional, default: ``False``)
If ``True``, the the order of edge updates will be determinisitc,
otherwise uniformly at random.
sequential : ``boolean`` (optional, default: ``True``)
If ``True``, a sweep will visit every edge candidate once, otherwise
individiual updates will be chosen at random.
parallel : ``boolean`` (optional, default: ``True``)
If ``True``, the updates are performed in parallel, using locks on
edges candidate incident on the same node.
pseudo : ``boolean`` (optional, default: ``False``)
If ``True``, detailed balance will no longer be stricly perserved,
in favor of higher parallelism.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
if not keep_elist and np.isinf(beta):
self.elist = None
elist_args = dict(elist_args, entropy_args=entropy_args)
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
max_bound=self.xmax_bound,
reversible=not np.isinf(beta)),
**bisect_args))
ecandidates = self.ecandidates
self._get_elist(k, elist_args, beta)
elist = self.elist[1]
state = self._state
if np.isinf(beta):
pxu = 0
mcmc_state = DictState(dict(kwargs, **locals()))
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
if parallel:
return self._state.pseudo_mcmc_sweep(mcmc_state, _get_rng())
else:
return self._state.mcmc_sweep(mcmc_state, _get_rng())
[docs]
@mcmc_sweep_wrap
def swap_mcmc_sweep(self, beta=1, niter=1, preplace=1, pswap=1, pself=0.1,
puniform=1, pedge=1, pnearby=1, d=2, pcandidates=1,
parallel=True, verbose=False, entropy_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to swap edge endpoints.
Parameters
----------
beta : ``float`` (optional, default: ``np.inf``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
preplace : ``float`` (optional, default: ``1``)
Relative probability of swaping the weights between an edge and
another node pair incident on one of the same two endpoints.
pswap : ``float`` (optional, default: ``1``)
Relative probability of swapping the endpoints of two
selected edges or node pairs.
pself : ``float`` (optional, default: ``.1``)
Relative probability to search for self-loops as candidate node "pairs".
puniform : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs uniformly at
random.
pedge : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs among the
current edges.
pnearby : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs among the
neighborhood of current nodes up to distance ``d``.
d : ``int`` (optional, default: ``2``)
Maximum distance used to search for candidate node pairs.
pcandidates : ``float`` (optional, default: ``1``)
Relative probability to search for candidate node pairs among
currently stored list of candidate edges (obtained with
:meth:`~.DynamicsBlockStateBase.collect_candidates`).
parallel : ``boolean`` (optional, default: ``True``)
If ``True``, the updates are performed in parallel, using locks on
edges candidate incident on the same node.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
entropy_args = self._get_entropy_args(entropy_args)
ecandidates = self.ecandidates
state = self._state
if parallel:
sequential = True
mcmc_state = DictState(dict(kwargs, **locals()))
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
if parallel:
return self._state.pseudo_swap_mcmc_sweep(mcmc_state, _get_rng())
else:
return self._state.swap_mcmc_sweep(mcmc_state, _get_rng())
[docs]
@mcmc_sweep_wrap
def edge_multiflip_mcmc_sweep(self, beta=1., niter=1, pmerge=1, psplit=1,
pmergesplit=1, gibbs_sweeps=5,
c=.1, bisect_args={}, accept_stats=None,
verbose=False, entropy_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
merge-split MCMC to sample discrete edge weight categories.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
pmerge : ``float`` (optional, default: ``1``)
Relative probability of merging two discrete categories.
psplit : ``float`` (optional, default: ``1``)
Relative probability of splitting two discrete categories.
pmergesplit : ``float`` (optional, default: ``1``)
Relative probability of simultaneoulsly merging and splitting two
discrete categories.
gibbs_sweeps : ``int`` (optional, default: ``5``)
Number of Gibbs sweeps performed to achieve a split proposal.
c : ``double`` (optional, default: ``.1``)
Probability of choosing a category uniformly at random to perform a
merge, otherwise an adjacent one is chosen.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
accept_stats : ``dict`` (optional, default: ``None``)
If provided, the dictionary will be updated with acceptance statistics.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
kwargs = dict(**kwargs)
E = self.u.num_edges()
if E == 0:
return (0, 0, 0)
niter /= E
gibbs_sweeps = max((gibbs_sweeps, 1))
nproposal = Vector_size_t(4)
nacceptance = Vector_size_t(4)
force_move = kwargs.pop("force_move", False)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
max_bound=self.xmax_bound,
reversible=not np.isinf(beta)),
**bisect_args))
mcmc_state = DictState(locals())
mcmc_state.state = self._state
mcmc_state.entropy_args = self._get_entropy_args(entropy_args)
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
ret = self._state.multiflip_mcmc_sweep(mcmc_state, _get_rng())
if accept_stats is not None:
for key in ["proposal", "acceptance"]:
if key not in accept_stats:
accept_stats[key] = np.zeros(len(nproposal),
dtype="uint64")
accept_stats["proposal"] += nproposal.a
accept_stats["acceptance"] += nacceptance.a
return ret
[docs]
@mcmc_sweep_wrap
def xvals_sweep(self, beta=1., niter=100, bisect_args={}, min_size=1,
entropy_args={}, verbose=False):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
merge-split MCMC to sample the edge weight category
values, based on a bisection search.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``100``)
Maximum number of categories to update.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search use to optimize/sample edge
weights. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for
documentation.
min_size : ``int`` (optional, default: ``1``)
Minimum size of edge categories that will be updated.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
xvals = self.get_xvals()
r = min(niter/len(xvals), 1) if len(xvals) > 0 else 1
niter = max(1, int(round(niter/len(xvals)))) if len(xvals) > 0 else 1
ea = self._get_entropy_args(entropy_args)
ba = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
max_bound=self.xmax_bound,
reversible=not np.isinf(beta)),
**bisect_args))
ret = (0, 0)
for i in range(niter):
eret = self._state.xvals_sweep(beta, r, min_size, ea, ba,
_get_rng())
ret = tuple(sum(x) for x in zip(ret, eret))
return ret
[docs]
def get_x(self):
"""Return latent edge weights."""
return self.x.copy()
[docs]
def get_xvals(self):
"""Return latent edge weight categories."""
return self._state.get_xvals()
[docs]
def get_xhist(self):
"""Return histogram (i.e. counts) of edge weight categories."""
xvals = self.get_xvals()
if len(xvals) == 0:
return np.array([], dtype="int")
xvals = np.append(xvals, np.nextafter(xvals[-1], np.inf))
return np.histogram(self.get_x().fa, xvals)[0]
[docs]
@mcmc_sweep_wrap
def theta_mcmc_sweep(self, beta=1., niter=1, pold=1, pnew=1, ptu=.1,
bisect_args={}, deterministic=False, sequential=True,
parallel=True, pseudo=False, verbose=False,
entropy_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to sample node parameters.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
pold : ``float`` (optional, default: ``1``)
Relative probability of proposing a new node value from existing
categories.
pnew : ``float`` (optional, default: ``1``)
Relative probability of proposing a new node value from a new
categories.
ptu : ``float`` (optional, default: ``.1``)
Probability of choosing from an existing category uniformly at
random (instead of doing a bisection search).
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search use to optimize/sample edge
weights. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for
documentation.
deterministic : ``boolean`` (optional, default: ``False``)
If ``True``, the the order of node updates will be determinisitc,
otherwise uniformly at random.
sequential : ``boolean`` (optional, default: ``True``)
If ``True``, a sweep will visit every node once, otherwise
individiual updates will be chosen at random.
parallel : ``boolean`` (optional, default: ``True``)
If ``True``, the updates are performed in parallel.
pseudo : ``boolean`` (optional, default: ``False``)
If ``True``, detailed balance will no longer be stricly perserved,
in favor of higher parallelism.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
max_bound=self.tmax_bound,
reversible=not np.isinf(beta)),
**bisect_args))
state = self._state
mcmc_state = DictState(dict(kwargs, **locals()))
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
if parallel:
return self._state.pseudo_mcmc_theta_sweep(mcmc_state, _get_rng())
else:
return self._state.mcmc_theta_sweep(mcmc_state, _get_rng())
[docs]
@mcmc_sweep_wrap
def theta_multiflip_mcmc_sweep(self, beta=1., niter=1, pmerge=1, psplit=1,
pmergesplit=1, gibbs_sweeps=5, c=.1,
bisect_args={}, accept_stats=None,
verbose=False, entropy_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
merge-split MCMC to sample discrete node value categories.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
pmerge : ``float`` (optional, default: ``1``)
Relative probability of merging two discrete categories.
psplit : ``float`` (optional, default: ``1``)
Relative probability of splitting two discrete categories.
pmergesplit : ``float`` (optional, default: ``1``)
Relative probability of simultaneoulsly merging and splitting two
discrete categories.
gibbs_sweeps : ``int`` (optional, default: ``5``)
Number of Gibbs sweeps performed to achieve a split proposal.
c : ``double`` (optional, default: ``.1``)
Probability of choosing a category uniformly at random to perform a
merge, otherwise an adjacent one is chosen.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
accept_stats : ``dict`` (optional, default: ``None``)
If provided, the dictionary will be updated with acceptance statistics.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
niter /= self.u.num_vertices()
gibbs_sweeps = max((gibbs_sweeps, 1))
nproposal = Vector_size_t(4)
nacceptance = Vector_size_t(4)
force_move = kwargs.pop("force_move", False)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
max_bound=self.tmax_bound,
reversible=not np.isinf(beta)),
**bisect_args))
mcmc_state = DictState(locals())
mcmc_state.entropy_args = self._get_entropy_args(entropy_args)
mcmc_state.state = self._state
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
ret = self._state.multiflip_mcmc_theta_sweep(mcmc_state, _get_rng())
if accept_stats is not None:
for key in ["proposal", "acceptance"]:
if key not in accept_stats:
accept_stats[key] = np.zeros(len(nproposal),
dtype="uint64")
accept_stats["proposal"] += nproposal.a
accept_stats["acceptance"] += nacceptance.a
return ret
[docs]
@mcmc_sweep_wrap
def tvals_sweep(self, beta=1., niter=100, min_size=1, bisect_args={},
entropy_args={}):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
merge-split MCMC to sample the node bias category values, based on a
bisection search.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``100``)
Maximum number of categories to update.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search use to optimize/sample node
biases. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for
documentation.
min_size : ``int`` (optional, default: ``1``)
Minimum size of node categories that will be updated.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
tvals = self.get_tvals()
r = min(niter/len(tvals), 1) if len(tvals) > 0 else 1
niter = max(1, int(round(niter/len(tvals)))) if len(tvals) > 0 else 1
ea = self._get_entropy_args(entropy_args)
ba = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
max_bound=self.tmax_bound,
reversible=not np.isinf(beta)),
**bisect_args))
ret = (0, 0)
for i in range(niter):
eret = self._state.tvals_sweep(beta, r, min_size, ea, ba,
_get_rng())
ret = tuple(sum(x) for x in zip(ret, eret))
return ret
[docs]
def get_theta(self):
"""Return latent node values."""
return self.theta.copy()
[docs]
def get_tvals(self):
"""Return latent node categories."""
return self._state.get_tvals()
[docs]
def get_thist(self):
"""Return histogram of node categories."""
tvals = self.get_tvals()
tvals = np.append(tvals, np.nextafter(tvals[-1], np.inf))
return np.histogram(self.get_theta().fa, tvals)[0]
[docs]
def get_edge_prob(self, u, v, x, entropy_args={}, epsilon=1e-8):
r"""Return conditional posterior log-probability of edge :math:`(u,v)`."""
ea = self._get_entropy_args(entropy_args)
return self._state.get_edge_prob(u, v, x, ea, epsilon)
[docs]
def get_edges_prob(self, elist, entropy_args={}, epsilon=1e-8):
r"""Return conditional posterior log-probability of an edge list, with
shape :math:`(E,2)`."""
ea = self._get_entropy_args(entropy_args)
elist = np.asarray(elist, dtype="double")
probs = np.zeros(elist.shape[0])
self._state.get_edges_prob(elist, probs, ea, epsilon)
return probs
[docs]
def edge_cov(self, u, v, toffset=True, pearson=False):
r"""Return the covariance (or Pearson correlation if ``pearson ==
True``) between nodes :math:`u` and :math:`v`, according to their
time-series.
"""
return self._state.node_cov(u, v, toffset, pearson)
[docs]
def edge_TE(self, u, v):
r"""Return the transfer entropy between nodes :math:`u` and :math:`v`,
according to their time-series.
"""
return self._state.node_TE(u, v)
[docs]
def edge_MI(self, u, v):
r"""Return the mutual information between nodes :math:`u` and :math:`v`,
according to their time-series.
"""
return self._state.node_MI(u, v)
[docs]
def get_candidate_edges(self, k=1, r=1, max_rk="k", epsilon=0.01,
c_stop=False, maxiter=0, knn=False, gradient=None,
h=2e-3, allow_edges=False, include_edges=True,
use_hint=True, nrandom=0, keep_all=False,
exact=False, return_graph=False, keep_iter=False,
entropy_args={}, bisect_args={}, verbose=False):
r"""Return the :math:`\lfloor\kappa N\rceil` best edge candidates
according to a stochastic second neighbor search.
Parameters
----------
k : ``float`` (optional, default: ``1``)
:math:`\kappa` parameter.
r : ``float`` (optional, default: ``1``)
Fraction of second neighbors to consider during the search.
max_rk : ``float`` (optional, default: ``"k"``)
Maximum number of second-neighbors to be considered per iteration. A
string value ``"k"`` means that this will match the number of first
neighbors.
epsilon : ``float`` (optional, default: ``.01``)
Convergence criterion.
c_stop : ``boolean`` (optional, default: ``False``)
If ``True``, the clustering coefficient will be used for the
convergence criterion.
maxiter : ``int`` (optional, default: ``0``)
Maximum number of iterations allowed (``0`` means unlimited).
knn : ``boolean`` (optional, default: ``False``)
If ``True``, the KNN graph will be returned.
gradient : ``boolean`` (optional, default: ``None``)
Whether to use the gradient to rank edges. If ``None``, it defaults
to ``True`` if the number of edge categories is empty.
h : ``float`` (optional, default: ``1e-8``)
Step length used to compute the gradient with central finite
difference.
allow_edges : ``boolean`` (optional, default: ``False``)
Permit currently present edges to be included in the search.
include_edges : ``boolean`` (optional, default: ``True``)
Include currently present edges in the result.
use_hint : ``boolean`` (optional, default: ``True``)
Use current edges as a hint during the search.
nrandom : ``int`` (optional, default: ``0``)
Add this many random entries to the list.
keep_all : ``boolean`` (optional, default: ``False``)
Keep all entries seen during the search, not only the best.
exact : ``boolean`` (optional, default: ``False``)
If ``True`` an exact quadratic algorithm will be used.
return_graph : ``boolean`` (optional, default: ``False``)
If ``True`` the result will be returned as graph and a property map.
keep_iter : ``boolean`` (optional, default: ``False``)
If ``True`` the results contain the iteration at which an entry has
been found.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search use to optimize/sample edge
weights. See :meth:`~.DynamicsBlockStateBase.bisect_x`) for
documentation.
Returns
-------
elist : :class:``~numpy.ndarray`` of shape ``(E, 2)``
Best entries.
a : :class:``~numpy.ndarray``
Edge scores.
"""
g = Graph(self.g.num_vertices(), fast_edge_removal=True)
w = g.new_ep("double")
ei = g.new_ep("int32_t")
N = g.num_vertices()
if knn:
M = k
else:
M = k * self.g.num_vertices()
M = max((int(round(M)), 1))
if max_rk is None:
max_rk = g.num_vertices()
elif max_rk == "k":
max_rk = k
ea = self._get_entropy_args(entropy_args)
if gradient is None:
gradient = len(self.get_xvals()) == 0
bisect_args = get_bisect_args(**dict(dict(maxiter=10, tol=1e-4,
min_bound=self.xmin_bound,
max_bound=self.xmax_bound),
**bisect_args))
n_tot = self._state.get_candidate_edges(g._Graph__graph, M, r, max_rk,
epsilon, c_stop, maxiter,
_prop("e", g, w),
_prop("e", g, ei),
keep_iter, ea, exact, knn,
keep_all, gradient, h,
bisect_args, allow_edges,
include_edges, use_hint,
nrandom, verbose,
_get_rng())
if return_graph:
if not knn and not self.u.is_directed():
g.set_directed(False)
if keep_iter:
return g, w, ei, n_tot
else:
return g, w, n_tot
elist = g.get_edges([w])
idx = elist[:,2].argsort()
a = elist[idx,2]
elist = numpy.array((elist[idx,0], elist[idx,1]), dtype="int").T
return elist, a
[docs]
def virtual_remove_edge(self, u, v, dm=1, entropy_args={}):
"""Return the difference in description length if edge :math:`(u, v)`
with multiplicity ``dm`` would be removed.
"""
entropy_args = self._get_entropy_args(entropy_args)
return self._state.remove_edge_dS(int(u), int(v), dm, entropy_args)
[docs]
def virtual_add_edge(self, u, v, x, dm=1, entropy_args={}):
"""Return the difference in description length if edge :math:`(u, v)`
would be added with multiplicity ``dm`` and weight ``x``.
"""
entropy_args = self._get_entropy_args(entropy_args)
return self._state.add_edge_dS(int(u), int(v), dm, x, entropy_args)
[docs]
def virtual_update_edge(self, u, v, nx, entropy_args={}):
"""Return the difference in description length if edge :math:`(u, v)`
would take a new weight ``nx``.
"""
entropy_args = self._get_entropy_args(entropy_args)
return self._state.update_edge_dS(int(u), int(v), nx, entropy_args)
[docs]
def virtual_update_node(self, v, nt, entropy_args={}):
"""Return the difference in description length if node ``v``
would take a new value ``nt``.
"""
entropy_args = self._get_entropy_args(entropy_args)
return self._state.update_node_dS(int(v), nt, entropy_args)
[docs]
def remove_edge(self, u, v, dm=1):
r"""Remove edge :math:`(u, v)` with multiplicity ``dm``."""
return self._state.remove_edge(int(u), int(v), dm)
[docs]
def add_edge(self, u, v, x, dm=1):
r"""Add edge :math:`(u, v)` with multiplicity ``dm`` and weight ``x``."""
return self._state.add_edge(int(u), int(v), dm, x)
[docs]
def update_edge(self, u, v, nx):
r"""update edge :math:`(u, v)` with weight ``nx``."""
return self._state.update_edge(int(u), int(v), nx)
[docs]
def update_node(self, v, nt):
r"""update node :math:`(u, v)` with value ``nt``."""
return self._state.update_node(int(v), nt)
[docs]
def bisect_x(self, u, v, entropy_args={}, bisect_args={}, fb=False,
ret_sampler=False):
r"""Perform a bisection search to find the best weight value for edge
:math:`(u, v)`.
Parameters
----------
u : ``int`` or :class:`~graph_tool.Vertex`
Source
v : ``int`` or :class:`~graph_tool.Vertex`
Target
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search use to optimize/sample edge
weights. The recognized parameters and their default values are as
follows:
maxiter : ``int`` (default: ``0``)
Maximum number of iterations for bisection search (``0``
means unlimited).
tol : ``float`` (default: ``2e-3``)
Relative tolerance for bisection search.
ftol : ``float`` (default: ``100``)
Absolute tolerance used to extend the search range.
nmax_extend : ``int`` (default: ``8``)
Maximum number of min/max range extensions.
min_bound : ``float`` (default: ``-np.inf``)
Minimum bound for bisection search.
max_bound : ``float`` (default: ``np.inf``)
Maximum bound for bisection search.
min_init : ``float`` (default: ``-np.inf``)
Iniital minimum bound for bisection search.
max_init : ``float`` (default: ``np.inf``)
Initial maximum bound for bisection search.
reversible : ``boolean`` (default: ``True``)
Perform search in a manner that is usable for a reversible
Markov chain.
fb : ``boolean`` (optional, default: ``False``)
Perform a Fibonacci (a.k.a. golden ratio) search among the current
edge categories, instead of a bisection search among all possible
values.
ret_sampler : ``boolean`` (optional, default: ``False``)
If ``True``, a ``BisectionSampler`` object will be returned as well
(for debugging purposes).
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**bisect_args)
ret = self._state.bisect_x(int(u), int(v), entropy_args,
bisect_args, fb, _get_rng())
if ret_sampler:
return ret
return ret[0]
[docs]
def bisect_t(self, v, entropy_args={}, bisect_args={}, fb=False, ret_sampler=False):
r"""Perform a bisection search to find the best bias value for node ``v``.
Parameters
----------
v : ``int`` or :class:`~graph_tool.Vertex`
Vertex to consider.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
fb : ``boolean`` (optional, default: ``False``)
Perform a Fibonacci (a.k.a. golden ratio) search among the current
node categories, instead of a bisection search among all possible
values.
ret_sampler : ``boolean`` (optional, default: ``False``)
If ``True``, a ``BisectionSampler`` object will be returned as well
(for debugging purposes).
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**bisect_args)
ret = self._state.bisect_t(int(v), entropy_args, bisect_args,
fb, _get_rng())
if ret_sampler:
return ret
return ret[0]
[docs]
@mcmc_sweep_wrap
def xdelta_mcmc_sweep(self, beta=1., niter=1, step=10, pold=.5, pxu=.1,
intra_sweeps=10, verbose=False, entropy_args={},
bisect_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
MCMC to sample the precision parameter of the edge categories.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
step : ``float`` (optional, default: ``10``)
Multiplicative move step size.
pold : ``float`` (optional, default: ``1``)
Relative probability of proposing a new edge weight from existing
categories.
pxu : ``float`` (optional, default: ``.1``)
Probability of choosing from an existing category uniformly at
random (instead of doing a bisection search).
intra_sweeps : ``int`` (optional, default: ``10``)
Number of Metropolis-Hastings sweeps performed to achieve a staging proposal.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
max_bound=self.xmax_bound),
**bisect_args))
state = self._state
mcmc_state = DictState(dict(kwargs, **locals()))
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
try:
ret = self._state.mcmc_sweep_xdelta(mcmc_state, _get_rng())
finally:
self.xdelta = self._state.get_xdelta()
return ret
[docs]
@mcmc_sweep_wrap
def tdelta_mcmc_sweep(self, beta=1., niter=1, step=10, pold=.5, ptu=.1,
intra_sweeps=10, verbose=False, entropy_args={},
bisect_args={}, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
MCMC to sample the precision parameter of the node categories.
Parameters
----------
beta : ``float`` (optional, default: ``1.``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
step : ``float`` (optional, default: ``10``)
Multiplicative move step size.
pold : ``float`` (optional, default: ``1``)
Relative probability of proposing a new edge weight from existing
categories.
ptu : ``float`` (optional, default: ``.1``)
Probability of choosing from an existing category uniformly at
random (instead of doing a bisection search).
intra_sweeps : ``int`` (optional, default: ``10``)
Number of Metropolis-Hastings sweeps performed to achieve a staging proposal.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
max_bound=self.tmax_bound),
**bisect_args))
state = self._state
mcmc_state = DictState(dict(kwargs, **locals()))
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
try:
ret = self._state.mcmc_sweep_tdelta(mcmc_state, _get_rng())
finally:
self.tdelta = self._state.get_tdelta()
return ret
[docs]
def sample_x(self, u, v, beta=1, entropy_args={}, bisect_args={}, fb=False,
ret_sampler=False):
r"""Sample a proposed weight value for edge :math:`(u, v)`.
Parameters
----------
u : ``int`` or :class:`~graph_tool.Vertex`
Source node.
v : ``int`` or :class:`~graph_tool.Vertex`
Target node.
beta : ``float`` (optional, default: ``1``)
Inverse temperature.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
fb : ``bool`` (optional, default: ``False``)
If ``True``, the search will be confined to a Fibonacci search over
the discrete values given by
:meth:`~DynamicsBlockStateBase.get_xvals()`.
ret_sampler : ``boolean`` (optional, default: ``False``)
If ``True``, a ``BisectionSampler`` object will be returned as well
(for debugging purposes).
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.xmin_bound,
max_bound=self.xmax_bound),
**bisect_args))
ret = self._state.sample_x(int(u), int(v), beta, entropy_args,
bisect_args, fb, _get_rng())
if ret_sampler:
return ret
return ret[0]
[docs]
def sample_t(self, v, beta=1, entropy_args={}, bisect_args={}, fb=False,
ret_sampler=False):
r"""Sample a value for node ``v``.
Parameters
----------
v : ``int`` or :class:`~graph_tool.Vertex`
Node to be considered.
beta : ``float`` (optional, default: ``1``)
Inverse temperature.
entropy_args : ``dict`` (optional, default: ``{}``)
Entropy arguments, with the same meaning and defaults as in
:meth:`~.DynamicsBlockStateBase.entropy`.
bisect_args : ``dict`` (optional, default: ``{}``)
Parameter for the bisection search. See
:meth:`~.DynamicsBlockStateBase.bisect_x`) for documentation.
fb : ``bool`` (optional, default: ``False``)
If ``True``, the search will be confined to a Fibonacci search over
the discrete values given by
:meth:`~DynamicsBlockStateBase.get_xvals()`.
ret_sampler : ``boolean`` (optional, default: ``False``)
If ``True``, a ``BisectionSampler`` object will be returned as well
(for debugging purposes).
"""
entropy_args = self._get_entropy_args(entropy_args)
bisect_args = get_bisect_args(**dict(dict(min_bound=self.tmin_bound,
max_bound=self.tmax_bound),
**bisect_args))
ret = self._state.sample_t(int(v), beta, entropy_args,
bisect_args, fb, _get_rng())
if ret_sampler:
return ret
return ret[0]
[docs]
def set_xdelta(self, delta):
"""Set edge weight precision parameter."""
self._state.set_xdelta(delta)
self.xdelta = delta
[docs]
def set_tdelta(self, delta):
"""Set node bias precision parameter."""
self._state.set_tdelta(delta)
self.tdelta = delta
[docs]
def collect_marginal(self, g=None, xbins=100, xslack=.2, expand_xbins=True,
tbins=100, tslack=.2, expand_tbins=True):
r"""Collect marginal inferred network during MCMC runs.
Parameters
----------
g : :class:`~graph_tool.Graph` (optional, default: ``None``)
Previous marginal graph.
xbins : :class:`numpy.ndarray` or ``int`` (optional, default: ``100``)
Bins to be used to obtain the marginal edge weight distribution. If
an integer is given, it will correspond to the number of equally
spaced bins spanning the range of current weight values, increased
in both directions by a factor ``xslack`` of the total range.
xslack : ``float`` (optional, default: ``.2``)
Fraction of the current range of edge weight values to increase when
constructing the bins.
expand_xbins : ``bool`` (optional, default: ``True``)
If ``True``, when a edge weight value is encountered below or above
the current range of the bins, the bins are expanded in the
corresponding direction by duplicating their size, using the same
spacing.
tbins : :class:`numpy.ndarray` or ``int`` (optional, default: ``100``)
Bins to be used to obtain the marginal node bias distribution. If
an integer is given, it will correspond to the number of equally
spaced bins spanning the range of current bias values, increased in
both directions by a factor ``tslack`` of the total range.
tslack : ``float`` (optional, default: ``.2``)
Fraction of the current range of node bias values to increase when
constructing the bins.
expand_tbins : ``bool`` (optional, default: ``True``)
If ``True``, when a node bias value is encountered below or above
the current range of the bins, the bins are expanded in the
corresponding direction by duplicating their size, using the same
spacing.
Returns
-------
g : :class:`~graph_tool.Graph`
New marginal graph, containing the following :ref:`internal properties <sec_internal_props>`:
============ ====== ======================= ===========================================
Name Key Value type Description
============ ====== ======================= ===========================================
``"eprob"`` Edge ``double`` Marginal edge probabilities
``"x"`` Edge ``double`` Marginal edge weight mean
``"xdev"`` Edge ``double`` Marginal edge weight standard deviation
``"t"`` Vertex ``double`` Marginal node bias mean
``"tdev"`` Vertex ``double`` Marginal node bias standard deviation
``"xbins"`` Graph :class:`~numpy.ndarray` Edge weight bins
``"xcount"`` Edge ``vector<int>`` Marginal edge weight counts
``"tbins"`` Graph :class:`~numpy.ndarray` Node bias bins
``"tcount"`` Vertex ``vector<int>`` Marginal node bias counts
``"count"`` Graph ``int`` Total number of marginal samples collected
============ ====== ======================= ===========================================
Notes
-----
The posterior marginal probability of an edge :math:`(i,j)` is defined as
.. math::
\pi_{ij} = \sum_{\boldsymbol x}{\boldsymbol 1}_{x_{ij}>0}P(\boldsymbol x|\boldsymbol D)
where :math:`P(\boldsymbol x|\boldsymbol D)` is the posterior
probability of the edge weights :math:`\boldsymbol x` given the data.
Likewise, the marginal mean :math:`\left<x_{ij}\right>` and standard
deviation :math:`\sigma_{ij}` for the edge weights are given by
.. math::
\begin{aligned}
\left<x_{ij}\right> &= \sum_{\boldsymbol x}x_{ij}P(\boldsymbol x|\boldsymbol D)\\
\sigma_{ij}^2 &= \sum_{\boldsymbol x}(x_{ij} - \left<x_{ij}\right>)^2 P(\boldsymbol x|\boldsymbol D).
\end{aligned}
The mean and standard deviation for the node biases are entirely
analogous.
"""
if g is None:
g = Graph(directed=self.g.is_directed())
g.add_vertex(self.g.num_vertices())
g.gp.count = g.new_gp("int", 0)
g.ep.count = g.new_ep("int")
g.ep.eprob = g.new_ep("double")
g.ep.xsum = g.new_ep("double")
g.ep.x2sum = g.new_ep("double")
g.ep.x = g.new_ep("double")
g.ep.xdev = g.new_ep("double")
g.vp.tsum = g.new_vp("double")
g.vp.t2sum = g.new_vp("double")
g.vp.t = g.new_vp("double")
g.vp.tdev = g.new_vp("double")
if "xbins" not in g.gp:
g.gp.xbins = g.new_gp("object", xbins)
g.gp.tbins = g.new_gp("object", tbins)
if "xcount" not in g.ep:
g.ep.xcount = g.new_ep("vector<int32_t>")
g.vp.tcount = g.new_vp("vector<int32_t>")
def get_bins(bins, vals, slack):
if isinstance(bins, Iterable) or len(vals) == 0:
return bins
xr = [vals.min(), vals.max()]
delta = xr[1] - xr[0]
if delta == 0:
return bins
xr[0] -= delta * slack
xr[1] += delta * slack
return np.linspace(xr[0], xr[1], bins)
def expand_bins(bins, count, left):
d = np.diff(bins)
if left:
nbins = np.concatenate((np.cumsum(-d)[::-1] + bins[0], bins))
graph_merge(g, g, in_place=True,
props=dict(idx_inc=[(count,
g.new_property(count.key_type(),
"vector<int>",
val=[-len(bins)+1, 0]))]))
else:
nbins = np.concatenate((bins, np.cumsum(d) + bins[-1]))
return nbins
u = self.get_graph()
x = self.get_x()
t = self.theta
xbins = g.gp.xbins = get_bins(g.gp.xbins, x.fa, xslack)
tbins = g.gp.tbins = get_bins(g.gp.tbins, t.fa, tslack)
if len(x.fa) > 0 and isinstance(xbins, Iterable):
if expand_xbins:
while x.fa.min() < xbins[0] and not np.isinf(x.fa.min()):
xbins = g.gp.xbins = expand_bins(xbins, g.ep.xcount, True)
while x.fa.max() >= xbins[-1] and not np.isinf(x.fa.max()):
xbins = g.gp.xbins = expand_bins(xbins, g.ep.xcount, False)
xd = x.t(lambda y: np.digitize(y, xbins), value_type="int")
else:
xd = g.new_ep("int", val=-1)
if isinstance(tbins, Iterable):
if expand_tbins:
while t.fa.min() < tbins[0] and not np.isinf(t.fa.min()):
tbins = g.gp.tbins = expand_bins(tbins, g.vp.tcount, True)
while t.fa.max() >= tbins[-1] and not np.isinf(t.fa.max()):
tbins = g.gp.tbins = expand_bins(tbins, g.vp.tcount, False)
td = t.t(lambda y: np.digitize(y, tbins), value_type="int")
else:
td = g.new_vp("int", val=-1)
try:
g.set_fast_edge_lookup(True)
graph_merge(g, u, in_place=True, simple=True,
props=dict(sum=[(g.ep.count, u.new_ep("int", val=1)),
(g.ep.xsum, x),
(g.ep.x2sum, x.t(lambda y: y ** 2)),
(g.vp.tsum, t),
(g.vp.t2sum, t.t(lambda y: y ** 2))],
idx_inc=[(g.ep.xcount, xd),
(g.vp.tcount, td)]))
finally:
g.gp.count += 1
g.ep.eprob.fa = g.ep.count.fa / g.gp.count
g.ep.x.fa = g.ep.xsum.fa / g.ep.count.fa
g.ep.xdev.fa = np.sqrt(np.abs(g.ep.x2sum.fa / g.ep.count.fa - g.ep.x.fa ** 2))
g.vp.t.fa = g.vp.tsum.fa / g.gp.count
g.vp.tdev.fa = np.sqrt(np.abs(g.vp.t2sum.fa / g.gp.count - g.vp.t.fa ** 2))
return g
[docs]
class BPBlockStateBase(DynamicsBlockStateBase, ABC):
def __init__(self, *args, **kwargs):
r"""Base state for network reconstruction where the respective model can
be used with belief-propagation."""
DynamicsBlockStateBase.__init__(self, *args, **kwargs)
[docs]
def get_S_bp(self, **kwargs):
"""Get negative model likelihood according to BP."""
bstate = self.get_bp_state(converge=False)
bstate.converge(**kwargs)
S = 0
for s in self.s:
S -= bstate.log_prob(s)
return S
[docs]
def get_elist_grad(self, h=2e-3, entropy_args={}):
"""Get edge list gradient."""
entropy_args = self._get_entropy_args(entropy_args)
ediff = []
for u, v in self.elist[1]:
ediff.append(self._state.edge_diff(u, v, h, entropy_args))
return ediff
[docs]
def get_node_grad(self, h=2e-3, entropy_args={}):
"""Get node gradient."""
entropy_args = self._get_entropy_args(entropy_args)
ndiff = []
for v in self.u.vertices():
ndiff.append(self._state.node_diff(v, h, entropy_args))
return ndiff
[docs]
def get_S(self):
"""Get negative model likelihood according to pseudo-likelihood."""
return DynamicsBlockStateBase.entropy(self, sbm=False, tdist=False,
xdist=False, density=False, xl1=0,
tl1=0, alpha=1, test=False)
[docs]
@abstractmethod
def get_bp_state(**kwargs):
pass
[docs]
class EpidemicsBlockState(DynamicsBlockStateBase):
def __init__(self, s, g=None, exposed=False, theta=-1, x_range=(-np.inf, 1e-8),
theta_range=(-np.inf, 1e-8), **kwargs):
r"""Inference state for network reconstruction based on epidemic dynamics, using
the stochastic block model as a prior.
Parameters
----------
s : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap`
Time series used for reconstruction.
If the type is :class:`~numpy.ndarray`, it should correspond to a
``(N,M)`` data matrix with ``M`` samples for all ``N`` nodes.
A value of ``1`` means infected and ``0`` susceptible. Other
values are allowed (e.g. for recovered), but their actual value is
unimportant for reconstruction.
If the parameter ``g`` is provided, this can be optionally a list of
of :class:`~graph_tool.VertexPropertyMap` objects, where each entry
in this list must be a :class:`~graph_tool.VertexPropertyMap` with
type ``vector<int>``. If a single property map is given, then a
single time series is assumed.
If the parameter ``t`` below is given, each property map value for a
given node should contain only the states for the same points in
time given by that parameter.
g : :class:`~graph_tool.Graph` (optional, default: ``None``)
Initial graph state. If not provided, an empty graph will be assumed.
exposed : ``boolean`` (optional, default: ``False``)
If ``True``, the data is supposed to come from a SEI, SEIR,
etc. model, where a susceptible node (valued ``0``) first transits
to an exposed state (valued ``-1``) upon transmission, before
transiting to the infective state (valued ``1``).
**kwargs : (optional)
Remaining parameters to be passed to :class:`~graph_tool.inference.DynamicsBlockStateBase`
References
----------
.. [peixoto-network-2019] Tiago P. Peixoto, "Network reconstruction and
community detection from dynamics", Phys. Rev. Lett. 123 128301
(2019), :doi:`10.1103/PhysRevLett.123.128301`, :arxiv:`1903.10833`
.. [peixoto-network-2024] Tiago P. Peixoto, "Network reconstruction via
the minimum description length principle", :arxiv:`2405.01015`
.. [peixoto-scalable-2024] Tiago P. Peixoto, "Scalable network
reconstruction in subquadratic time", :arxiv:`2401.01404`
"""
DynamicsBlockStateBase.__init__(self, s, g=g, exposed=exposed,
theta=theta, x_range=x_range,
theta_range=theta_range,
directed=directed, discrete=True,
**dict(dict(directed=True),
**dmask(kwargs, ["discrete"])))
def _make_state(self):
return libinference.make_epidemics_state(self._state, self.ot, self.os,
self.oactive, self.params)
[docs]
def get_t(self):
"""Return the latent edge transmission probabilities."""
x = self.get_x()
x.fa = 1-exp(x.fa)
return x
def _mcmc_sweep(self, mcmc_state):
return libinference.mcmc_epidemics_sweep(mcmc_state, self._state,
_get_rng())
[docs]
class IsingBlockStateBase(ABC):
@abstractmethod
def __init__(self, s, g=None, has_zero=False, **kwargs):
r"""Base state for network reconstruction based on the Ising model, using the
stochastic block model as a prior.
This class is not supposed to be instantiated directly.
Instead one of its specialized subclasses must be used, which have the
same signature: :class:`IsingGlauberBlockState`,
:class:`PseudoIsingBlockState`, :class:`CIsingGlauberBlockState`,
:class:`PseudoCIsingBlockState`.
Parameters
----------
s : :class:`~numpy.ndarray` of shape ``(N,M)`` or ``list`` of :class:`~graph_tool.VertexPropertyMap` or :class:`~graph_tool.VertexPropertyMap`
Time series or independent samples used for reconstruction.
If the type is :class:`~numpy.ndarray`, it should correspond to a
``(N,M)`` data matrix with ``M`` samples for all ``N`` nodes.
The values must correspond to Ising states: ``-1`` or ``+1``
If the parameter ``g`` is provided, this can be optionally a list of
of :class:`~graph_tool.VertexPropertyMap` objects, where each entry
in this list must be a :class:`~graph_tool.VertexPropertyMap` with
type ``vector<int>``. If a single property map is given, then a
single time series is assumed.
If the parameter ``t`` below is given, each property map value for a
given node should contain only the states for the same points in
time given by that parameter.
g : :class:`~graph_tool.Graph` (optional, default: ``None``)
Initial graph state. If not provided, an empty graph will be assumed.
has_zero : bool (optional, default: ``False``)
If ``True``, the three-state "Ising" model with values ``{-1,0,1}``
is used.
**kwargs : (optional)
Remaining parameters to be passed to :class:`~graph_tool.inference.DynamicsBlockStateBase`
References
----------
.. [ising-model] https://en.wikipedia.org/wiki/Ising_model
.. [peixoto-network-2019] Tiago P. Peixoto, "Network reconstruction and
community detection from dynamics", Phys. Rev. Lett. 123 128301
(2019), :doi:`10.1103/PhysRevLett.123.128301`, :arxiv:`1903.10833`
.. [peixoto-network-2024] Tiago P. Peixoto, "Network reconstruction via
the minimum description length principle", :arxiv:`2405.01015`
.. [peixoto-scalable-2024] Tiago P. Peixoto, "Scalable network
reconstruction in subquadratic time", :arxiv:`2401.01404`
"""
pass
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.IsingGlauberState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
if not self.params["has_zero"]:
return IsingGlauberState(self.u, w=self.x, h=self.theta, s=s)
return PottsGlauberState(self.u, f=[[ 1 , 0, -1],
[ 0 , 0, 0],
[-1, 0, 1]],
w=self.x,
h=group_vector_property([self.theta.t(lambda x: -x),
self.u.new_vp("double"),
self.theta]),
s=s,
shift=-1)
[docs]
class IsingGlauberBlockState(DynamicsBlockStateBase, IsingBlockStateBase):
def __init__(self, s, g=None, has_zero=False, **kwargs):
r"""State for network reconstruction based on the Glauber dynamics of the Ising
model, using the stochastic block model as a prior.
See documentation for :class:`IsingBlockStateBase` for details.
Notes
-----
This is a dynamical model with a transition likelihood for node
:math:`i` to state :math:`s_i(t+1) \in \{-1,+1\}` given by:
.. math::
P(s_i(t+1)|\boldsymbol s(t), \boldsymbol A,
\boldsymbol x, \boldsymbol \theta) =
\frac{\exp(s_i(t+1)\sum_jA_{ij}x_{ij}s_j(t) + \theta_is_i(t+1))}
{2\cosh(\sum_jA_{ij}x_{ij}s_j(t) + \theta_i)}.
"""
DynamicsBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
discrete=True,
**dict(dict(directed=True),
**dmask(kwargs, ["discrete"])))
def _make_state(self):
return libinference.make_ising_glauber_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
class CIsingGlauberBlockState(DynamicsBlockStateBase, IsingBlockStateBase):
def __init__(self, s, g=None, has_zero=False, **kwargs):
r"""State for network reconstruction based on the Glauber dynamics of the
continuous Ising model, using the stochastic block model as a prior.
See documentation for :class:`IsingBlockStateBase` for details. Note
that in this case the ``s`` parameter should contain property maps of
type ``vector<double>``, with values in the range :math:`[-1,1]`.
Notes
-----
This is a dynamical model with a transition likelihood for node
:math:`i` to state :math:`s_i(t+1) \in [-1,+1]` given by
.. math::
P(s_i(t+1)|\boldsymbol s(t), \boldsymbol A,
\boldsymbol x, \boldsymbol \theta) =
\frac{\exp(s_i(t+1)\sum_jA_{ij}w_{ij}s_j(t) + h_is_i(t+1))}
{Z(\sum_jA_{ij}w_{ij}s_j(t) + h_i)},
with :math:`Z(x) = 2\sinh(x)/x`.
"""
DynamicsBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
**dict(dict(directed=True), **kwargs))
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.CIsingGlauberState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
return CIsingGlauberState(self.u, w=self.x, h=self.theta, s=s)
def _make_state(self):
return libinference.make_cising_glauber_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
class PseudoIsingBlockState(IsingBlockStateBase, BPBlockStateBase):
def __init__(self, s, g=None, has_zero=False, **kwargs):
r"""State for network reconstruction based on the equilibrium
configurations of the Ising model, using the pseudolikelihood
approximation and the stochastic block model as a prior.
See documentation for :class:`IsingBlockStateBase` for details.
Notes
-----
This is a equilibrium model with where the states :math:`\boldsymbol s`
are sampled with probability
.. math::
P(\boldsymbol s | \boldsymbol A,
\boldsymbol x, \boldsymbol \theta) =
\frac{\exp(\sum_jA_{ij}x_{ij}s_is_j + \sum_i\theta_is_i)}
{Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)},
where :math:`Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)` is an
intractable normalization constant.
Instead of computing this likelihood exactly, this model makes use of
the pseudo-likelihood approximation [pseudo]_:
.. math::
P(\boldsymbol s | \boldsymbol A,
\boldsymbol x, \boldsymbol \theta) =
\prod_{i<j}\frac{\exp(s_i\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_is_i)}
{2\cosh(\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_i)}.
References
----------
.. [pseudo] https://en.wikipedia.org/wiki/Pseudolikelihood
"""
BPBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
directed=False, discrete=True,
**dmask(kwargs, ["directed", "discrete"]))
[docs]
def get_bp_state(self, **kwargs):
"""Return an :class:`~graph_tool.dynamics.IsingBPState` instance
corresponding to the inferred model."""
return IsingBPState(self.u, x=self.x,
theta=self.theta,
has_zero=self.params.get("has_zero"), **kwargs)
def _make_state(self):
return libinference.make_pseudo_ising_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
class PseudoCIsingBlockState(DynamicsBlockStateBase, IsingBlockStateBase):
def __init__(self, s, g=None, has_zero=False, **kwargs):
r"""State for network reconstruction based on the equilibrium configurations of
the continuous Ising model, using the Pseudolikelihood approximation and
the stochastic block model as a prior.
See documentation for :class:`IsingBlockStateBase` for details.
Note that in this case the ``s`` parameter should contain property maps
of type ``vector<double>``, with values in the range :math:`[-1,1]`.
Notes
-----
This is a equilibrium model with where the states :math:`\boldsymbol s`
are sampled with probability
.. math::
P(\boldsymbol s | \boldsymbol A,
\boldsymbol x, \boldsymbol \theta) =
\frac{\exp(\sum_jA_{ij}x_{ij}s_is_j + \sum_i\theta_is_i)}
{Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)},
where :math:`Z(\boldsymbol A, \boldsymbol x, \boldsymbol \theta)` is an
intractable normalization constant.
Instead of computing this likelihood exactly, this model makes use of
the pseudo-likelihood approximation [pseudo]_:
.. math::
P(\boldsymbol s | \boldsymbol A,
\boldsymbol x, \boldsymbol \theta) =
\prod_{i<j}\frac{\exp(s_i\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_is_i)}
{Z(\sum_{j\ne i}A_{ij}x_{ij}s_j + \theta_i)},
with :math:`Z(x) = 2\sinh(x)/x`.
References
----------
.. [pseudo] https://en.wikipedia.org/wiki/Pseudolikelihood
"""
DynamicsBlockStateBase.__init__(self, s, g=g, has_zero=has_zero,
directed=False, **kwargs)
def _make_state(self):
return libinference.make_pseudo_cising_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.CIsingGlauberState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
return CIsingGlauberState(self.u, w=self.x, h=self.theta, s=s)
def zero_mean(s, g):
if isinstance(s, np.ndarray):
s = s.copy()
for i in range(s.shape[0]):
s[i,:] -= s[i,:].mean()
else:
if isinstance(s, list):
s = [sx.copy("vector<double>") for sx in s]
else:
s = [s.copy("vector<double>")]
for sx in s:
for v in g.vertices():
sx[v].a -= sx[v].a.mean()
return s
[docs]
class PseudoNormalBlockState(BPBlockStateBase):
def __init__(self, s, g=None, fix_mean=True, positive=True, pslack=1e-6,
theta_range=(-200, 200), **kwargs):
r"""State for network reconstruction based on the multivariate normal
distribution, using the Pseudolikelihood approximation and the
stochastic block model as a prior.
``fix_mean == True`` means that ``s`` will be changed to become zero-mean.
``positive == True`` ensures that the result is positive-semidefinite,
according to slack given by ``pslack``.
See documentation for :class:`DynamicsBlockStateBase` for more details.
"""
if fix_mean:
s = zero_mean(s, g)
BPBlockStateBase.__init__(self, s, g=g, positive=positive, pslack=pslack,
directed=False, theta_range=theta_range,
**kwargs)
self.theta_first = False
def _make_state(self):
return libinference.make_pseudo_normal_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
def get_dcov(self):
"""Return data covariance matrix."""
N = self.g.num_vertices()
S = np.zeros((N, N))
M = 0
for m in range(len(self.s)):
M += len(self.s[m][0])
for i in range(N):
for j in range(N):
S[i,j] += np.dot(self.s[0][i].a,
self.s[0][j].a)
S /= M
return S
[docs]
def get_precision(self):
"""Return precision matrix."""
theta = self.theta.copy()
w = self.x.copy()
w.fa = abs(w.fa)
pslack = self.params["pslack"]
for v in self.u.vertices():
k = v.out_degree(w)
theta[v] = min(theta[v], -log(k)/2-pslack) if k > 0 else theta[v]
W = adjacency(self.u, self.x).tolil()
for v in self.u.vertices():
W[int(v), int(v)] = 1/exp(theta[v] * 2)
return W
[docs]
def get_theta_shifted(self):
"""Return shifted node values to ensure positive semi-definiteness."""
h = self.theta.copy()
w = self.x.copy()
w.fa = np.abs(w.fa)
k = self.u.degree_property_map("out", w)
idx = k.a > 0
pslack = self.params["pslack"]
h.a[idx] = np.minimum(-np.log(k.a[idx])/2 - pslack, h.a[idx])
return h
[docs]
def log_P_exact(self):
"""Return exact log-likelihood."""
S = self.get_dcov()
W = self.get_precision().todense()
N = W.shape[0]
D = np.linalg.slogdet(W)[1]
M = 0
for m in range(len(self.s)):
M += len(self.s[m][0])
return float(-M*(S@W).trace()/2 + M/2*D - M*N*np.log(2*np.pi)/2)
[docs]
def log_Z_exact(self):
"""Return exact log-likelihood normalization constant."""
W = self.get_precision().todense()
N = W.shape[0]
D = np.linalg.slogdet(W)[1]
return -float(D/2 - N * np.log(2 * np.pi)/2)
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.NormalState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
h = self.get_theta_shifted()
h.a = np.exp(h.a)
return NormalState(self.u, w=self.x, h=h, s=s)
[docs]
def get_bp_state(self, **kwargs):
"""Return an :class:`~graph_tool.dynamics.NormalBPState` instance
corresponding to the inferred model."""
theta = self.get_theta_shifted()
theta.fa = np.exp(-2 * theta.fa)
return NormalBPState(self.u, x=self.x, theta=theta, **kwargs)
[docs]
class NormalGlauberBlockState(DynamicsBlockStateBase):
def __init__(self, s, g=None, self_loops=False, fix_mean=True, **kwargs):
r"""State for network reconstruction based on the dynamical multivariate
normal distribution, using the Pseudolikelihood approximation and the
stochastic block model as a prior.
``fix_mean == True`` means that ``s`` will be changed to become zero-mean.
``positive == True`` ensures that the result is positive-semidefinite,
according to slack given by ``pslack``.
See documentation for :class:`DynamicsBlockStateBase` for more details.
"""
if fix_mean:
s = zero_mean(s, g)
DynamicsBlockStateBase.__init__(self, s, g=g, self_loops=self_loops,
**kwargs)
self.theta_first = False
def _make_state(self):
return libinference.make_normal_glauber_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.NormalState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
h = self.get_theta_shifted()
h.a = np.exp(h.a)
return NormalState(self.u, w=self.x, h=h, s=s)
[docs]
class LinearNormalBlockState(DynamicsBlockStateBase):
def __init__(self, s, g=None, self_loops=True, **kwargs):
r"""State for network reconstruction based on a linear dynamical model.
``self_loops == True`` means self-loops will be allowed in the
reconstruction.
See documentation for :class:`DynamicsBlockStateBase` for more details.
"""
DynamicsBlockStateBase.__init__(self, s, g=g, self_loops=self_loops,
**dict(dict(directed=True), **kwargs))
self.theta_first = False
def _make_state(self):
return libinference.make_linear_normal_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.LinearNormalState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
h = self.theta.copy()
h.a = np.exp(h.a)
return LinearNormalState(self.u, w=self.x, sigma=h, s=s)
[docs]
class LVBlockState(DynamicsBlockStateBase):
def __init__(self, s, g=None, self_loops=True, sigma=1, **kwargs):
DynamicsBlockStateBase.__init__(self, s, g=g, self_loops=self_loops,
sigma=sigma,
**dict(dict(directed=True), **kwargs))
self.theta_first = False
def _make_state(self):
return libinference.make_lotka_volterra_state(self._state, self.ot,
self.os, self.oactive,
self.params)
[docs]
def get_dyn_state(self, s=None):
"""Return an :class:`~graph_tool.dynamics.LVState` instance
corresponding to the inferred model, optionally with initial state given
by ``s``."""
r = self.theta.copy()
return LVState(self.u, w=self.x, r=r, sigma=self.params["sigma"], s=s)
def pinit(val, g, ptype="v", vtype="double"):
if isinstance(val, PropertyMap):
if val.value_type() == vtype:
return g.own_property(val)
else:
return g.own_property(val.copy(value_type=vtype))
if ptype == "v":
return g.new_vp(vtype, val=val)
if ptype == "e":
return g.new_ep(vtype, val=val)
class TestBlockState(DynamicsBlockStateBase):
def __init__(self, g, p=.9, mu=1, sigma=.1, p_default=1e-6, mu_default=.1,
sigma_default=.01, mu_v=0, sigma_v=1, g_init=None, **kwargs):
self.g_ref = g
if g_init is None:
g_init = Graph(g.num_vertices(), directed=g.is_directed())
self.p = pinit(p, g, "e")
self.mu = pinit(mu, g, "e")
self.sigma = pinit(sigma, g, "e")
self.p_default = p_default
self.mu_default = mu_default
self.sigma_default = sigma_default
self.mu_v = pinit(mu_v, g, "v")
self.sigma_v = pinit(sigma_v, g, "v")
s = g.new_vp("int")
kwargs.pop("s", None)
DynamicsBlockStateBase.__init__(self, s, g=g_init,
directed=g.is_directed(), **kwargs)
def __getstate__(self):
return dict(DynamicsBlockStateBase.__getstate__(self),
g=self.g_ref, p=self.p, mu=self.mu, sigma=self.sigma,
p_default=self.p_default, mu_default=self.mu_default,
sigma_default=self.sigma_default, mu_v=self.mu_v,
sigma_v=self.sigma_v)
def _make_state(self):
return libinference.make_test_state(self.g_ref._Graph__graph,
self._state, vars(self))