Source code for graph_tool.inference.latent_layers

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# graph_tool -- a general graph manipulation python module
#
# Copyright (C) 2006-2021 Tiago de Paula Peixoto <tiago@skewed.de>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from .. import _prop, Graph, GraphView, _get_rng, PropertyMap, \
    edge_endpoint_property, Vector_size_t
from .. generation import generate_triadic_closure
from .. stats import remove_parallel_edges,  remove_self_loops
from .. spectral import adjacency

from .. dl_import import dl_import
dl_import("from . import libgraph_tool_inference as libinference")

from . blockmodel import *
from . nested_blockmodel import *
from . blockmodel import _bm_test
from . uncertain_blockmodel import get_uentropy_args, UncertainBaseState

import numpy.random

[docs]class LatentLayerBaseState(object): r"""Base state for uncertain network inference."""
[docs] def get_ec(self, ew=None): """Return edge property map with layer membership.""" if ew is None: ew = self.ew ec = [] for u, ew in zip(self.us, ew): w = self.g.copy_property(ew, g=u) ec.append(w) return ec
[docs] def collect_marginal(self, gs=None, total=False): r"""Collect marginal inferred network during MCMC runs. Parameters ---------- g : list of :class:`~graph_tool.Graph` (optional, default: ``None``) Previous marginal graphs. Returns ------- g : list :class:`~graph_tool.Graph` New list of marginal graphs, each with internal edge :class:`~graph_tool.EdgePropertyMap` ``"eprob"``, containing the marginal probabilities for each edge. Notes ----- The posterior marginal probability of an edge :math:`(i,j)` is defined as .. math:: \pi_{ij} = \sum_{\boldsymbol A}A_{ij}P(\boldsymbol A|\boldsymbol D) where :math:`P(\boldsymbol A|\boldsymbol D)` is the posterior probability given the data. This function returns a list with the marginal graphs for every layer. """ if gs is None: gs = [] L = len(self.us) if total: L += 1 for l in range(L): g = Graph(directed=self.g.is_directed()) g.add_vertex(self.g.num_vertices()) g.gp.count = g.new_gp("int", 0) g.ep.count = g.new_ep("int") gs.append(g) for l, g in enumerate(gs): if "eprob" not in g.ep: g.ep.eprob = g.new_ep("double") u = self.us[l] if l < len(self.us) else self.g if l == 0: es = edge_endpoint_property(u, u.vertex_index, "source") et = edge_endpoint_property(u, u.vertex_index, "target") u = GraphView(u, efilt=es.fa != et.fa) libinference.collect_marginal(g._Graph__graph, u._Graph__graph, _prop("e", g, g.ep.count)) g.gp.count += 1 g.ep.eprob.fa = g.ep.count.fa g.ep.eprob.fa /= g.gp.count return gs
[docs] def collect_marginal_multigraph(self, gs=None): r"""Collect marginal latent multigraphs during MCMC runs. Parameters ---------- g : list of :class:`~graph_tool.Graph` (optional, default: ``None``) Previous marginal multigraphs. Returns ------- g : list of :class:`~graph_tool.Graph` New marginal multigraphs, each with internal edge :class:`~graph_tool.EdgePropertyMap` ``"w"`` and ``"wcount"``, containing the edge multiplicities and their respective counts. Notes ----- The mean posterior marginal multiplicity distribution of a multi-edge :math:`(i,j)` is defined as .. math:: \pi_{ij}(w) = \sum_{\boldsymbol G}\delta_{w,G_{ij}}P(\boldsymbol G|\boldsymbol D) where :math:`P(\boldsymbol G|\boldsymbol D)` is the posterior probability of a multigraph :math:`\boldsymbol G` given the data. This function returns a list with the marginal graphs for every layer. """ if gs is None or len(gs) != len(self.us): gs = [] for l in range(len(self.us)): g = Graph(directed=self.g.is_directed()) g.add_vertex(self.g.num_vertices()) g.ep.w = g.new_ep("vector<int>") g.ep.wcount = g.new_ep("vector<int>") gs.append(g) for l, g in enumerate(gs): u = self.us[l] ew = self.ew[l] libinference.collect_marginal_count(g._Graph__graph, u._Graph__graph, _prop("e", u, ew), _prop("e", g, g.ep.w), _prop("e", g, g.ep.wcount)) return gs
def _mcmc_sweep(self, mcmc_state): return libinference.mcmc_latent_layers_sweep(mcmc_state, self._state, _get_rng()) def _algo_sweep(self, algo, r=.5, **kwargs): kwargs = kwargs.copy() beta = kwargs.get("beta", 1.) niter = kwargs.get("niter", 1) verbose = kwargs.get("verbose", False) if isinstance(self.bstates[0], NestedBlockState): eargs = self.bstates[0].levels[0]._entropy_args else: eargs = self.bstates[0]._entropy_args dentropy_args = dict(eargs, **kwargs.get("entropy_args", {})) entropy_args = get_uentropy_args(dentropy_args) kwargs.get("entropy_args", {}).pop("latent_edges", None) kwargs.get("entropy_args", {}).pop("density", None) state = self._state mcmc_state = DictState(dict(kwargs, **locals())) if _bm_test(): Si = self.entropy(**dentropy_args) if numpy.random.random() < r: for s in self.bstates: s._clear_egroups() dS, nattempts, nmoves = self._mcmc_sweep(mcmc_state) else: bstate = numpy.random.choice(self.bstates) bstate._clear_egroups() dS, nattempts, nmoves = algo(bstate, **dict(kwargs, test=False)) if _bm_test(): Sf = self.entropy(**dentropy_args) assert math.isclose(dS, (Sf - Si), abs_tol=1e-8), \ "inconsistent entropy delta %g (%g): %s" % (dS, Sf - Si, str(dentropy_args)) return dS, nattempts, nmoves
[docs] def mcmc_sweep(self, r=.5, multiflip=True, **kwargs): r"""Perform sweeps of a Metropolis-Hastings acceptance-rejection sampling MCMC to sample network partitions and latent edges. The parameter ``r`` controls the probability with which edge move will be attempted, instead of partition moves. The remaining keyword parameters will be passed to :meth:`~graph_tool.inference.blockmodel.BlockState.mcmc_sweep` or :meth:`~graph_tool.inference.blockmodel.BlockState.multiflip_mcmc_sweep`, if ``multiflip=True``. """ if multiflip: return self._algo_sweep(lambda s, **kw: s.multiflip_mcmc_sweep(**kw), r=r, **kwargs) else: return self._algo_sweep(lambda s, **kw: s.mcmc_sweep(**kw), r=r, **kwargs)
[docs] def multiflip_mcmc_sweep(self, **kwargs): r"""Alias for :meth:`~LatentLayerBaseState.mcmc_sweep` with ``multiflip=True``.""" return self.mcmc_sweep(multiflip=True, **kwargs)
[docs]class LatentClosureBlockState(LatentLayerBaseState): r"""Inference state of the stochastic block model with latent triadic closure edges. Parameters ---------- g : :class:`~graph_tool.Graph` Observed graph. L : ``int`` (optional, default: ``1``) Maximum number of triadic closure generations. b : :class:`~graph_tool.VertexPropertyMap` (optional, default: ``None``) Inital partition (or hierarchical partition ``nested=True``). aE : ``float`` (optional, default: ``NaN``) Expected total number of edges used in prior. If ``NaN``, a flat prior will be used instead. nested : ``boolean`` (optional, default: ``True``) If ``True``, a :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` will be used, otherwise :class:`~graph_tool.inference.blockmodel.BlockState`. state_args : ``dict`` (optional, default: ``{}``) Arguments to be passed to :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` or :class:`~graph_tool.inference.blockmodel.BlockState`. g_orig : :class:`~graph_tool.Graph` (optional, default: ``None``) Original graph, if ``g`` is used to initialize differently from a graph with no triadic closure edges. ew : list of :class:`~graph_tool.EdgePropertyMap` (optional, default: ``None``) List of edge property maps of ``g``, containing the initial weights (counts) at each triadic generation. ex : list of :class:`~graph_tool.EdgePropertyMap` (optional, default: ``None``) List of edge property maps of ``g``, each containing a list of integers with the ego graph memberships of every edge, for every triadic generation. References ---------- .. [peixoto-disentangling-2021] Tiago P. Peixoto, "Disentangling homophily, community structure and triadic closure in networks", :arxiv:`2101.02510` """ def __init__(self, g, L=1, b=None, aE=numpy.nan, nested=True, state_args={}, g_orig=None, ew=None, ex=None, **kwargs): self.g = g self.us = [] self.ew = [] self.ex = [] if ew is None: ew = [g.new_ep("int", val=1)] + [g.new_ep("int", val=0) for l in range(L)] if ex is None: ex = [g.new_ep("vector<int>") for l in range(L+1)] for w, x in zip(ew, ex): u = GraphView(g, efilt=w.fa > 0) u.ep.w = w u.ep.x = x u = Graph(u, prune=True) self.us.append(u) self.ew.append(u.ep.w) self.ex.append(u.ep.x) self.nested = nested self.state_args = state_args if nested: self.bstate = NestedBlockState(self.us[0], eweight=self.ew[0], bs=b, **state_args) self.ew[0] = self.bstate.levels[0].eweight self.b = self.bstate.levels[0].b else: self.bstate = BlockState(self.us[0], eweight=self.ew[0], b=b, **state_args) self.ew[0] = self.bstate.eweight self.b = self.bstate.b if nested: self.pstate = self.bstate.levels[0] else: self.pstate = self.bstate self.aE = aE if numpy.isnan(aE): self.E_prior = False else: self.E_prior = True if g_orig is None: self.g_orig = g self.g = g = g.copy() else: self.g_orig = g_orig self.og = self.g._get_any() self.eweight = self.g.new_ep("int") self.oa = [u._get_any() for u in self.us] self.oaw = [w._get_any() for w in self.ew] self.ox = [x._get_any() for x in self.ex] self.L = len(self.us) self.m = [u.new_ep("vector<int>") for u in self.us] self.om = [m._get_any() for m in self.m] self.M = [u.new_vp("int") for u in self.us] self.oM = [M._get_any() for M in self.M] self.E = [u.new_vp("int") for u in self.us] self.oE = [E._get_any() for E in self.E] self.bstates = [self.bstate] self.measured = kwargs.get("measured", False) self.ag_orig = self.g_orig._get_any() self.n = kwargs.get("n", self.g_orig.new_ep("int", 1)) self.x = kwargs.get("x", self.g_orig.new_ep("int", 1)) self.n_default = kwargs.get("n_default", 1) self.x_default = kwargs.get("x_default", 0) fn_params = kwargs.get("fn_params", {}) fp_params = kwargs.get("fp_params", {}) self.alpha = fn_params.get("alpha", 1) self.beta = fn_params.get("beta", 1) self.mu = fp_params.get("mu", 1) self.nu = fp_params.get("nu", 1) self.self_loops = True if nested: bstate = self.bstate.levels[0]._state else: bstate = self.bstate._state ret = libinference.make_latent_closure_state(bstate, self.pstate._state, self, self.L) self._cstates = ret[:self.L] self._state = ret[-1] cstate = self._cstates[0] if nested: bstate = self.bstate.levels[0]._state else: bstate = self.bstate._state pstate = self.pstate._state def __getstate__(self): return dict(g=self.g, L=self.L-1, b=self.bstate.get_bs() if self.nested else self.bstate.b.copy(), aE=self.aE, nested=self.nested, state_args=self.state_args, g_orig=self.g_orig, ew=self.get_ec(self.ew), ex=self.get_ec(self.ex)) def __setstate__(self, state): self.__init__(**state)
[docs] def copy(self, **kwargs): """Return a copy of the state.""" return LatentClosureBlockState(**dict(self.__getstate__(), **kwargs))
def __copy__(self): return self.copy() def __repr__(self): return "<LatentClosureBlockState object with (%s) closure edges, and %s, at 0x%x>" % \ (", ".join([str(w.fa.sum()) for w in self.ew[1:]]), repr(self.bstate), id(self))
[docs] def entropy(self, latent_edges=True, density=True, **kwargs): """Return the entropy, i.e. negative log-likelihood.""" S = self._state.entropy(latent_edges, density) S += self.bstates[0].entropy(**kwargs) for s in self._cstates[1:]: S += s.entropy() if kwargs.get("test", True) and _bm_test(): args = kwargs.copy() assert not isnan(S) and not isinf(S), \ "invalid entropy %g (%s) " % (S, str(args)) args["test"] = False state_copy = self.copy() Salt = state_copy.entropy(latent_edges, density, **args) assert math.isclose(S, Salt, abs_tol=1e-8), \ "entropy discrepancy after copying (%g %g %g)" % (S, Salt, S - Salt) return S
[docs] def sample_graph(self, sample_sbm=True, canonical_sbm=False, sample_params=True, canonical_closure=True): """Sample graph from inferred model. Parameters ---------- sample_sbm : ``boolean`` (optional, default: ``True``) If ``True``, the substrate network will be sampled anew from the SBM parameters. Otherwise, it will be the same as the current posterior state. canonical_sbm : ``boolean`` (optional, default: ``False``) If ``True``, the canonical SBM will be used, otherwise the microcanonical SBM will be used. sample_params : ``bool`` (optional, default: ``True``) If ``True``, and ``canonical_sbm == True`` the count parameters (edges between groups and node degrees) will be sampled from their posterior distribution conditioned on the actual state. Otherwise, their maximum-likelihood values will be used. canonical_closure : ``boolean`` (optional, default: ``True``) If ``True``, the canonical version of triadic clousre will be used (i.e. conditioned on a probability), otherwise the microcanonical version will be used (i.e. conditional on the count number). Returns ------- u : list :class:`~graph_tool.Graph` Sampled graph, with internal edge :class:`~graph_tool.EdgePropertyMap` ``"gen"``, containing the triadic generation of each edge. """ if sample_sbm: if self.nested: bstate = self.bstate.levels[0] else: bstate = self.bstate u = bstate.sample_graph(self_loops=False, multigraph=False, canonical=canonical_sbm, sample_params=sample_params) else: u = self.us[0].copy() u.ep.gen = u.new_ep("int") for l, (g, w) in enumerate(zip(self.us[1:], self.ew[1:])): t = u.own_property(self.E[l + 1]) if canonical_closure: M = self.M[l + 1] t = t.copy("double") idx = t.a > 0 t.fa[idx] = numpy.random.beta(t.a[idx] + 1, (M.a - t.a)[idx] + 1) if t.a.sum() == 0: break old = u.new_ep("bool", True) curr = u.new_ep("bool", vals=u.ep.gen.fa == l) generate_triadic_closure(u, curr=curr, t=t, probs=canonical_closure) new = GraphView(u, efilt=numpy.logical_not(old.fa)) remove_parallel_edges(new) gen = new.own_property(u.ep.gen) gen.fa = l + 1 return u
def _mcmc_sweep(self, mcmc_state): return libinference.mcmc_latent_closure_sweep(mcmc_state, self._state, _get_rng()) def _algo_sweep(self, algo, r=.5, **kwargs): kwargs = kwargs.copy() beta = kwargs.get("beta", 1.) niter = kwargs.get("niter", 1) verbose = kwargs.get("verbose", False) if isinstance(self.bstates[0], NestedBlockState): eargs = self.bstates[0].levels[0]._entropy_args else: eargs = self.bstates[0]._entropy_args dentropy_args = dict(eargs, **kwargs.get("entropy_args", {})) entropy_args = get_uentropy_args(dentropy_args) kwargs.get("entropy_args", {}).pop("latent_edges", None) kwargs.get("entropy_args", {}).pop("density", None) state = self._state mcmc_state = DictState(dict(kwargs, **locals())) if _bm_test(): Si = self.entropy(**dentropy_args) if numpy.random.random() < r: for s in self.bstates: s._clear_egroups() mcmc_state.niter *= len(self.us) dS, nattempts, nmoves = self._mcmc_sweep(mcmc_state) else: bstate = self.bstates[0] bstate._clear_egroups() dS, nattempts, nmoves = algo(bstate, **dict(kwargs, test=False)) if _bm_test(): Sf = self.entropy(**dentropy_args) assert math.isclose(dS, (Sf - Si), abs_tol=1e-8), \ "inconsistent entropy delta %g (%g): %s" % (dS, Sf - Si, str(dentropy_args)) return dS, nattempts, nmoves
[docs] def collect_marginal(self, gs=None): r"""Collect marginal inferred network during MCMC runs. Parameters ---------- g : list of :class:`~graph_tool.Graph` (optional, default: ``None``) Previous marginal graphs. Returns ------- g : list :class:`~graph_tool.Graph` New list of marginal graphs, each with internal :class:`~graph_tool.EdgePropertyMap` ``"eprob"``, containing the marginal probabilities for each edge, as well as :class:`~graph_tool.VertexPropertyMap` ``"t"``, ``"m"``, ``"c"``, containing the average number of closures, open triads, and fraction of closed triads on each node. Notes ----- The posterior marginal probability of an edge :math:`(i,j)` is defined as .. math:: \pi_{ij} = \sum_{\boldsymbol A}A_{ij}P(\boldsymbol A|\boldsymbol D) where :math:`P(\boldsymbol A|\boldsymbol D)` is the posterior probability given the data. This function returns a list with the marginal graphs for every layer. """ gs = LatentLayerBaseState.collect_marginal(self, gs, total=self.measured) for l in range(len(self.us)): E = self.E[l] M = self.M[l] u = gs[l] tsum = u.vp.get("tsum", None) if tsum is None: tsum = u.vp.tsum = u.new_vp("int") u.vp.msum = u.new_vp("int") u.vp.t = u.new_vp("double") u.vp.m = u.new_vp("double") u.vp.csum = u.new_vp("double") u.vp.c = u.new_vp("double") msum = u.vp.msum t = u.vp.t m = u.vp.m csum = u.vp.csum c = u.vp.c tsum.a += E.a msum.a += M.a idx = M.a > 0 csum.a[idx] += E.a[idx] / M.a[idx] t.a = tsum.a / u.gp.count m.a = msum.a / u.gp.count c.a = csum.a / u.gp.count return gs
[docs]class MeasuredClosureBlockState(LatentClosureBlockState, UncertainBaseState): r"""Inference state of a measured graph, using the stochastic block model with triadic closure as a prior. Parameters ---------- g : :class:`~graph_tool.Graph` Measured graph. n : :class:`~graph_tool.EdgePropertyMap` Edge property map of type ``int``, containing the total number of measurements for each edge. x : :class:`~graph_tool.EdgePropertyMap` Edge property map of type ``int``, containing the number of positive measurements for each edge. n_default : ``int`` (optional, default: ``1``) Total number of measurements for each non-edge. x_default : ``int`` (optional, default: ``0``) Total number of positive measurements for each non-edge. L : ``int`` (optional, default: ``1``) Maximum number of triadic closure generations. b : :class:`~graph_tool.VertexPropertyMap` (optional, default: ``None``) Inital partition (or hierarchical partition ``nested=True``). fn_params : ``dict`` (optional, default: ``dict(alpha=1, beta=1)``) Beta distribution hyperparameters for the probability of missing edges (false negatives). fp_params : ``dict`` (optional, default: ``dict(mu=1, nu=1)``) Beta distribution hyperparameters for the probability of spurious edges (false positives). aE : ``float`` (optional, default: ``NaN``) Expected total number of edges used in prior. If ``NaN``, a flat prior will be used instead. nested : ``boolean`` (optional, default: ``True``) If ``True``, a :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` will be used, otherwise :class:`~graph_tool.inference.blockmodel.BlockState`. state_args : ``dict`` (optional, default: ``{}``) Arguments to be passed to :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` or :class:`~graph_tool.inference.blockmodel.BlockState`. bstate : :class:`~graph_tool.inference.nested_blockmodel.NestedBlockState` or :class:`~graph_tool.inference.blockmodel.BlockState` (optional, default: ``None``) If passed, this will be used to initialize the block state directly. g_orig : :class:`~graph_tool.Graph` (optional, default: ``None``) Original graph, if ``g`` is used to initialize differently from a graph with no triadic closure edges. ew : list of :class:`~graph_tool.EdgePropertyMap` (optional, default: ``None``) List of edge property maps of ``g``, containing the initial weights (counts) at each triadic generation. ex : list of :class:`~graph_tool.EdgePropertyMap` (optional, default: ``None``) List of edge property maps of ``g``, each containing a list of integers with the ego graph memberships of every edge, for every triadic generation. References ---------- .. [peixoto-disentangling-2021] Tiago P. Peixoto, "Disentangling homophily, community structure and triadic closure in networks", :arxiv:`2101.02510` """ def __init__(self, g, n, x, n_default=1, x_default=0, L=1, b=None, fn_params=dict(alpha=1, beta=1), fp_params=dict(mu=1, nu=1), aE=numpy.nan, nested=True, state_args={}, bstate=None, g_orig=None, ew=None, ex=None, **kwargs): UncertainBaseState.__init__(self, g, nested=nested, state_args=state_args, bstate=bstate, **kwargs) LatentClosureBlockState.__init__(self, g, L=L, b=b, aE=aE, nested=nested, state_args=state_args, g_orig=g_orig, ew=ew, ex=ex, n=n, x=x, n_default=n_default, x_default=x_default, fn_params=fn_params, fp_params=fp_params, measured=True) def __getstate__(self): return dict(g=self.g, n=self.n, x=self.x, n_default=self.n_default, x_default=self.x_default, L=self.L-1, b=self.bstate.get_bs() if self.nested else self.bstate.b.copy(), fn_params=dict(alpha=self.alpha, beta=self.beta), fp_params=dict(mu=self.mu, nu=self.nu), aE=self.aE, nested=self.nested, state_args=self.state_args, g_orig=self.g_orig, ew=self.get_ec(self.ew), ex=self.get_ec(self.ex)) def __setstate__(self, state): self.__init__(**state)
[docs] def copy(self, **kwargs): """Return a copy of the state.""" return MeasuredClosureBlockState(**dict(self.__getstate__(), **kwargs))
def __repr__(self): return "<MeasuredClosureBlockState object with (%s) closure edges, and %s, at 0x%x>" % \ (", ".join([str(w.fa.sum()) for w in self.ew[1:]]), repr(self.bstate), id(self))
[docs] def get_graph(self): r"""Return the current inferred graph.""" es = edge_endpoint_property(self.g, self.g.vertex_index, "source") et = edge_endpoint_property(self.g, self.g.vertex_index, "target") u = GraphView(self.g, efilt=numpy.logical_and(self.eweight.fa > 0, es.fa != et.fa)) return u
[docs] def set_hparams(self, alpha, beta, mu, nu): """Set edge and non-edge hyperparameters.""" self._state.set_hparams(alpha, beta, mu, nu) self.alpha = alpha self.beta = beta self.mu = mu self.nu = nu
[docs] def get_p_posterior(self): """Get beta distribution parameters for the posterior probability of missing edges.""" T = self._state.get_T() M = self._state.get_M() return M - T + self.alpha, T + self.beta
[docs] def get_q_posterior(self): """Get beta distribution parameters for the posterior probability of spurious edges.""" N = self._state.get_N() X = self._state.get_X() T = self._state.get_T() M = self._state.get_M() return X - T + self.mu, N - X - (M - T) + self.nu