#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# graph_tool -- a general graph manipulation python module
#
# Copyright (C) 2006-2025 Tiago de Paula Peixoto <tiago@skewed.de>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .. import Graph, PropertyMap, _get_rng, Vector_int32_t, Vector_size_t, \
_parallel
from . blockmodel import DictState
from . base_states import *
from .. dl_import import dl_import
dl_import("from . import libgraph_tool_inference as libinference")
import numpy as np
import math
from functools import wraps
[docs]
class PartitionModeState(object):
r"""The random label model state for a set of labelled partitions, which attempts
to align them with a common group labelling.
Parameters
----------
bs : list of iterables
List of partitions to be aligned. If ``nested=True``, these should be
hierarchical partitions, composed each as a list of partitions.
relabel : ``bool`` (optional, default: ``True``)
If ``True``, an initial alignment of the partitions will be attempted during
instantiation, otherwise they will be incorporated as they are.
nested : ``bool`` (optional, default: ``False``)
If ``True``, the partitions will be assumed to be hierarchical.
converge : ``bool`` (optional, default: ``False``)
If ``True``, the label alignment will be iterated until convergence upon
initialization (otherwise
:meth:`~graph_tool.inference.PartitionModeState.replace_partitions`
needs to be called repeatedly).
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
"""
def __init__(self, bs, relabel=True, nested=False, converge=False, **kwargs):
self.bs = {}
self.nested = nested
if len(kwargs) > 0:
self._base = kwargs["base"]
else:
self._base = libinference.PartitionModeState()
for b in bs:
self.add_partition(b, relabel)
if converge:
delta = 1
while np.abs(delta) > 1e-6:
delta = self.replace_partitions()
def __copy__(self):
return self.copy()
[docs]
def copy(self, bs=None):
r"""Copies the state. The parameters override the state properties, and
have the same meaning as in the constructor."""
if bs is None:
bs = list(self.get_nested_partitions().values())
if not self.nested:
bs = [bv[0] for bv in bs]
return PartitionModeState(bs=bs, relabel=False, nested=self.nested)
def __getstate__(self):
bs = list(self.get_nested_partitions().values())
if not self.nested:
bs = [bv[0] for bv in bs]
return dict(bs=bs, nested=self.nested)
def __setstate__(self, state):
self.__init__(**state, relabel=False)
[docs]
def add_partition(self, b, relabel=True):
r"""Adds partition ``b`` to the ensemble, after relabelling it if
``relabel=True``, and returns its index in the population."""
def c(x):
if isinstance(x, PropertyMap):
return x.fa
return x
if self.nested:
b = [Vector_int32_t(init=c(x)) for x in b]
else:
b = [Vector_int32_t(init=c(b))]
i = self._base.add_partition(b, relabel)
self.bs[i] = b
return i
[docs]
def remove_partition(self, i):
r"""Removes partition with index ``i`` from the ensemble."""
self._base.remove_partition(i)
if i in self.bs:
del self.bs[i]
[docs]
def virtual_add_partition(self, b, relabel=True):
r"""Computes the entropy difference (negative log probability) if partition ``b``
were inserted the ensemble, after relabelling it if ``relabel=True``.
"""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
return self._base.virtual_add_partition(b, relabel)
[docs]
def virtual_remove_partition(self, b, relabel=True):
r"""Computes the entropy difference (negative log probability) if partition ``b``
were removed from the ensemble, after relabelling it if ``relabel=True``.
"""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
return self._base.virtual_add_partition(b, relabel)
[docs]
def replace_partitions(self):
r"""Removes and re-adds every partition, after relabelling, and returns
the entropy difference (negative log probability)."""
return self._base.replace_partitions(_get_rng())
[docs]
def relabel_partition(self, b):
r"""Returns a relabelled copy of partition ``b``, according to its alignment with
the ensemble."""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
self._base.relabel_partition(b)
if not self.nested:
return b[0].a.copy()
else:
return [x.a.copy() for x in b]
[docs]
def align_mode(self, mode):
r"""Relabel entire ensemble to align with another ensemble given by ``mode``,
which should be an instance of
:class:`~graph_tool.inference.PartitionModeState`."""
self._base.align_mode(mode._base)
[docs]
def get_partition(self, i):
r"""Returns partition with index ``i``."""
return self._base.get_partition(i)
[docs]
def get_nested_partition(self, i):
r"""Returns nested partition with index ``i``."""
return self._base.get_nested_partition(i)
[docs]
def get_partitions(self):
r"""Returns all partitions."""
return self._base.get_partitions()
[docs]
def get_nested_partitions(self):
r"""Returns all nested partitions."""
return self._base.get_nested_partitions()
[docs]
def relabel(self):
r"""Re-order group labels according to group sizes."""
return self._base.relabel()
@copy_state_wrap
def _entropy(self):
r"""Return the description length (negative joint log-likelihood)."""
return self._base.entropy()
[docs]
def posterior_entropy(self, MLE=True):
r"""Return the entropy of the random label model, using maximum likelihood
estimates for the marginal node probabilities if ```MLE=True```,
otherwise using posterior mean estimates.
"""
return self._base.posterior_entropy(MLE)
[docs]
def posterior_cdev(self, MLE=True):
r"""Return the uncertainty of the mode in the range :math:`[0,1]`, using maximum
likelihood estimates for the marginal node probabilities if
```MLE=True```, otherwise using posterior mean estimates.
"""
return self._base.posterior_cdev(MLE)
[docs]
def posterior_lprob(self, b, MLE=True):
r"""Return the log-probability of partition ``b``, using maximum
likelihood estimates for the marginal node probabilities if
```MLE=True```, otherwise using posterior mean estimates.
"""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
return self._base.posterior_lprob(b, MLE)
[docs]
def get_coupled_state(self):
r"""Return the instance of :class:`~graph_tool.inference.PartitionModeState`
representing the model at the upper hierarchical level.
"""
base = self._base.get_coupled_state()
if base is None:
return None
return PartitionModeState(bs=None, base=base, nested=self.nested)
[docs]
def get_marginal(self, g):
r"""Return a :class:`~graph_tool.VertexPropertyMap` for :class:`~graph_tool.Graph`
``g``, with ``vector<int>`` values containing the marginal group
membership counts for each node. """
bm = g.new_vp("vector<int>")
self._base.get_marginal(g._Graph__graph, bm._get_any())
return bm
[docs]
def get_max(self, g):
r"""Return a :class:`~graph_tool.VertexPropertyMap` for
:class:`~graph_tool.Graph` ``g``, with ``int`` values containing the
maximum marginal group membership for each node."""
b = g.new_vp("int")
self._base.get_map(g._Graph__graph, b._get_any())
return b
[docs]
def get_max_nested(self):
r"""Return a hierarchical partition as a list of :class:`numpy.ndarray` objects,
containing the maximum marginal group membership for each node in every
level."""
return self._base.get_map_bs()
[docs]
def get_B(self):
r"""Return the total number of labels used."""
return self._base.get_B()
[docs]
def get_M(self):
r"""Return the number of partitions"""
return self._base.get_M()
[docs]
def sample_partition(self, MLE=True):
r"""Sampled a partition from the inferred model, using maximum likelihood
estimates for the marginal node probabilities if ```MLE=True```,
otherwise using posterior mean estimates."""
return self._base.sample_partition(MLE, _get_rng())
[docs]
def sample_nested_partition(self, MLE=True, fix_empty=True):
r"""Sampled a nested partition from the inferred model, using maximum likelihood
estimates for the marginal node probabilities if ```MLE=True```,
otherwise using posterior mean estimates."""
return self._base.sample_nested_partition(MLE, fix_empty, _get_rng())
[docs]
@entropy_state_signature
class ModeClusterState(MCMCState, MultiflipMCMCState, MultilevelMCMCState):
r"""The mixed random label model state for a set of labelled partitions, which
attempts to align them inside clusters with a common group labelling.
Parameters
----------
bs : list of iterables
List of partitions to be aligned. If ``nested=True``, these should be
hierarchical partitions, composed each as a list of partitions.
b : iterable (optional, default: ``None``)
Initial cluster membership for every partition. If ``None`` a random
division into ``B`` groups will be used.
B : ``int`` (optional, default: ``1``)
Number of groups for initial division.
relabel : ``bool`` (optional, default: ``True``)
If ``True``, an initial alignment of the partitions will be attempted during
instantiation, otherwise they will be incorporated as they are.
nested : ``bool`` (optional, default: ``False``)
If ``True``, the partitions will be assumed to be hierarchical.
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
"""
def __init__(self, bs, b=None, B=1, nested=False, relabel=True, **kwargs):
EntropyState.__init__(self)
self.bs = []
self.nested = nested
for bv in bs:
if self.nested:
bv = [Vector_int32_t(init=x) for x in bv]
else:
bv = [Vector_int32_t(init=bv)]
self.bs.append(bv)
if b is None:
self.b = np.random.randint(0, B, len(self.bs), dtype="int32")
else:
self.b = np.asarray(b, dtype="int32")
if self.b.max() >= len(self.bs):
raise ValueError("supplied clustering 'b' has maximum label larger than number of partitions")
self.b = Vector_int32_t(init=self.b)
self.relabel_init = relabel
self.g = Graph()
self.g.add_vertex(len(self.b))
self.bg = self.g
self._abg = self.bg._get_any()
self.obs = self.bs
self._state = libinference.make_mode_cluster_state(self)
def __copy__(self):
return self.copy()
def __deepcopy__(self, memo):
b = copy.deepcopy(self.b, memo)
bs = copy.deepcopy(self.bs, memo)
return self.copy(bs=bs, b=b)
[docs]
def copy(self, bs=None, b=None):
r"""Copies the state. The parameters override the state properties, and
have the same meaning as in the constructor."""
if bs is None:
if not self.nested:
bs = [x[0] for x in self.bs]
else:
bs = self.bs
return ModeClusterState(bs=bs, b=b if b is not None else self.b,
relabel=False, nested=self.nested)
def __getstate__(self):
state = EntropyState.__getstate__(self)
return dict(state, bs=self.bs, b=self.b, nested=self.nested)
def __setstate__(self, state):
if not state["nested"]:
state["bs"] = [x[0] for x in state["bs"]]
self.__init__(**state, relabel=False)
[docs]
def get_mode(self, r):
r"""Return the mode in cluster ``r`` as an instance of
:class:`~graph_tool.inference.PartitionModeState`. """
base = self._state.get_mode(r);
return PartitionModeState(None, base=base, nested=self.nested)
[docs]
def get_modes(self, sort=True):
r"""Return the list of nonempty modes, as instances of
:class:`~graph_tool.inference.PartitionModeState`. If `sorted == True`,
the modes are retured in decreasing order with respect to their size.
"""
modes = [self.get_mode(r) for r in np.unique(self.b)]
if sort:
modes = list(sorted(modes, key=lambda m: -m.get_M()))
return modes
[docs]
def get_wr(self):
r"""Return cluster sizes. """
return np.array(np.bincount(self.b))
[docs]
def get_B(self):
r"Returns the total number of clusters."
return len(np.unique(self.b))
[docs]
def get_Be(self):
r"""Returns the effective number of clusters, defined as :math:`e^{H}`, with
:math:`H=-\sum_r\frac{n_r}{N}\ln \frac{n_r}{N}`, where :math:`n_r` is
the number of partitions in cluster :math:`r`.
"""
w = np.array(np.bincount(self.b), dtype="double")
w = w[w>0]
w /= w.sum()
return np.exp(-(w*np.log(w)).sum())
[docs]
def virtual_add_partition(self, b, r, relabel=True):
r"""Computes the entropy difference (negative log probability) if partition ``b``
were inserted in cluster ``r``, after relabelling it if ``relabel=True``.
"""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
return self._state.virtual_add_partition(b, r, relabel)
[docs]
def add_partition(self, b, r, relabel=True):
r"""Add partition ``b`` in cluster ``r``, after relabelling it if ``relabel=True``."""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
self._state.add_partition(b, r, relabel)
[docs]
def classify_partition(self, b, relabel=True, new_group=True, sample=False):
r"""Returns the cluster ``r`` to which partition ``b`` would belong, after
relabelling it if ``relabel=True``, according to the most probable
assignment, or randomly sampled according to the relative probabilities
if ``sample==True``. If ``new_group==True``, a new previously unoccupied
group is also considered for the classification. """
rs = list(np.unique(self.b))
if new_group:
rs.append(max(rs) + 1)
dS = [self.virtual_add_partition(b, r, relabel) for r in rs]
if not sample:
r = np.argmin(dS)
else:
Ps = -np.array(dS)
Ps -= Ps.max()
Ps = np.exp(Ps)
Ps /= Ps.sum()
r = np.random.choice(np.arange(len(Ps)), p=Ps)
return rs[r]
[docs]
def relabel(self, epsilon=1e-6, maxiter=100):
r"""Attempt to align group labels between clusters via a greedy algorithm. The
algorithm stops after ``maxiter`` iterations or when the entropy
improvement lies below ``epsilon``."""
return self._state.relabel_modes(epsilon, maxiter)
@copy_state_wrap
def _entropy(self):
r"""Return the description length (negative joint log-likelihood)."""
return self._state.entropy()
def _gen_eargs(self, eargs):
return ""
[docs]
def posterior_entropy(self, MLE=True):
r"""Return the entropy of the random label model, using maximum likelihood
estimates for the marginal node probabilities if ```MLE=True```,
otherwise using posterior mean estimates.
"""
return self._state.posterior_entropy(MLE)
[docs]
def posterior_lprob(self, r, b, MLE=True):
r"""Return the log-probability of partition ``b`` belonging to mode ``r``, using
maximum likelihood estimates for the marginal node probabilities if
```MLE=True```, otherwise using posterior mean estimates."""
if self.nested:
b = [Vector_int32_t(init=x) for x in b]
else:
b = [Vector_int32_t(init=b)]
return self._state.posterior_lprob(r, b, MLE)
[docs]
def replace_partitions(self):
r"""For every cluster, removes and re-adds every partition, after relabelling,
and returns the entropy difference (negative log probability)."""
return self._state.replace_partitions(_get_rng())
[docs]
def sample_partition(self, MLE=True):
r"""Sampled a cluster label and partition from the inferred model, using maximum
likelihood estimates for the marginal node probabilities if
```MLE=True```, otherwise using posterior mean estimates."""
return self._state.sample_partition(MLE, _get_rng())
[docs]
def sample_nested_partition(self, MLE=True, fix_empty=True):
r"""Sampled a cluster label and nested partition from the inferred model, using
maximum likelihood estimates for the marginal node probabilities if
```MLE=True```, otherwise using posterior mean estimates.
"""
return self._state.sample_nested_partition(MLE, fix_empty, _get_rng())
def _mcmc_sweep_dispatch(self, mcmc_state):
return libinference.mode_clustering_mcmc_sweep(mcmc_state, self._state,
_get_rng())
def _multiflip_mcmc_sweep_dispatch(self, mcmc_state):
return libinference.mode_clustering_multiflip_mcmc_sweep(mcmc_state,
self._state,
_get_rng())
def _multilevel_mcmc_sweep_dispatch(self, mcmc_state):
return libinference.mode_clustering_multilevel_mcmc_sweep(mcmc_state,
self._state,
_get_rng())
def _get_array(x):
if isinstance(x, PropertyMap):
return x.fa
return x
def _handle_props(func):
@wraps(func)
def wrapper(*args, **kwargs):
nargs = (_get_array(x) for x in args)
ret = func(*nargs, **kwargs)
if isinstance(ret, np.ndarray) and isinstance(args[0], PropertyMap):
nret = args[0].copy()
nret.fa = ret
return nret
return ret
return wrapper
def _handle_nprops(func):
@wraps(func)
def wrapper(*args, **kwargs):
nargs = ([_get_array(xl) for xl in x] for x in args)
ret = func(*nargs, **kwargs)
if isinstance(ret, list):
nret = []
for i in range(len(ret)):
val = ret[i]
if isinstance(val, np.ndarray) and isinstance(args[0][i], PropertyMap):
nval = args[0][i].copy()
nval.fa = val
nret.append(nval)
else:
nret.append(val)
return nret
return ret
return wrapper
[docs]
@_handle_props
def partition_overlap(x, y, norm=True):
r"""Returns the maximum overlap between partitions, according to an optimal
label alignment.
Parameters
----------
x : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
First partition.
y : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
Second partition.
norm : (optional, default: ``True``)
If ``True``, the result will be normalized in the range :math:`[0,1]`.
Returns
-------
w : ``float`` or ``int``
Maximum overlap value.
Notes
-----
The maximum overlap between partitions :math:`\boldsymbol x` and :math:`\boldsymbol y` is defined as
.. math::
\omega(\boldsymbol x,\boldsymbol y) = \underset{\boldsymbol\mu}{\max}\sum_i\delta_{x_i,\mu(y_i)},
where :math:`\boldsymbol\mu` is a bijective mapping between group labels. It
corresponds to solving an instance of the maximum weighted bipartite
matching problem, which is done with the Kuhn-Munkres algorithm
[kuhn_hungarian_1955]_ [munkres_algorithms_1957]_.
If ``norm == True``, the normalized value is returned:
.. math::
\frac{\omega(\boldsymbol x,\boldsymbol y)}{N}
which lies in the unit interval :math:`[0,1]`.
This algorithm runs in time :math:`O[N + (B_x+B_y)E_m]` where :math:`N` is
the length of :math:`\boldsymbol x` and :math:`\boldsymbol y`, :math:`B_x`
and :math:`B_y` are the number of labels in partitions :math:`\boldsymbol x`
and :math:`\boldsymbol y`, respectively, and :math:`E_m \le B_xB_y` is the
number of nonzero entries in the contingency table between both partitions.
Examples
--------
>>> x = np.random.randint(0, 10, 1000)
>>> y = np.random.randint(0, 10, 1000)
>>> print(gt.partition_overlap(x, y))
0.143
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
.. [kuhn_hungarian_1955] H. W. Kuhn, "The Hungarian method for the
assignment problem," Naval Research Logistics Quarterly 2, 83–97 (1955)
:doi:`10.1002/nav.3800020109`
.. [munkres_algorithms_1957] James Munkres, "Algorithms for the Assignment
and Transportation Problems," Journal of the Society for Industrial and
Applied Mathematics 5, 32–38 (1957).
:doi:`10.1137/0105003`
"""
x = np.asarray(x, dtype="int32")
y = np.asarray(y, dtype="int32")
if len(x) != len(y):
raise ValueError("Partitions need to have the same number of elements")
m = libinference.partition_overlap(x, y)
if norm:
m /= max((len(x), len(y)))
return m
[docs]
@_handle_nprops
def nested_partition_overlap(x, y, norm=True):
r"""Returns the hierarchical maximum overlap between nested partitions, according
to an optimal recursive label alignment.
Parameters
----------
x : iterable of iterables of ``int`` values or iterable of :class:`~graph_tool.PropertyMap`
First partition.
y : iterable of iterables of ``int`` values or iterable of :class:`~graph_tool.PropertyMap`
Second partition.
norm : (optional, default: ``True``)
If ``True``, the result will be normalized in the range :math:`[0,1]`.
Returns
-------
w : ``float`` or ``int``
Maximum hierarchical overlap value.
Notes
-----
The maximum overlap between partitions :math:`\bar{\boldsymbol x}` and
:math:`\bar{\boldsymbol y}` is defined as
.. math::
\omega(\bar{\boldsymbol x},\bar{\boldsymbol y}) = \sum_l\underset{\boldsymbol\mu_l}{\max}\sum_i\delta_{x_i^l,\mu_l(\tilde y_i^l)},
where :math:`\boldsymbol\mu_l` is a bijective mapping between group labels
at level :math:`l`, and :math:`\tilde y_i^l = y^i_{\mu_{l-1}(i)}` are the
nodes reordered according to the lower level. It corresponds to solving an
instance of the maximum weighted bipartite matching problem for every
hierarchical level, which is done with the Kuhn-Munkres algorithm
[kuhn_hungarian_1955]_ [munkres_algorithms_1957]_.
If ``norm == True``, the normalized value is returned:
.. math::
1 - \frac{\left(\sum_lN_l\right) - \omega(\bar{\boldsymbol x}, \bar{\boldsymbol y})}{\sum_l\left(N_l - 1\right)}
which lies in the unit interval :math:`[0,1]`, where
:math:`N_l=\max(N_{{\boldsymbol x}^l}, N_{{\boldsymbol y}^l})` is the number of
nodes in level `l`.
This algorithm runs in time :math:`O[\sum_l N_l + (B_x^l+B_y^l)E_m^l]` where
:math:`B_x^l` and :math:`B_y^l` are the number of labels in partitions
:math:`\bar{\boldsymbol x}` and :math:`\bar{\boldsymbol y}` at level
:math:`l`, respectively, and :math:`E_m^l \le B_x^lB_y^l` is the number of
nonzero entries in the contingency table between both partitions.
Examples
--------
>>> x = [np.random.randint(0, 100, 1000), np.random.randint(0, 10, 100), np.random.randint(0, 3, 10)]
>>> y = [np.random.randint(0, 100, 1000), np.random.randint(0, 10, 100), np.random.randint(0, 3, 10)]
>>> print(gt.nested_partition_overlap(x, y))
0.140018...
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
.. [kuhn_hungarian_1955] H. W. Kuhn, "The Hungarian method for the
assignment problem," Naval Research Logistics Quarterly 2, 83–97 (1955)
:doi:`10.1002/nav.3800020109`
.. [munkres_algorithms_1957] James Munkres, "Algorithms for the Assignment
and Transportation Problems," Journal of the Society for Industrial and
Applied Mathematics 5, 32–38 (1957).
:doi:`10.1137/0105003`
"""
y = order_nested_partition_labels(y)
x = align_nested_partition_labels(x, y)
L = min((len(x), len(y)))
m = 0
N = 0
for l in range(L):
xl = x[l]
yl = y[l]
Nl = min((len(xl), len(yl)))
null = np.logical_and(xl[:Nl] == -1, y[:Nl] == -1).sum()
ml = (xl[:Nl] == yl[:Nl]).sum() - null
Nl = max(((xl != -1).sum(), (yl != -1).sum()))
N += Nl
m += ml
if norm:
return 1 - (N - m) / (N - L)
return m
[docs]
@_handle_props
def contingency_graph(x, y):
r"""Returns the contingency graph between both partitions.
Parameters
----------
x : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
First partition.
y : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
Second partition.
Returns
-------
g : :class:`~graph_tool.Graph`
Contingency graph, containing an internal edge property map ``mrs`` with
the weights, an internal vertex property map ``label`` with the label
values, and an internal boolean vertex property map ``partition``
indicating the partition membership.
Notes
-----
The contingency graph is a bipartite graph with the labels of
:math:`\boldsymbol x` and :math:`\boldsymbol y` as vertices, and edge
weights given by
.. math::
m_{rs} = \sum_i\delta_{x_i,r}\delta_{y_i,s}.
This algorithm runs in time :math:`O(N)` where :math:`N` is
the length of :math:`\boldsymbol x` and :math:`\boldsymbol y`.
Examples
--------
>>> x = np.random.randint(0, 10, 1000)
>>> y = np.random.randint(0, 10, 1000)
>>> g = gt.contingency_graph(x, y)
>>> g.ep.mrs.a
PropertyArray([ 8, 10, 4, 11, 8, 11, 15, 12, 14, 11, 8, 12, 9, 9, 11,
14, 10, 9, 16, 8, 12, 16, 15, 13, 12, 7, 11, 13, 18, 9,
9, 9, 14, 10, 11, 8, 6, 7, 11, 11, 8, 11, 14, 12, 8,
7, 7, 8, 8, 12, 10, 11, 8, 15, 6, 13, 14, 14, 8, 10,
8, 11, 7, 6, 10, 13, 10, 13, 6, 11, 15, 5, 4, 10, 13,
8, 8, 9, 14, 8, 6, 11, 7, 8, 15, 10, 9, 7, 8, 11,
10, 11, 9, 8, 8, 10, 7, 8, 3, 9], dtype=int32)
"""
x = np.asarray(x, dtype="int32")
y = np.asarray(y, dtype="int32")
g = Graph(directed=False)
g.ep.mrs = g.new_ep("int32_t")
g.vp.label = g.new_vp("int32_t")
g.vp.partition = g.new_vp("bool")
libinference.get_contingency_graph(g._Graph__graph,
g.vp.label._get_any(),
g.ep.mrs._get_any(),
g.vp.partition._get_any(),
x, y)
return g
[docs]
@_handle_props
def shuffle_partition_labels(x):
r"""Returns a copy of partition ``x``, with the group labels randomly shuffled.
Parameters
----------
x : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
Partition.
Returns
-------
y : :class:`numpy.ndarray` or :class:`~graph_tool.PropertyMap`
Partition with shuffled labels.
Examples
--------
>>> x = [0, 0, 0, 1, 1, 1, 2, 2, 2]
>>> gt.shuffle_partition_labels(x)
array([0, 0, 0, 2, 2, 2, 1, 1, 1], dtype=int32)
"""
x = np.asarray(x, dtype="int32").copy()
libinference.partition_shuffle_labels(x, _get_rng())
return x
@_handle_nprops
def shuffle_nested_partition_labels(x):
r"""Returns a copy of nested partition ``x``, with the group labels randomly shuffled.
Parameters
----------
x : iterable iterable of ``int`` values or iterable of :class:`~graph_tool.PropertyMap`
Partition.
Returns
-------
y : list of :class:`numpy.ndarray` or list of :class:`~graph_tool.PropertyMap`
Nested partition with shuffled labels.
Examples
--------
>>> x = [[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 0, 1], [1, 0]]
>>> gt.shuffle_nested_partition_labels(x)
[array([0, 0, 0, 2, 2, 2, 1, 1, 1], dtype=int32), array([1, 0, 1], dtype=int32), array([1, 0], dtype=int32)]
"""
x = [np.asarray(xl, dtype="int32") for xl in x]
x = libinference.nested_partition_shuffle_labels(x, _get_rng())
return x
[docs]
@_handle_props
def order_partition_labels(x):
r"""Returns a copy of partition ``x``, with the group labels ordered decreasingly
according to group size.
Parameters
----------
x : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
Partition.
Returns
-------
y : :class:`numpy.ndarray` or :class:`~graph_tool.PropertyMap`
Partition with ordered labels.
Examples
--------
>>> x = [0, 2, 2, 1, 1, 1, 2, 2, 2]
>>> gt.order_partition_labels(x)
array([2, 0, 0, 1, 1, 1, 0, 0, 0], dtype=int32)
"""
x = np.asarray(x, dtype="int32").copy()
libinference.partition_order_labels(x)
return x
[docs]
@_handle_nprops
def order_nested_partition_labels(x):
r"""Returns a copy of nested partition ``x``, with the group labels ordered
decreasingly according to group size at each level.
Parameters
----------
x : iterable of iterables of ``int`` values or iterable of :class:`~graph_tool.PropertyMap`
Partition.
Returns
-------
y : list of :class:`numpy.ndarray` or list of :class:`~graph_tool.PropertyMap`
Nested partition with ordered labels.
Examples
--------
>>> x = [[0, 2, 2, 1, 1, 1, 2, 2, 2], [1, 1, 0], [1, 1]]
>>> gt.order_nested_partition_labels(x)
[array([2, 0, 0, 1, 1, 1, 0, 0, 0], dtype=int32), array([1, 0, 0], dtype=int32), array([0, 0], dtype=int32)]
"""
x = [np.asarray(xl, dtype="int32") for xl in x]
x = libinference.nested_partition_order_labels(x)
return x
[docs]
@_handle_props
def align_partition_labels(x, y):
r"""Returns a copy of partition ``x``, with the group labels aligned as to
maximize the overlap with ``y``.
Parameters
----------
x : iterable of ``int`` values or :class:`~graph_tool.PropertyMap`
Partition.
y : iterables of ``int`` values or :class:`~graph_tool.PropertyMap`
Reference partition.
Returns
-------
z : :class:`numpy.ndarray` or :class:`~graph_tool.PropertyMap`
Partition ``x`` with labels aligned to ``y``.
Notes
-----
This algorithm runs in time :math:`O[N + (B_x+B_y)E_m]` where :math:`N` is
the length of :math:`\boldsymbol x` and :math:`\boldsymbol y`, :math:`B_x`
and :math:`B_y` are the number of labels in partitions :math:`\boldsymbol x`
and :math:`\boldsymbol y`, respectively, and :math:`E_m \le B_xB_y` is the
number of nonzero entries in the contingency table between both partitions.
Examples
--------
>>> x = [0, 2, 2, 1, 1, 1, 2, 3, 2]
>>> y = gt.shuffle_partition_labels(x)
>>> print(y)
[3 1 1 2 2 2 1 0 1]
>>> gt.align_partition_labels(y, x)
array([0, 2, 2, 1, 1, 1, 2, 3, 2], dtype=int32)
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
"""
x = np.asarray(x, dtype="int32").copy()
y = np.asarray(y, dtype="int32")
libinference.align_partition_labels(x, y)
return x
[docs]
@_handle_nprops
def align_nested_partition_labels(x, y):
r"""Returns a copy of nested partition ``x``, with the group labels aligned as to
maximize the overlap with ``y``.
Parameters
----------
x : iterable of iterables of ``int`` values or iterable of :class:`~graph_tool.PropertyMap`
Nested partition.
y : iterable of iterables of ``int`` values or iterable of :class:`~graph_tool.PropertyMap`
Reference nested partition.
Returns
-------
z : list of :class:`numpy.ndarray` or list of :class:`~graph_tool.PropertyMap`
Nested partition ``x`` with labels aligned to ``y``.
Notes
-----
This algorithm runs in time :math:`O[\sum_l N_l + (B_x^l+B_y^l)E_m^l]` where
:math:`B_x^l` and :math:`B_y^l` are the number of labels in partitions
:math:`\bar{\boldsymbol x}` and :math:`\bar{\boldsymbol y}` at level
:math:`l`, respectively, and :math:`E_m^l \le B_x^lB_y^l` is the number of
nonzero entries in the contingency table between both partitions.
Examples
--------
>>> x = [[0, 2, 2, 1, 1, 1, 2, 3, 2], [1, 0, 1, 0], [0,0]]
>>> y = gt.shuffle_nested_partition_labels(x)
>>> print(y)
[array([3, 1, 1, 2, 2, 2, 1, 0, 1], dtype=int32), array([0, 1, 0, 1], dtype=int32), array([0, 0], dtype=int32)]
>>> gt.align_nested_partition_labels(y, x)
[array([0, 2, 2, 1, 1, 1, 2, 3, 2], dtype=int32), array([1, 0, 1, 0], dtype=int32), array([0, 0], dtype=int32)]
"""
x = [np.asarray(xl, dtype="int32") for xl in x]
y = [np.asarray(yl, dtype="int32") for yl in y]
x = libinference.align_nested_partition_labels(x, y)
return x
[docs]
@_parallel
def partition_overlap_center(bs, init=None, relabel_bs=False):
r"""Find a partition with a maximal overlap to all items of the list of
partitions given.
Parameters
----------
bs : list of iterables of ``int`` values or list of :class:`~graph_tool.PropertyMap`
List of partitions.
init : iterable of ``int`` values (optional, default: ``None``)
If given, it will determine the initial partition.
relabel_bs : ``bool`` (optional, default: ``False``)
If ``True`` the given list of partitions will be updated with
relabelled values.
Returns
-------
c : :class:`numpy.ndarray`
Partition containing the overlap consensus.
r : ``float``
Uncertainty in range :math:`[0,1]`.
Notes
-----
This algorithm obtains a partition :math:`\hat{\boldsymbol b}` that has a
maximal sum of overlaps with all partitions given in ``bs``. It is obtained
by performing the double maximization:
.. math::
\begin{aligned}
\hat b_i &= \underset{r}{\operatorname{argmax}}\;\sum_m \delta_{\mu_m(b^m_i), r}\\
\boldsymbol\mu_m &= \underset{\boldsymbol\mu}{\operatorname{argmax}} \sum_rm_{r,\mu(r)}^{(m)},
\end{aligned}
where :math:`\boldsymbol\mu` is a bijective mapping between group labels,
and :math:`m_{rs}^{(m)}` is the contingency table between
:math:`\hat{\boldsymbol b}` and :math:`\boldsymbol b ^{(m)}`. This algorithm
simply iterates the above equations, until no further improvement is
possible.
The uncertainty is given by:
.. math::
r = 1 - \frac{1}{NM} \sum_i \sum_m \delta_{\mu_m(b^m_i), \hat b_i}
This algorithm runs in time :math:`O[M(N + B^3)]` where :math:`M` is the
number of partitions, :math:`N` is the length of the partitions and
:math:`B` is the number of labels used.
@parallel@
Examples
--------
>>> x = [5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 0]
>>> bs = []
>>> for m in range(100):
... y = np.array(x)
... y[np.random.randint(len(y))] = np.random.randint(5)
... bs.append(y)
>>> bs[:3]
[array([5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 3]), array([5, 5, 2, 0, 4, 0, 1, 0, 0, 0, 0]), array([5, 5, 1, 0, 1, 0, 1, 0, 0, 0, 0])]
>>> c, r = gt.partition_overlap_center(bs)
>>> print(c, r)
[1 1 2 0 3 0 3 0 0 0 0] 0.06818181...
>>> gt.align_partition_labels(c, x)
array([5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 0], dtype=int32)
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
"""
bs = _get_array(bs)
if not relabel_bs:
bs = np.array(bs, dtype="int32")
if init is None:
c = np.zeros(len(bs[0]), dtype="int32")
else:
c = np.asarray(init, dtype="int32")
r = libinference.partition_overlap_center(bs, c)
return c, r
[docs]
@_parallel
def nested_partition_overlap_center(bs, init=None, return_bs=False):
r"""Find a nested partition with a maximal overlap to all items of the list of
nested partitions given.
Parameters
----------
bs : list of list of iterables of ``int`` values or list of list of :class:`~graph_tool.PropertyMap`
List of nested partitions.
init : iterable of iterables of ``int`` values (optional, default: ``None``)
If given, it will determine the initial nested partition.
return_bs : ``bool`` (optional, default: ``False``)
If ``True`` the an update list of nested partitions will be return with
relabelled values.
Returns
-------
c : List of :class:`numpy.ndarray`
Nested partition containing the overlap consensus.
r : ``float``
Uncertainty in range :math:`[0,1]`.
bs : List of lists of :class:`numpy.ndarray`
List of relabelled nested partitions.
Notes
-----
This algorithm obtains a nested partition :math:`\hat{\bar{\boldsymbol b}}`
that has a maximal sum of overlaps with all nested partitions given in
``bs``. It is obtained by performing the double maximization:
.. math::
\begin{aligned}
\hat b_i^l &= \underset{r}{\operatorname{argmax}}\;\sum_m \delta_{\mu_m^l(b^{l,m}_i), r}\\
\boldsymbol\mu_m^l &= \underset{\boldsymbol\mu}{\operatorname{argmax}} \sum_rm_{r,\mu(r)}^{(l,m)},
\end{aligned}
where :math:`\boldsymbol\mu` is a bijective mapping between group labels,
and :math:`m_{rs}^{(l,m)}` is the contingency table between
:math:`\hat{\boldsymbol b}_l` and :math:`\boldsymbol b ^{(m)}_l`. This algorithm
simply iterates the above equations, until no further improvement is
possible.
The uncertainty is given by:
.. math::
r = 1 - \frac{1}{N-L}\sum_l\frac{N_l-1}{N_l}\sum_i\frac{1}{M}\sum_m \delta_{\mu_m(b^{l,m}_i), \hat b_i^l}.
This algorithm runs in time :math:`O[M\sum_l(N_l + B_l^3)]` where :math:`M`
is the number of partitions, :math:`N_l` is the length of the partitions and
:math:`B_l` is the number of labels used, in level :math:`l`.
@parallel@
Examples
--------
>>> x = [[5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 0], [0, 1, 0, 1, 1, 1]]
>>> bs = []
>>> for m in range(100):
... y = [np.array(xl) for xl in x]
... y[0][np.random.randint(len(y[0]))] = np.random.randint(5)
... y[1][np.random.randint(len(y[1]))] = np.random.randint(2)
... bs.append(y)
>>> bs[:3]
[[array([5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 3]), array([0, 1, 0, 1, 1, 1])], [array([5, 5, 2, 0, 1, 0, 4, 0, 0, 0, 0]), array([0, 1, 1, 1, 1, 1])], [array([5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 2]), array([0, 1, 0, 1, 1, 1])]]
>>> c, r = gt.nested_partition_overlap_center(bs)
>>> print(c, r)
[array([1, 1, 2, 0, 3, 0, 3, 0, 0, 0, 0], dtype=int32), array([0, 1, 0, 1, 1], dtype=int32)] 0.069385...
>>> gt.align_nested_partition_labels(c, x)
[array([5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 0], dtype=int32), array([ 0, 1, 0, -1, -1, 1], dtype=int32)]
References
----------
.. [peixoto-revealing-2021] Tiago P. Peixoto, "Revealing consensus and
dissensus between network partitions", Phys. Rev. X 11 021003 (2021)
:doi:`10.1103/PhysRevX.11.021003`, :arxiv:`2005.13977`
"""
bs = [[np.asarray(_get_array(bs[m][l]), dtype="int32") for l in range(len(bs[m]))] for m in range(len(bs))]
if init is None:
c = [np.zeros(len(bs[0][l]), dtype="int32") for l in range(len(bs[0]))]
else:
c = [np.asarray(init[l], dtype="int32") for l in range(len(init))]
c, bs, r = libinference.nested_partition_overlap_center(bs, c)
if return_bs:
return c, r, bs
else:
return c, r
[docs]
@_handle_nprops
def nested_partition_clear_null(x):
r"""Returns a copy of nested partition ``x`` where the null values ``-1`` are
replaced with ``0``.
Parameters
----------
x : iterable of iterables of ``int`` values or list of :class:`~graph_tool.PropertyMap`
Partition.
Returns
-------
y : list of :class:`numpy.ndarray`
Nested partition with null values removed.
Notes
-----
This is useful to pass hierarchical partitions to
:class:`~graph_tool.inference.NestedBlockState`.
Examples
--------
>>> x = [[5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 0], [0, 1, 0, -1, -1, 1]]
>>> gt.nested_partition_clear_null(x)
[array([5, 5, 2, 0, 1, 0, 1, 0, 0, 0, 0], dtype=int32), array([0, 1, 0, 0, 0, 1], dtype=int32)]
"""
x = [np.asarray(x[l], dtype="int32") for l in range(len(x))]
return libinference.nested_partition_clear_null(x)