
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# graph_tool -- a general graph manipulation python module
#
# Copyright (C) 2006-2024 Tiago de Paula Peixoto <tiago@skewed.de>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from .. import Graph, GraphView, _get_rng, Vector_double, Vector_int64_t

from .. dl_import import dl_import
dl_import("from . import libgraph_tool_inference as libinference")

from . util import *
from . base_states import *
from . blockmodel import lbinom

from .. topology import max_cliques

from scipy.special import gammaln
from itertools import combinations

import numpy as np

class CliqueState(object):
    r"""The state of a clique decomposition of a given graph.

    Parameters
    ----------
    g : :class:`~graph_tool.Graph`
        Graph to be modelled.
    init_edges : ``bool`` (optional, default: ``False``)
        If ``True``, the state will be initialized with edges being occupied.
    init_max_cliques : ``bool`` (optional, default: ``True``)
        If ``True``, the state will be initialized with the maximal cliques.
    init_list : ``dict`` (optional, default: ``None``)
        If given, this will determine the initial state. Keys are tuples of
        clique nodes and values are the corresponding occupation counts.

    Examples
    --------
    .. testsetup:: clique_decomposition

       gt.seed_rng(42)
       np.random.seed(42)

    .. doctest:: clique_decomposition

       >>> g = gt.collection.data["polbooks"]
       >>> state = gt.CliqueState(g)
       >>> state.mcmc_sweep(niter=10000)
       (...)
       >>> cliques = []
       >>> for v in state.f.vertices():          # iterate through factor graph
       ...     if state.is_fac[v]:
       ...         continue                      # skip over factors
       ...     print(state.c[v], state.x[v])     # clique occupation
       ...     if state.x[v] > 0:
       ...         cliques.append(state.c[v])
       ...     if len(cliques) > 10:
       ...         break
       array([0, 2, 4, 5], dtype=int32) 1
       array([0, 1, 3, 5], dtype=int32) 1
       array([0, 1, 5, 6], dtype=int32) 1
       array([0, 4, 5, 6], dtype=int32) 0
       array([2, 5, 7], dtype=int32) 1
       array([ 4, 28], dtype=int32) 1
       array([ 4,  6, 29], dtype=int32) 1
       array([ 4, 30, 31], dtype=int32) 0
       array([5, 6, 7], dtype=int32) 0
       array([ 7, 71], dtype=int32) 1
       array([ 7, 14, 58], dtype=int32) 1
       array([ 7, 58, 85], dtype=int32) 0
       array([ 7, 30, 58], dtype=int32) 1
       array([ 6, 10, 12], dtype=int32) 0
       array([ 6, 12, 18], dtype=int32) 1
       array([ 8, 12, 13, 32], dtype=int32) 1

    References
    ----------
    .. [young-hypergraph-2021] Young, J.-G., Petri, G., Peixoto, T. P.,
       "Hypergraph reconstruction from network data", Commun Phys 4, 135
       (2021), :doi:`10.1038/s42005-021-00637-w`, :arxiv:`2008.04948`
    """

    def __init__(self, g, init_edges=False, init_max_cliques=True,
                 init_list=None):
        self.g = GraphView(g, directed=False)
        self.f = Graph(directed=False)              # factor graph
        self.f.set_fast_edge_removal(True)

        self.is_fac = self.f.new_vp("bool")         # is a factor
        self.c = self.f.new_vp("vector<int32_t>")   # coordinates
        self.is_max = self.f.new_vp("bool")         # is a max clique
        self.x = self.f.new_vp("int")               # activation status

        edges = {}
        for e in g.edges():
            v = self.f.add_vertex()
            self.is_fac[v] = 1
            st = tuple(sorted([int(e.source()), int(e.target())]))
            self.c[v] = st
            edges[st] = v

        for q in max_cliques(g):
            d = len(q)
            cs = tuple(sorted(q))
            v = self.f.add_vertex()
            self.c[v] = cs
            self.is_max[v] = True
            for s, t in combinations(q, 2):
                if s > t:
                    s, t = t, s
                u = edges[(s, t)]
                self.f.add_edge(v, u)
            if init_max_cliques:
                self.x[v] = 1

        if init_edges:
            coords = {}
            for v in self.f.vertices():
                if self.is_fac[v]:
                    continue
                coords[tuple(self.c[v].a)] = v
            for e in g.edges():
                st = tuple(sorted([int(e.source()), int(e.target())]))
                if st in coords:
                    v = coords[st]
                else:
                    v = self.f.add_vertex()
                    self.c[v] = st
                    u = edges[st]
                    self.f.add_edge(v, u)
                self.x[v] = 1

        if init_list is not None:
            coords = {}
            for v in self.f.vertices():
                if self.is_fac[v]:
                    continue
                coords[tuple(self.c[v].a)] = v
            for c, count in init_list.items():
                st = tuple(sorted(c))
                if st in coords:
                    v = coords[st]
                else:
                    v = self.f.add_vertex()
                    self.c[v] = st
                    for s, t in combinations(st, 2):
                        if s > t:
                            s, t = t, s
                        u = edges[(s, t)]
                        self.f.add_edge(v, u)
                self.x[v] = count

        self.reset_factors()

        self.lpd = []

        self.D = max([len(self.c[v]) for v in self.f.vertices()])
        self.N = self.g.num_vertices()
        self.E = self.g.num_edges()

        self.reset_Ed()

    def __copy__(self):
        return self.copy()
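    # Usage sketch (not part of the original module; assumes an undirected
    # graph ``g`` is at hand): the constructor options select the starting
    # decomposition, either all maximal cliques occupied (the default) or
    # only the plain edges, e.g.:
    #
    #     state_mc = CliqueState(g)                      # maximal cliques occupied
    #     state_e = CliqueState(g, init_max_cliques=False,
    #                           init_edges=True)         # only edges occupied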
    def copy(self, **kwargs):
        r"""Copies the state. The parameters override the state properties,
        and have the same meaning as in the constructor."""
        return CliqueState(**dict(self.__getstate__(), **kwargs))
    def __getstate__(self):
        init_list = {}
        for v in self.f.vertices():
            if self.is_fac[v]:
                continue
            init_list[tuple(self.c[v])] = self.x[v]
        return dict(g=self.g, init_list=init_list)

    def __setstate__(self, state):
        self.__init__(**state)
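    # Usage sketch (not part of the original module): __getstate__() exposes
    # the current decomposition as an ``init_list`` dict mapping clique tuples
    # to their occupation counts, which is the mechanism copy() and pickling
    # rely on to rebuild an equivalent state, e.g.:
    #
    #     import pickle
    #     blob = pickle.dumps(state)       # uses __getstate__()
    #     state2 = pickle.loads(blob)      # uses __setstate__() -> __init__()
    #     state3 = state.copy()            # same mechanism, via kwargs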
    def reset_factors(self):
        """Reset factor constraints"""
        for v in self.f.vertices():
            if self.is_fac[v] == 1:
                self.x[v] = sum(self.x[w] for w in v.out_neighbors())
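    # Consistency sketch (not part of the original module): after
    # reset_factors(), each factor node carries the number of occupied cliques
    # covering its edge, so an edge of ``g`` is explained whenever its factor
    # count is positive, e.g.:
    #
    #     state.reset_factors()
    #     for v in state.f.vertices():
    #         if state.is_fac[v]:
    #             assert state.x[v] == sum(state.x[w]
    #                                      for w in v.out_neighbors())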
    def reset_Ed(self):
        """Reset edge counts"""
        self.Ed = np.zeros(self.D + 1, dtype="int32")
        for v in self.f.vertices():
            if self.is_fac[v]:
                continue
            d = len(self.c[v])
            self.Ed[d] += self.x[v]
    def get_nEd(self):
        """Get fraction of edge counts per clique size"""
        Ed = np.asarray(self.Ed.copy(), dtype="float")
        for d in range(2, len(Ed)):
            Ed[d] *= (d * (d - 1)) // 2
        Ed /= self.g.num_edges()
        return Ed
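    # Usage sketch (not part of the original module): ``self.Ed[d]`` counts
    # the occupied cliques of size ``d``; get_nEd() converts this into the
    # fraction of the E edges of ``g`` accounted for by each clique size,
    # since a clique of size d covers d * (d - 1) / 2 edges, e.g.:
    #
    #     nEd = state.get_nEd()
    #     for d, frac in enumerate(nEd):
    #         if d >= 2:
    #             print(f"cliques of size {d} cover {frac:.3f} of the edges")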
    @copy_state_wrap
    def entropy(self, **kwargs):
        """Get the description length, a.k.a. the negative log-likelihood."""
        L = 0
        for d in range(2, len(self.Ed)):
            L += libinference.clique_L_over(self.N, d, self.Ed[d],
                                            self.D, self.E)
        L -= gammaln(self.x.fa[np.logical_not(self.is_fac.fa)] + 1).sum()
        return -L
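    # Usage sketch (not part of the original module): entropy() returns the
    # description length of the current decomposition, so two states can be
    # compared by their entropy() values (smaller is better), e.g.:
    #
    #     S0 = state.entropy()
    #     state.mcmc_sweep(niter=100)
    #     S1 = state.entropy()
    #     print("improvement in description length:", S0 - S1)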
    @mcmc_sweep_wrap
    def mcmc_sweep(self, niter=1, beta=1):
        """Runs ``niter`` iterations of a Metropolis-Hastings MCMC, with
        inverse temperature ``beta``, to sample clique decompositions. This
        returns the change in description length and the number of moves."""
        return libinference.clique_iter_mh(self.f._Graph__graph,
                                           self.c._get_any(),
                                           self.x._get_any(),
                                           self.is_fac._get_any(),
                                           self.is_max._get_any(),
                                           self.Ed, self.N, self.E,
                                           beta, niter, _get_rng())
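# Usage sketch (not part of the original module; follows the class docstring
# example with the "polbooks" dataset): the sampler is typically run in chunks
# until the returned description-length changes become small, e.g.:
#
#     import graph_tool.all as gt
#     g = gt.collection.data["polbooks"]
#     state = gt.CliqueState(g)
#     for i in range(10):
#         ret = state.mcmc_sweep(niter=1000)   # change in description length
#         print(ret)                           # and number of moves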