#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""mmd.py: code for computing MMD (Maximum Mean Discrepancy),
kernel based statistical test used to determine whether given two
distribution are the same. Also contains functions to calculate
the EMD (Earth Mover's Distance) and the L2 distance between two
histograms, in addition to Gaussian kernels with these distances.
Adapted from Jo, J. & al (2022)
"""
import concurrent.futures
from functools import partial
from typing import Callable, Iterator, List, Optional, Tuple
import networkx as nx
import numpy as np
import pyemd
from scipy.linalg import toeplitz
from sklearn.metrics.pairwise import pairwise_kernels
from tqdm import tqdm
from ccsd.src.evaluation.eden import vectorize
[docs]
def emd(x: np.ndarray, y: np.ndarray, distance_scaling: float = 1.0) -> float:
"""Calculate the earth mover's distance (EMD) between two histograms
It corresponds to the Wasserstein metric (see Optimal transport theory)
The formula is (\inf_{\gama \in \Gama(\mu, \nu) \int_{M*M} d(x,y)^p d\gama(x,y))^(1/p).
Adapted from From Niu et al. (2020)
Args:
x (np.ndarray): histogram of first distribution
y (np.ndarray): histogram of second distribution
distance_scaling (float, optional): distance scaling factor. Defaults to 1.0.
Returns:
float: EMD value
"""
# Convert histogram values x and y to float, and make them equal len
x = x.astype(np.float64)
y = y.astype(np.float64)
support_size = max(len(x), len(y)) # support of the two vectors
# Diagonal-constant matrix
d_mat = toeplitz(range(support_size)).astype(np.float64)
distance_mat = d_mat / distance_scaling
x, y = process_tensor(x, y)
# Calculate EMD
emd_value = pyemd.emd(x, y, distance_mat)
return np.abs(emd_value)
[docs]
def l2(x: np.ndarray, y: np.ndarray) -> float:
"""Calculate the L2 distance between two histograms
Args:
x (np.ndarray): histogram of first distribution
y (np.ndarray): histogram of second distribution
Returns:
float: L2 distance
"""
dist = np.linalg.norm(x - y, 2)
return dist
[docs]
def gaussian_emd(
x: np.ndarray, y: np.ndarray, sigma: float = 1.0, distance_scaling: float = 1.0
) -> float:
"""Gaussian kernel with squared distance in exponential term replaced by EMD
The inputs are PMF (Probability mass function). The Gaussian kernel is defined as
k(x,y) = exp(-f(x,y)^2/(2*sigma^2)) where f(.,.) is the EMD function.
Args:
x (np.ndarray): 1D pmf of the first distribution with the same support
y (np.ndarray): 1D pmf of the second distribution with the same support
sigma (float, optional): standard deviation. Defaults to 1.0.
distance_scaling (float, optional): distance scaling factor. Defaults to 1.0.
Returns:
float: Gaussian kernel value
"""
emd_value = emd(x, y, distance_scaling)
return np.exp(-emd_value * emd_value / (2 * sigma * sigma))
[docs]
def gaussian(x: np.ndarray, y: np.ndarray, sigma: float = 1.0) -> float:
"""Gaussian kernel with squared distance in exponential term replaced by L2 distance
The inputs are PMF (Probability mass function). The Gaussian kernel is defined as
k(x,y) = exp(-N(x, y)^2/(2*sigma^2)) where N(.,.) is the L2 distance function.
Args:
x (np.ndarray): 1D pmf of the first distribution with the same support
y (np.ndarray): 1D pmf of the second distribution with the same support
sigma (float, optional): standard deviation. Defaults to 1.0.
Returns:
float: Gaussian kernel value
"""
# Convert histogram values x and y to float, and make them equal len
x = x.astype(np.float64)
y = y.astype(np.float64)
x, y = process_tensor(x, y)
dist = np.linalg.norm(x - y, 2)
return np.exp(-dist * dist / (2 * sigma * sigma))
[docs]
def gaussian_tv(x: np.ndarray, y: np.ndarray, sigma: float = 1.0) -> float:
"""Gaussian kernel with squared distance in exponential term replaced by total variation distance (half L1 distance, used in transportation theory)
The inputs are PMF (Probability mass function). The Gaussian kernel is defined as
k(x,y) = exp(-f(x, y)^2/(2*sigma^2)) where f(x, y) = 0.5 * N(x, y) is the total variation distance (half L1 distance N).
Args:
x (np.ndarray): 1D pmf of the first distribution with the same support
y (np.ndarray): 1D pmf of the second distribution with the same support
sigma (float, optional): standard deviation. Defaults to 1.0.
Returns:
float: Gaussian kernel value
"""
# Convert histogram values x and y to float, and make them equal len
x = x.astype(np.float64)
y = y.astype(np.float64)
x, y = process_tensor(x, y)
dist = np.abs(x - y).sum() / 2.0
return np.exp(-dist * dist / (2 * sigma * sigma))
[docs]
def kernel_parallel_unpacked(
x: np.ndarray,
samples2: Iterator[np.ndarray],
kernel: Callable[[np.ndarray, np.ndarray], float],
) -> float:
"""Calculate the sum of the kernel values between x and all the samples in samples2
Args:
x (np.ndarray): "true sample"
samples2 (Iterator[np.ndarray]): samples from the generator
kernel (Callable[[np.ndarray, np.ndarray], float]): kernel function
Returns:
float: sum of kernel values
"""
d = 0
for s2 in samples2:
d += kernel(x, s2)
return d
[docs]
def kernel_parallel_worker(
t: Tuple[
np.ndarray, Iterator[np.ndarray], Callable[[np.ndarray, np.ndarray], float]
]
) -> float:
"""Wrapper for kernel_parallel_unpacked
Args:
t (Tuple[np.ndarray, Iterator[np.ndarray], Callable[[np.ndarray, np.ndarray], float]]): tuple of arguments
Returns:
float: sum of kernel values
"""
return kernel_parallel_unpacked(*t)
[docs]
def disc(
samples1: Iterator[np.ndarray],
samples2: Iterator[np.ndarray],
kernel: Callable[[np.ndarray, np.ndarray], float],
is_parallel: bool = True,
max_workers: Optional[int] = None,
debug_mode: bool = False,
progress_bar: bool = False,
*args,
**kwargs
) -> float:
"""Calculate the discrepancy between 2 samples
Args:
samples1 (Iterator[np.ndarray]): samples 1
samples2 (Iterator[np.ndarray]): samples 2
kernel (Callable[[np.ndarray, np.ndarray], float]): kernel function
is_parallel (bool, optional): whether or not we use parallel processing. Defaults to True.
max_workers (Optional[int], optional): number of workers (if is_parallel). Defaults to None.
debug_mode (bool, optional): whether or not we print debug info for parallel computing. Defaults to False.
progress_bar (bool, optional): whether or not we print progress bar if is_parallel is set to False. Defaults to False.
Returns:
float: discrepancy
"""
if not is_parallel:
d = 0
if not progress_bar:
for s1 in samples1:
for s2 in samples2:
d += kernel(s1, s2, *args, **kwargs)
else:
for i_s1 in tqdm(range(len(samples1))):
s1 = samples1[i_s1]
for i_s2 in tqdm(range(len(samples2)), leave=False):
s2 = samples2[i_s2]
d += kernel(s1, s2, *args, **kwargs)
else: # parallel
all_dist = []
if debug_mode:
print("Start parallel computing for disc compute")
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
results = executor.map(
kernel_parallel_worker,
[(s1, samples2, partial(kernel, *args, **kwargs)) for s1 in samples1],
)
try:
for dist in results:
all_dist.append(dist)
except Exception as e:
raise e
d = np.sum(all_dist)
# Normalize
N = len(samples1) * len(samples2)
if N:
d /= N
return d
[docs]
def compute_mmd(
samples1: Iterator[np.ndarray],
samples2: Iterator[np.ndarray],
kernel: Callable[[np.ndarray, np.ndarray], float],
is_hist: bool = True,
*args,
**kwargs
) -> float:
"""Calculate the MMD (Maximum Mean Discrepancy) between two samples
Args:
samples1 (Iterator[np.ndarray]): samples 1
samples2 (Iterator[np.ndarray]): samples 2
kernel (Callable[[np.ndarray, np.ndarray], float]): kernel function
is_hist (bool, optional): whether or not we normalize the input to transform
it into histograms. Defaults to True.
Returns:
float: MMD
"""
if is_hist: # normalize histograms into pmf
samples1 = [s1 / np.sum(s1) if np.sum(s1) else s1 for s1 in samples1]
samples2 = [s2 / np.sum(s2) if np.sum(s2) else s2 for s2 in samples2]
return (
disc(samples1, samples1, kernel, *args, **kwargs)
+ disc(samples2, samples2, kernel, *args, **kwargs)
- 2 * disc(samples1, samples2, kernel, *args, **kwargs)
)
[docs]
def compute_emd(
samples1: Iterator[np.ndarray],
samples2: Iterator[np.ndarray],
kernel: Callable[[np.ndarray, np.ndarray], float],
is_hist: bool = True,
*args,
**kwargs
) -> Tuple[float, List[np.ndarray]]:
"""Calculate the EMD (Earth Mover Distance) between the average of two samples
Args:
samples1 (Iterator[np.ndarray]): samples 1
samples2 (Iterator[np.ndarray]): samples 2
kernel (Callable[[np.ndarray, np.ndarray], float]): kernel function
is_hist (bool, optional): whether or not we normalize the input to transform
it into histograms. Defaults to True.
Returns:
Tuple[float, List[np.ndarray]]: EMD and the average of the two samples
"""
if is_hist: # normalize histograms into pmf, take the average
samples1 = [np.mean(samples1)]
samples2 = [np.mean(samples2)]
return disc(samples1, samples2, kernel, *args, **kwargs), [samples1[0], samples2[0]]
[docs]
def preprocess(X: np.ndarray, max_len: int, is_hist: bool) -> np.ndarray:
"""Preprocess function for the kernel_compute function below
Args:
X (np.ndarray): input array
max_len (int): max row length of the new array
is_hist (bool): if the input array is an histogram
Returns:
np.ndarray: preprocessed output array
"""
X_p = np.zeros((len(X), max_len))
for i in range(len(X)):
X_p[i, : len(X[i])] = X[i]
if is_hist:
row_sum = np.sum(X_p, axis=1)
X_p = X_p / row_sum[:, None]
return X_p
[docs]
def kernel_compute(
X: List[nx.Graph],
Y: Optional[List[nx.Graph]] = None,
is_hist: bool = True,
metric: str = "linear",
n_jobs: Optional[int] = None,
) -> np.ndarray:
"""Function to compute the kernel matrix with list of graphs as inputs and
a custom metric
Adapted from https://github.com/idea-iitd/graphgen/blob/master/metrics/mmd.py
Args:
X (List[nx.Graph]): samples 1 (list of graphs)
Y (Optional[List[nx.Graph]], optional): samples 2 (list of graphs). Defaults to None.
is_hist (bool, optional): whether of not the input should be histograms (NOT IMPLEMENTED). Defaults to True.
metric (str, optional): metric. Defaults to "linear".
n_jobs (Optional[int], optional): number of jobs for parallel computing. Defaults to None.
Returns:
np.ndarray: kernel matrix
"""
if metric == "nspdk":
X = vectorize(X, complexity=4, discrete=True)
if Y is not None:
Y = vectorize(Y, complexity=4, discrete=True)
return pairwise_kernels(X, Y, metric="linear", n_jobs=n_jobs)
else:
max_len = max([len(x) for x in X])
if Y is not None:
max_len = max(max_len, max([len(y) for y in Y]))
X = preprocess(X, max_len, is_hist)
if Y is not None:
Y = preprocess(Y, max_len, is_hist)
return pairwise_kernels(X, Y, metric=metric, n_jobs=n_jobs)
[docs]
def compute_nspdk_mmd(
samples1: List[nx.Graph],
samples2: List[nx.Graph],
metric: str,
is_hist: bool = True,
n_jobs: Optional[int] = None,
) -> float:
"""
Compute the MMD between two samples of graphs using the NSPDK kernel
Adapted from https://github.com/idea-iitd/graphgen/blob/master/metrics/mmd.py
Args:
samples1 (List[nx.Graph]): samples 1 (list of graphs)
samples2 (List[nx.Graph]): samples 2 (list of graphs)
metric (str): metric
is_hist (bool, optional): whether of not the input should be histograms (NOT IMPLEMENTED). Defaults to True.
n_jobs (Optional[int], optional): number of jobs for parallel computing. Defaults to None.
"""
X = kernel_compute(samples1, is_hist=is_hist, metric=metric, n_jobs=n_jobs)
Y = kernel_compute(samples2, is_hist=is_hist, metric=metric, n_jobs=n_jobs)
Z = kernel_compute(
samples1, Y=samples2, is_hist=is_hist, metric=metric, n_jobs=n_jobs
)
return np.average(X) + np.average(Y) - 2 * np.average(Z)
[docs]
def process_tensor(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Process two tensors (vectors) to have the same size (support)
Args:
x (np.ndarray): vector 1
y (np.ndarray): vector 2
Returns:
Tuple[np.ndarray, np.ndarray]: processed vectors
"""
support_size = max(len(x), len(y))
if len(x) < len(y):
x = np.hstack((x, [0.0] * (support_size - len(x))))
elif len(y) < len(x):
y = np.hstack((y, [0.0] * (support_size - len(y))))
return x, y