#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# graph_tool -- a general graph manipulation python module
#
# Copyright (C) 2006-2025 Tiago de Paula Peixoto <tiago@skewed.de>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .. import _prop, Graph, GraphView, _get_rng, PropertyMap, \
EdgePropertyMap, VertexPropertyMap, edge_endpoint_property, Vector_int32_t, \
Vector_size_t
from .. dl_import import dl_import
dl_import("from . import libgraph_tool_inference as libinference")
from . base_states import *
from . blockmodel import *
from . nested_blockmodel import *
from .. generation import graph_merge
import numpy as np
import scipy.special
import collections.abc
[docs]
@entropy_state_signature
class UncertainBaseState(EntropyState):
r"""Base state for uncertain network inference."""
def __init__(self, g, nested=True, state_args={}, bstate=None,
self_loops=False, init_empty=False, max_m=1 << 16,
entropy_args={}):
EntropyState.__init__(self, entropy_args=entropy_args)
self.g = g
state_args = dict(state_args)
if bstate is None:
if init_empty:
self.u = Graph(g.num_vertices(), directed=g.is_directed())
self.eweight = self.u.new_ep("int", val=1)
elif "g" in state_args:
self.u = state_args.pop("g")
self.eweight = state_args.pop("eweight",
self.u.new_ep("int", val=1))
else:
self.u = g.copy()
self.eweight = self.u.new_ep("int", val=1)
else:
self.u = bstate.g.copy()
if nested:
self.eweight = bstate.levels[0].eweight
else:
self.eweight = bstate.eweight
self.eweight = self.u.own_property(self.eweight.copy())
if nested:
bstate = bstate.copy(g=self.u,
state_args=dict(bstate.state_args,
eweight=self.eweight))
else:
bstate = bstate.copy(g=self.u, eweight=self.eweight)
self.u.set_fast_edge_removal()
self.self_loops = self_loops
N = self.u.num_vertices()
if self.u.is_directed():
if self_loops:
M = N * N
else:
M = N * (N - 1)
else:
if self_loops:
M = (N * (N + 1)) / 2
else:
M = (N * (N - 1)) / 2
self.M = M
if bstate is None:
if nested:
state_args["state_args"] = state_args.get("state_args", {})
state_args["state_args"].update(dict(eweight=self.eweight))
self.nbstate = NestedBlockState(self.u, **state_args)
self.bstate = self.nbstate.levels[0]
else:
self.nbstate = None
self.bstate = BlockState(self.u, eweight=self.eweight,
**state_args)
else:
if nested:
self.nbstate = bstate
self.bstate = bstate.levels[0]
else:
self.nbstate = None
self.bstate = bstate
self._entropy_args.update(self.bstate._entropy_args)
self._entropy_args.update(entropy_args)
edges = self.g.get_edges()
edges = np.concatenate((edges,
np.ones(edges.shape,
dtype=edges.dtype) * (N + 1)))
self.slist = Vector_size_t(init=edges[:,0])
self.tlist = Vector_size_t(init=edges[:,1])
self.max_m = max_m
init_q_cache()
def __getstate__(self):
state = EntropyState.__getstate__(self)
return dict(state, g=self.g, nested=self.nbstate is not None,
bstate=(self.nbstate if self.nbstate is not None else self.bstate),
self_loops=self.self_loops, max_m=self.max_m)
def __setstate__(self, state):
self.__init__(**state)
[docs]
def copy(self, **kwargs):
"""Return a copy of the state."""
args = dict(self.__getstate__(), **kwargs)
return type(self)(**args)
def __copy__(self):
return self.copy()
def _gen_eargs(self, args):
ea = self.bstate._get_entropy_args(args, consume=True)
return libinference.uentropy_args(ea)
[docs]
def get_block_state(self):
"""Return the underlying block state, which can be either
:class:`~graph_tool.inference.BlockState` or
:class:`~graph_tool.inference.NestedBlockState`.
"""
if self.nbstate is None:
return self.bstate
else:
return self.nbstate
@copy_state_wrap
def _entropy(self, latent_edges=True, density=False, aE=1., sbm=True, **kwargs):
"""Return the description length, i.e. the negative log-likelihood."""
eargs = self._get_entropy_args(locals())
S = self._state.entropy(eargs)
if sbm:
if self.nbstate is None:
S += self.bstate.entropy(**kwargs)
else:
S += self.nbstate.entropy(**kwargs)
return S
[docs]
def virtual_remove_edge(self, u, v, dm=1, entropy_args={}):
"""Return the difference in description length if edge :math:`(u, v)`
with multiplicity ``dm`` would be removed.
"""
entropy_args = self._get_entropy_args(entropy_args)
return self._state.remove_edge_dS(int(u), int(v), dm, entropy_args)
[docs]
def virtual_add_edge(self, u, v, dm=1, entropy_args={}):
"""Return the difference in description length if edge :math:`(u, v)`
would be added with multiplicity ``dm``.
"""
entropy_args = self._get_entropy_args(entropy_args)
return self._state.add_edge_dS(int(u), int(v), dm, entropy_args)
[docs]
def remove_edge(self, u, v, dm=1):
r"""Remove edge :math:`(u, v)` with multiplicity ``dm``."""
return self._state.remove_edge(int(u), int(v), dm)
[docs]
def add_edge(self, u, v, dm=1):
r"""Add edge :math:`(u, v)` with multiplicity ``dm``."""
return self._state.add_edge(int(u), int(v), dm)
[docs]
def set_state(self, g, w):
r"""Set all edge multiplicities via :class:`~graph_tool.EdgePropertyMap` ``w``."""
if w.value_type() != "int32_t":
w = w.copy("int32_t")
self._state.set_state(g._Graph__graph, w._get_any())
[docs]
@mcmc_sweep_wrap
def edge_mcmc_sweep(self, beta=1, niter=1, verbose=False, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to sample latent edges.
Parameters
----------
beta : ``float`` (optional, default: ``np.inf``)
Inverse temperature parameter.
niter : ``int`` (optional, default: ``1``)
Number of sweeps.
verbose : ``boolean`` (optional, default: ``False``)
If ``verbose == True``, detailed information will be displayed.
Returns
-------
dS : ``float``
Entropy difference after the sweeps.
nmoves : ``int``
Number of variables moved.
"""
kwargs = kwargs.copy()
edges_only = kwargs.pop("edges_only", False)
slist = self.slist
tlist = self.tlist
entropy_args = kwargs.pop("entropy_args", {}).copy()
entropy_args = self._get_entropy_args(entropy_args)
debug = kwargs.pop("debug", False)
state = self._state
mcmc_state = DictState(dict(kwargs, **locals()))
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
return self._mcmc_sweep(mcmc_state)
#@mcmc_sweep_wrap
[docs]
def sbm_mcmc_sweep(self, multiflip=True, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to sample node partitions. The remaining keyword
parameters will be passed to
:meth:`~graph_tool.inference.BlockState.mcmc_sweep` or
:meth:`~graph_tool.inference.BlockState.multiflip_mcmc_sweep`, if
``multiflip=True``.
"""
if self.nbstate is None:
self.bstate._clear_egroups()
else:
self.nbstate._clear_egroups()
bstate = self.nbstate
if bstate is None:
bstate = self.bstate
if multiflip:
return bstate.multiflip_mcmc_sweep(**kwargs)
else:
return bstate.mcmc_sweep(**kwargs)
[docs]
def mcmc_sweep(self, beta=1, niter=1, pedges=.5, multiflip=True,
edge_mcmc_args=dict(), sbm_mcmc_args=dict(), **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection
sampling MCMC to sample network partitions and latent edges. The
parameter ``pedges`` controls the probability with which edge moves will
be attempted, instead of partition moves. The remaining keyword
parameters will be passed to
:meth:`~graph_tool.inference.BlockState.mcmc_sweep` or
:meth:`~graph_tool.inference.BlockState.multiflip_mcmc_sweep`, if
``multiflip=True``.
"""
if np.random.random() < pedges:
return self.edge_mcmc_sweep(**dict(dict(dict(beta=beta, niter=niter),
**edge_mcmc_args), **kwargs))
else:
return self.sbm_mcmc_sweep(**dict(dict(dict(beta=beta, niter=niter,
multiflip=multiflip),
**sbm_mcmc_args), **kwargs))
[docs]
def get_edge_prob(self, u, v, entropy_args={}, epsilon=1e-8):
r"""Return conditional posterior log-probability of edge :math:`(u,v)`."""
ea = self._get_entropy_args(entropy_args)
return self._state.get_edge_prob(u, v, ea, epsilon)
[docs]
def get_edges_prob(self, elist, entropy_args={}, epsilon=1e-8):
r"""Return conditional posterior log-probability of an edge list, with
shape :math:`(E,2)`."""
ea = self._get_entropy_args(entropy_args)
elist = np.asarray(elist, dtype="uint64")
probs = np.zeros(elist.shape[0])
self._state.get_edges_prob(elist, probs, ea, epsilon)
return probs
[docs]
def get_graph(self):
r"""Return the current inferred graph."""
if self.self_loops:
u = GraphView(self.u, efilt=self.eweight.fa > 0)
else:
es = edge_endpoint_property(self.u, self.u.vertex_index, "source")
et = edge_endpoint_property(self.u, self.u.vertex_index, "target")
u = GraphView(self.u, efilt=np.logical_and(self.eweight.fa > 0,
es.fa != et.fa))
return u
[docs]
def collect_marginal(self, g=None):
r"""Collect marginal inferred network during MCMC runs.
Parameters
----------
g : :class:`~graph_tool.Graph` (optional, default: ``None``)
Previous marginal graph.
Returns
-------
g : :class:`~graph_tool.Graph`
New marginal graph, with internal edge :class:`~graph_tool.EdgePropertyMap`
``"eprob"``, containing the marginal probabilities for each edge.
Notes
-----
The posterior marginal probability of an edge :math:`(i,j)` is defined as
.. math::
\pi_{ij} = \sum_{\boldsymbol A}A_{ij}P(\boldsymbol A|\boldsymbol D)
where :math:`P(\boldsymbol A|\boldsymbol D)` is the posterior
probability given the data.
"""
if g is None:
g = Graph(directed=self.g.is_directed())
g.add_vertex(self.g.num_vertices())
if "count" not in g.gp:
g.gp.count = g.new_gp("int", 0)
if "count" not in g.ep:
g.ep.count = g.new_ep("int")
if "eprob" not in g.ep:
g.ep.eprob = g.new_ep("double")
u = self.get_graph()
try:
g.set_fast_edge_lookup(True)
graph_merge(g, u, in_place=True, simple=True,
props=dict(sum=[(g.ep.count, u.new_ep("int", val=1))]))
finally:
g.gp.count += 1
g.ep.eprob.fa = g.ep.count.fa
g.ep.eprob.fa /= g.gp.count
return g
[docs]
def collect_marginal_multigraph(self, g=None):
r"""Collect marginal latent multigraph during MCMC runs.
Parameters
----------
g : :class:`~graph_tool.Graph` (optional, default: ``None``)
Previous marginal multigraph.
Returns
-------
g : :class:`~graph_tool.Graph`
New marginal graph, with internal edge
:class:`~graph_tool.EdgePropertyMap` ``"wcount"``,
containing the edge multiplicity counts.
Notes
-----
The mean posterior marginal multiplicity distribution of a multi-edge
:math:`(i,j)` is defined as
.. math::
\pi_{ij}(w) = \sum_{\boldsymbol G}\delta_{w,G_{ij}}P(\boldsymbol G|\boldsymbol D)
where :math:`P(\boldsymbol G|\boldsymbol D)` is the posterior
probability of a multigraph :math:`\boldsymbol G` given the data.
"""
if g is None:
g = Graph(directed=self.g.is_directed())
g.add_vertex(self.g.num_vertices())
if "wcount" not in g.ep:
g.ep.wcount = g.new_ep("vector<int32_t>")
if "wcount" not in g.gp:
g.gp.wcount = g.new_gp("int", 0)
try:
g.set_fast_edge_lookup(True)
graph_merge(g, self.u, in_place=True, simple=True,
props=dict(idx_inc=[(g.ep.wcount, self.eweight)]))
finally:
g.gp.wcount += 1
return g
[docs]
class UncertainBlockState(UncertainBaseState):
r"""Inference state of an uncertain graph, using the stochastic block model as a
prior.
Parameters
----------
g : :class:`~graph_tool.Graph`
Measured graph.
q : :class:`~graph_tool.EdgePropertyMap`
Edge probabilities in range :math:`[0,1]`.
q_default : ``float`` (optional, default: ``0.``)
Non-edge probability in range :math:`[0,1]`.
nested : ``boolean`` (optional, default: ``True``)
If ``True``, a :class:`~graph_tool.inference.NestedBlockState`
will be used, otherwise
:class:`~graph_tool.inference.BlockState`.
state_args : ``dict`` (optional, default: ``{}``)
Arguments to be passed to
:class:`~graph_tool.inference.NestedBlockState` or
:class:`~graph_tool.inference.BlockState`.
bstate : :class:`~graph_tool.inference.NestedBlockState` or :class:`~graph_tool.inference.BlockState` (optional, default: ``None``)
If passed, this will be used to initialize the block state
directly.
self_loops : bool (optional, default: ``False``)
If ``True``, it is assumed that the uncertain graph can contain
self-loops.
References
----------
.. [peixoto-reconstructing-2018] Tiago P. Peixoto, "Reconstructing
networks with unknown and heterogeneous errors", Phys. Rev. X 8
041011 (2018). :doi:`10.1103/PhysRevX.8.041011`, :arxiv:`1806.07956`
"""
def __init__(self, g, q, q_default=0., nested=True, state_args={},
bstate=None, self_loops=False, **kwargs):
super().__init__(g, nested=nested, state_args=state_args, bstate=bstate,
self_loops=self_loops, **kwargs)
self._q = q
self._q_default = q_default
self.p = (q.fa.sum() + (self.M - g.num_edges()) * q_default) / self.M
self.q = self.g.new_ep("double", vals=np.log(q.fa) - np.log1p(-q.fa))
self.q.fa -= np.log(self.p) - np.log1p(-self.p)
if q_default > 0:
self.q_default = np.log(q_default) - np.log1p(q_default)
self.q_default -= np.log(self.p) - np.log1p(-self.p)
else:
self.q_default = -np.inf
self.S_const = (np.log1p(-q.fa[q.fa<1]).sum() +
np.log1p(-q_default) * (self.M - self.g.num_edges())
- self.M * np.log1p(-self.p))
self._state = libinference.make_uncertain_state(self.bstate._state,
self)
def __getstate__(self):
state = super().__getstate__()
return dict(state, q=self._q, q_default=self._q_default)
def __repr__(self):
return "<UncertainBlockState object with %s, at 0x%x>" % \
(self.nbstate if self.nbstate is not None else self.bstate,
id(self))
def _mcmc_sweep(self, mcmc_state):
return libinference.mcmc_uncertain_sweep(mcmc_state,
self._state,
_get_rng())
[docs]
class LatentMultigraphBlockState(UncertainBaseState):
r"""Inference state of an erased Poisson multigraph, using the stochastic
block model as a prior.
Parameters
----------
g : :class:`~graph_tool.Graph`
Measured graph.
nested : ``boolean`` (optional, default: ``True``)
If ``True``, a :class:`~graph_tool.inference.NestedBlockState`
will be used, otherwise
:class:`~graph_tool.inference.BlockState`.
state_args : ``dict`` (optional, default: ``{}``)
Arguments to be passed to
:class:`~graph_tool.inference.NestedBlockState` or
:class:`~graph_tool.inference.BlockState`.
bstate : :class:`~graph_tool.inference.NestedBlockState` or :class:`~graph_tool.inference.BlockState` (optional, default: ``None``)
If passed, this will be used to initialize the block state
directly.
self_loops : bool (optional, default: ``False``)
If ``True``, it is assumed that the uncertain graph can contain
self-loops.
References
----------
.. [peixoto-latent-2020] Tiago P. Peixoto, "Latent Poisson models for
networks with heterogeneous density", Phys. Rev. E 102 012309 (2020)
:doi:`10.1103/PhysRevE.102.012309`, :arxiv:`2002.07803`
"""
def __init__(self, g, nested=True, state_args={},
bstate=None, self_loops=False, **kwargs):
super().__init__(g, nested=nested, state_args=state_args, bstate=bstate,
self_loops=self_loops, **kwargs)
self.q = self.g.new_ep("double", val=np.inf)
self.q_default = -np.inf
self.S_const = 0
self._state = libinference.make_uncertain_state(self.bstate._state,
self)
def __repr__(self):
return "<LatentMultigraphBlockState object with %s, at 0x%x>" % \
(self.nbstate if self.nbstate is not None else self.bstate,
id(self))
def _mcmc_sweep(self, mcmc_state):
mcmc_state.edges_only = True
return libinference.mcmc_uncertain_sweep(mcmc_state,
self._state,
_get_rng())
[docs]
class MeasuredBlockState(UncertainBaseState):
r"""Inference state of a measured graph, using the stochastic block model as a
prior.
Parameters
----------
g : :class:`~graph_tool.Graph`
Measured graph.
n : :class:`~graph_tool.EdgePropertyMap`
Edge property map of type ``int``, containing the total number of
measurements for each edge.
x : :class:`~graph_tool.EdgePropertyMap`
Edge property map of type ``int``, containing the number of
positive measurements for each edge.
n_default : ``int`` (optional, default: ``1``)
Total number of measurements for each non-edge.
x_default : ``int`` (optional, default: ``0``)
Total number of positive measurements for each non-edge.
lp : ``float`` (optional, default: ``NaN``)
Log-probability of missing edges (false negatives). If given as ``NaN``,
it is assumed this is an unknown sampled from a Beta distribution, with
hyperparameters given by ``fn_params`. Otherwise the values of
``fn_params`` are ignored.
lq : ``float`` (optional, default: ``NaN``)
Log-probability of spurious edges (false positives). If given as
``NaN``, it is assumed this is an unknown sampled from a Beta
distribution, with hyperparameters given by ``fp_params`. Otherwise the
values of ``fp_params`` are ignored.
fn_params : ``dict`` (optional, default: ``dict(alpha=1, beta=1)``)
Beta distribution hyperparameters for the probability of missing
edges (false negatives).
fp_params : ``dict`` (optional, default: ``dict(mu=1, nu=1)``)
Beta distribution hyperparameters for the probability of spurious
edges (false positives).
nested : ``boolean`` (optional, default: ``True``)
If ``True``, a :class:`~graph_tool.inference.NestedBlockState`
will be used, otherwise
:class:`~graph_tool.inference.BlockState`.
state_args : ``dict`` (optional, default: ``{}``)
Arguments to be passed to
:class:`~graph_tool.inference.NestedBlockState` or
:class:`~graph_tool.inference.BlockState`.
bstate : :class:`~graph_tool.inference.NestedBlockState` or :class:`~graph_tool.inference.BlockState` (optional, default: ``None``)
If passed, this will be used to initialize the block state
directly.
self_loops : bool (optional, default: ``False``)
If ``True``, it is assumed that the uncertain graph can contain
self-loops.
References
----------
.. [peixoto-reconstructing-2018] Tiago P. Peixoto, "Reconstructing
networks with unknown and heterogeneous errors", Phys. Rev. X 8
041011 (2018). :doi:`10.1103/PhysRevX.8.041011`, :arxiv:`1806.07956`
"""
def __init__(self, g, n, x, n_default=1, x_default=0, lp=np.nan,
lq=np.nan, fn_params=dict(alpha=1, beta=1),
fp_params=dict(mu=1, nu=1), nested=True,
state_args={}, bstate=None, self_loops=False, **kwargs):
super().__init__(g, nested=nested, state_args=state_args, bstate=bstate,
**kwargs)
self.n = n
self.x = x
self.n_default = n_default
self.x_default = x_default
self.alpha = fn_params.get("alpha", 1)
self.beta = fn_params.get("beta", 1)
self.mu = fp_params.get("mu", 1)
self.nu = fp_params.get("nu", 1)
self.lp = lp
self.lq = lq
self._state = libinference.make_measured_state(self.bstate._state,
self)
def __getstate__(self):
state = super().__getstate__()
return dict(state, n=self.n, x=self.x, n_default=self.n_default,
x_default=self.x_default,
fn_params=dict(alpha=self.alpha, beta=self.beta),
fp_params=dict(mu=self.mu, nu=self.nu), lp=self.lp,
lq=self.lq)
def __repr__(self):
return "<MeasuredBlockState object with %s, at 0x%x>" % \
(self.nbstate if self.nbstate is not None else self.bstate,
id(self))
def _mcmc_sweep(self, mcmc_state):
return libinference.mcmc_measured_sweep(mcmc_state,
self._state,
_get_rng())
[docs]
def set_hparams(self, alpha, beta, mu, nu):
"""Set edge and non-edge hyperparameters."""
self._state.set_hparams(alpha, beta, mu, nu)
self.alpha = alpha
self.beta = beta
self.mu = mu
self.nu = nu
[docs]
def get_p_posterior(self):
"""Get beta distribution parameters for the posterior probability of missing edges."""
T = self._state.get_T()
M = self._state.get_M()
return M - T + self.alpha, T + self.beta
[docs]
def get_q_posterior(self):
"""Get beta distribution parameters for the posterior probability of spurious edges."""
N = self._state.get_N()
X = self._state.get_X()
T = self._state.get_T()
M = self._state.get_M()
return X - T + self.mu, N - X - (M - T) + self.nu
[docs]
class MixedMeasuredBlockState(UncertainBaseState):
r"""Inference state of a measured graph with heterogeneous errors, using the
stochastic block model as a prior.
Parameters
----------
g : :class:`~graph_tool.Graph`
Measured graph.
n : :class:`~graph_tool.EdgePropertyMap`
Edge property map of type ``int``, containing the total number of
measurements for each edge.
x : :class:`~graph_tool.EdgePropertyMap`
Edge property map of type ``int``, containing the number of
positive measurements for each edge.
n_default : ``int`` (optional, default: ``1``)
Total number of measurements for each non-edge.
x_default : ``int`` (optional, default: ``1``)
Total number of positive measurements for each non-edge.
fn_params : ``dict`` (optional, default: ``dict(alpha=1, beta=10)``)
Beta distribution hyperparameters for the probability of missing
edges (false negatives).
fp_params : ``dict`` (optional, default: ``dict(mu=1, nu=10)``)
Beta distribution hyperparameters for the probability of spurious
edges (false positives).
nested : ``boolean`` (optional, default: ``True``)
If ``True``, a :class:`~graph_tool.inference.NestedBlockState`
will be used, otherwise
:class:`~graph_tool.inference.BlockState`.
state_args : ``dict`` (optional, default: ``{}``)
Arguments to be passed to
:class:`~graph_tool.inference.NestedBlockState` or
:class:`~graph_tool.inference.BlockState`.
bstate : :class:`~graph_tool.inference.NestedBlockState` or :class:`~graph_tool.inference.BlockState` (optional, default: ``None``)
If passed, this will be used to initialize the block state
directly.
self_loops : bool (optional, default: ``False``)
If ``True``, it is assumed that the uncertain graph can contain
self-loops.
References
----------
.. [peixoto-reconstructing-2018] Tiago P. Peixoto, "Reconstructing
networks with unknown and heterogeneous errors", Phys. Rev. X 8
041011 (2018). :doi:`10.1103/PhysRevX.8.041011`, :arxiv:`1806.07956`
"""
def __init__(self, g, n, x, n_default=1, x_default=0,
fn_params=dict(alpha=1, beta=10), fp_params=dict(mu=1, nu=10),
nested=True, state_args={}, bstate=None,
self_loops=False, **kwargs):
super().__init__(g, nested=nested, state_args=state_args, bstate=bstate,
**kwargs)
self.n = n
self.x = x
self.n_default = n_default
self.x_default = x_default
self.alpha = fn_params.get("alpha", 1)
self.beta = fn_params.get("beta", 10)
self.mu = fp_params.get("mu", 1)
self.nu = fp_params.get("nu", 10)
self._state = None
self.q = self.g.new_ep("double")
self.sync_q()
self._state = libinference.make_uncertain_state(self.bstate._state,
self)
[docs]
def sync_q(self):
ra, rb = self.transform(self.n.fa, self.x.fa)
self.q.fa = ra - rb
dra, drb = self.transform(self.n_default, self.x_default)
self.q_default = dra - drb
self.S_const = (self.M - self.g.num_edges()) * drb + rb.sum()
if self._state is not None:
self._state.set_q_default(self.q_default)
self._state.set_S_const(self.S_const)
[docs]
def set_hparams(self, alpha, beta, mu, nu):
"""Set edge and non-edge hyperparameters."""
self.alpha = alpha
self.beta = beta
self.mu = mu
self.nu = nu
self.sync_q()
def __getstate__(self):
state = super().__getstate__()
return dict(state, n=self.n, x=self.x, n_default=self.n_default,
x_default=self.x_default,
fn_params=dict(alpha=self.alpha, beta=self.beta),
fp_params=dict(mu=self.mu, nu=self.nu))
def __repr__(self):
return "<MixedMeasuredBlockState object with %s, at 0x%x>" % \
(self.nbstate if self.nbstate is not None else self.bstate,
id(self))
[docs]
@mcmc_sweep_wrap
def h_mcmc_step(self, hstep=1, **kwargs):
dS = nt = nm = 0
niter = kwargs.pop("niter", 1)
latent_edges = kwargs.pop("entropy_args", {}).get("latent_edges", True)
if len(kwargs) > 0:
raise ValueError("unrecognized keyword arguments: " +
str(list(kwargs.keys())))
for i in range(niter):
hs = [self.alpha, self.beta, self.mu, self.nu]
j = np.random.randint(len(hs))
f_dh = [max((0, hs[j] - hstep)), hs[j] + hstep]
pf = 1./(f_dh[1] - f_dh[0])
old_hs = hs[j]
hs[j] = f_dh[0] + np.random.random() * (f_dh[1] - f_dh[0])
b_dh = [max((0, hs[j] - hstep)), hs[j] + hstep]
pb = 1./min((1, hs[j]))
density = False
ea = self._gen_eargs(dict(latent_edges=latent_edges,
density=density))
Sb = self._state.entropy(ea)
self.set_hparams(*hs)
Sa = self._state.entropy(ea)
nt += 1
if Sa < Sb or np.random.random() < np.exp(-(Sa-Sb) + np.log(pb) - np.log(pf)):
dS += Sa - Sb
nm +=1
else:
hs[j] = old_hs
self.set_hparams(*hs)
return (dS, nt, nm)
[docs]
def mcmc_sweep(self, pedges=.5, ph=.1, hstep=1, multiflip=True, **kwargs):
r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection sampling MCMC to
sample network partitions and latent edges. The parameter ``pedges``
controls the probability with which edge move will be attempted, instead
of partition moves. The parameter ``ph`` controls the relative
probability with which hyperparamters moves will be attempted, and
``hstep`` is the size of the step.
The remaining keyword parameters will be passed to
:meth:`~graph_tool.inference.BlockState.mcmc_sweep` or
:meth:`~graph_tool.inference.BlockState.multiflip_mcmc_sweep`,
if ``multiflip=True``.
"""
if np.random.random() < ph:
return self.h_mcmc_step(hstep=hstep, niter=kwargs.get("niter", 1),
entropy_args=kwargs.get("entropy_args", {}))
else:
return super().mcmc_sweep(pedges=pedges, multiflip=multiflip,
**kwargs)
def _mcmc_sweep(self, mcmc_state):
return libinference.mcmc_uncertain_sweep(mcmc_state,
self._state,
_get_rng())
[docs]
def marginal_multigraph_entropy(g, ecount):
r"""Compute the entropy of the marginal latent multigraph distribution.
Parameters
----------
g : :class:`~graph_tool.Graph`
Marginal multigraph.
ecount : :class:`~graph_tool.EdgePropertyMap`
Vector-valued edge property map containing the counts of edge
multiplicities.
Returns
-------
eh : :class:`~graph_tool.EdgePropertyMap`
Marginal entropy of edge multiplicities.
Notes
-----
The mean posterior marginal multiplicity distribution of a multi-edge
:math:`(i,j)` is defined as
.. math::
\pi_{ij}(w) = \sum_{\boldsymbol G}\delta_{w,G_{ij}}P(\boldsymbol G|\boldsymbol D)
where :math:`P(\boldsymbol G|\boldsymbol D)` is the posterior
probability of a multigraph :math:`\boldsymbol G` given the data.
The corresponding entropy is therefore given (in nats) by
.. math::
\mathcal{S}_{ij} = -\sum_w\pi_{ij}(w)\ln \pi_{ij}(w).
"""
eh = g.new_ep("double")
libinference.marginal_count_entropy(g._Graph__graph,
_prop("e", g, ecount),
_prop("e", g, eh))
return eh
def marginal_multigraph_sample(g, ews, ecount):
w = g.new_ep("int")
libinference.marginal_multigraph_sample(g._Graph__graph,
_prop("e", g, ews),
_prop("e", g, ecount),
_prop("e", g, w),
_get_rng())
return w
def marginal_multigraph_lprob(g, ews, ecount, ew):
L = libinference.marginal_multigraph_lprob(g._Graph__graph,
_prop("e", g, ews),
_prop("e", g, ecount),
_prop("e", g, ew))
return L
def marginal_graph_sample(g, ep):
w = g.new_ep("int")
libinference.marginal_graph_sample(g._Graph__graph,
_prop("e", g, ep),
_prop("e", g, w),
_get_rng())
return w
def marginal_graph_lprob(g, ep, w):
L = libinference.marginal_graph_lprob(g._Graph__graph,
_prop("e", g, ep),
_prop("e", g, w))
return L