Source code for ccsd.src.evaluation.stats

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""stats.py: code for computing statistics of graphs.

Adapted from Jo, J. & al (2022)
"""

import concurrent.futures
import os
import random
import subprocess as sp
from datetime import datetime
from time import perf_counter
from typing import Callable, Dict, List, Optional, Tuple

import networkx as nx
import numpy as np
import torch
from easydict import EasyDict
from scipy.linalg import eigvalsh

from ccsd.src.evaluation.mmd import (
    compute_mmd,
    compute_nspdk_mmd,
    gaussian,
    gaussian_emd,
    process_tensor,
)
from ccsd.src.utils.graph_utils import adjs_to_graphs

# Global variables
PRINT_TIME = False  # whether to print the time for computing statistics


[docs] def degree_worker(G: nx.Graph) -> np.ndarray: """Function for computing the degree histogram of a graph. Returns: np.ndarray: degree histogram """ return np.array(nx.degree_histogram(G))
[docs] def add_tensor(x: np.ndarray, y: np.ndarray) -> np.ndarray: """Function for extending the dimension of two tensors to make them having the same support and add them together. Args: x (np.ndarray): vector 1 y (np.ndarray): vector 2 Returns: np.ndarray: sum of vector 1 and vector 2 """ x, y = process_tensor(x, y) return x + y
[docs] def degree_stats( graph_ref_list: List[nx.Graph], graph_pred_list: List[nx.Graph], kernel: Callable[[np.ndarray, np.ndarray], float] = gaussian_emd, is_parallel: bool = True, max_workers: Optional[int] = None, debug_mode: bool = False, ) -> float: """Compute the MMD distance between the degree distributions of two unordered sets of graphs. Args: graph_ref_list (List[nx.Graph]): reference list of networkx graphs to be evaluated graph_pred_list (List[nx.Graph]): target list of networkx graphs to be evaluated kernel (Callable[[np.ndarray, np.ndarray], float], optional): kernel function. Defaults to gaussian_emd. is_parallel (bool, optional): if True, do parallel computing. Defaults to True. max_workers (Optional[int], optional): number of workers (if is_parallel). Defaults to None. debug_mode (bool, optional): whether or not we print debug info for parallel computing. Defaults to False. Returns: float: MMD distance """ sample_ref = [] sample_pred = [] # Remove empty graphs if generated graph_pred_list_remove_empty = [ G for G in graph_pred_list if not G.number_of_nodes() == 0 ] prev = datetime.now() if is_parallel: if debug_mode: print("Start parallel computing for degree mmd reference objects") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map(degree_worker, graph_ref_list) try: for deg_hist in results: sample_ref.append(deg_hist) except Exception as e: raise e if debug_mode: print("Start parallel computing for degree mmd predicted objects") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map(degree_worker, graph_pred_list_remove_empty) try: for deg_hist in results: sample_pred.append(deg_hist) except Exception as e: raise e else: for i in range(len(graph_ref_list)): degree_temp = np.array(nx.degree_histogram(graph_ref_list[i])) sample_ref.append(degree_temp) for i in range(len(graph_pred_list_remove_empty)): degree_temp = np.array(nx.degree_histogram(graph_pred_list_remove_empty[i])) sample_pred.append(degree_temp) # Compute MMD mmd_dist = compute_mmd(sample_ref, sample_pred, kernel=kernel) elapsed = datetime.now() - prev if PRINT_TIME: print("Time computing degree mmd: ", elapsed) return mmd_dist
[docs] def spectral_worker(G: nx.Graph) -> np.ndarray: """Function for computing the spectral density of a graph. Args: G (nx.Graph): input graph Returns: np.ndarray: spectral density """ eigs = eigvalsh(nx.normalized_laplacian_matrix(G).todense()) spectral_pmf, _ = np.histogram(eigs, bins=200, range=(-1e-5, 2), density=False) spectral_pmf = spectral_pmf / spectral_pmf.sum() return spectral_pmf
[docs] def spectral_stats( graph_ref_list: List[nx.Graph], graph_pred_list: List[nx.Graph], kernel: Callable[[np.ndarray, np.ndarray], float] = gaussian_emd, is_parallel: bool = True, max_workers: Optional[int] = None, debug_mode: bool = False, ) -> np.ndarray: """Compute the MMD distance between the spectral densities of two unordered sets of graphs. Args: graph_ref_list (List[nx.Graph]): reference list of networkx graphs to be evaluated graph_pred_list (List[nx.Graph]): target list of networkx graphs to be evaluated kernel (Callable[[np.ndarray, np.ndarray], float], optional): kernel function. Defaults to gaussian_emd. is_parallel (bool, optional): if True, do parallel computing. Defaults to True. max_workers (Optional[int], optional): number of workers (if is_parallel). Defaults to None. debug_mode (bool, optional): whether or not we print debug info for parallel computing. Defaults to False. Returns: np.ndarray: spectral distance """ sample_ref = [] sample_pred = [] # Remove empty graphs if generated graph_pred_list_remove_empty = [ G for G in graph_pred_list if not G.number_of_nodes() == 0 ] prev = datetime.now() if is_parallel: if debug_mode: print("Start parallel computing for spectral mmd reference objects") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map(spectral_worker, graph_ref_list) try: for spectral_density in results: sample_ref.append(spectral_density) except Exception as e: raise e if debug_mode: print("Start parallel computing for spectral mmd predicted objects") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map(spectral_worker, graph_pred_list_remove_empty) try: for spectral_density in results: sample_pred.append(spectral_density) except Exception as e: raise e else: for i in range(len(graph_ref_list)): spectral_temp = spectral_worker(graph_ref_list[i]) sample_ref.append(spectral_temp) for i in range(len(graph_pred_list_remove_empty)): spectral_temp = spectral_worker(graph_pred_list_remove_empty[i]) sample_pred.append(spectral_temp) mmd_dist = compute_mmd(sample_ref, sample_pred, kernel=kernel) elapsed = datetime.now() - prev if PRINT_TIME: print("Time computing degree mmd: ", elapsed) return mmd_dist
[docs] def clustering_worker(param: Tuple[nx.Graph, int]) -> np.ndarray: """Function for computing the histogram of clustering coefficient of a graph. Args: param (Tuple[nx.Graph, int]): input graph and number of bins Returns: np.ndarray: histogram of clustering coefficient """ G, bins = param clustering_coeffs_list = list(nx.clustering(G).values()) hist, _ = np.histogram( clustering_coeffs_list, bins=bins, range=(0.0, 1.0), density=False ) return hist
[docs] def clustering_stats( graph_ref_list: List[nx.Graph], graph_pred_list: List[nx.Graph], kernel: Callable[[np.ndarray, np.ndarray], float] = gaussian_emd, bins: int = 100, is_parallel: bool = True, max_workers: Optional[int] = None, debug_mode: bool = False, ) -> np.ndarray: """Compute the MMD distance between the clustering coefficients of two unordered sets of graphs. For unweighted graphs, the clustering coefficient of a node u is the fraction of possible triangles through that node that exist. Args: graph_ref_list (List[nx.Graph]): reference list of networkx graphs to be evaluated graph_pred_list (List[nx.Graph]): target list of networkx graphs to be evaluated kernel (Callable[[np.ndarray, np.ndarray], float], optional): kernel function. Defaults to gaussian_emd. bins (int, optional): number of bins for the histogram. Defaults to 100. is_parallel (bool, optional): if True, do parallel computing. Defaults to True. max_workers (Optional[int], optional): number of workers (if is_parallel). Defaults to None. debug_mode (bool, optional): whether or not we print debug info for parallel computing. Defaults to False. Returns: float: mmd distance """ sample_ref = [] sample_pred = [] # Remove empty graphs if generated graph_pred_list_remove_empty = [ G for G in graph_pred_list if not G.number_of_nodes() == 0 ] prev = datetime.now() if is_parallel: if debug_mode: print("Start parallel computing for clustering mmd reference objects") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map( clustering_worker, [(G, bins) for G in graph_ref_list] ) try: for clustering_hist in results: sample_ref.append(clustering_hist) except Exception as e: raise e if debug_mode: print("Start parallel computing for clustering mmd predicted objects") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map( clustering_worker, [(G, bins) for G in graph_pred_list_remove_empty] ) try: for clustering_hist in results: sample_pred.append(clustering_hist) except Exception as e: raise e else: for i in range(len(graph_ref_list)): clustering_coeffs_list = list(nx.clustering(graph_ref_list[i]).values()) hist, _ = np.histogram( clustering_coeffs_list, bins=bins, range=(0.0, 1.0), density=False ) sample_ref.append(hist) for i in range(len(graph_pred_list_remove_empty)): clustering_coeffs_list = list( nx.clustering(graph_pred_list_remove_empty[i]).values() ) hist, _ = np.histogram( clustering_coeffs_list, bins=bins, range=(0.0, 1.0), density=False ) sample_pred.append(hist) try: mmd_dist = compute_mmd( sample_ref, sample_pred, kernel=kernel, sigma=1.0 / 10, distance_scaling=bins, ) except: mmd_dist = compute_mmd(sample_ref, sample_pred, kernel=kernel, sigma=1.0 / 10) elapsed = datetime.now() - prev if PRINT_TIME: print("Time computing clustering mmd: ", elapsed) return mmd_dist
# Maps motif/orbit name string to its corresponding list of indices from orca output motif_to_indices = { "3path": [1, 2], "4cycle": [8], } # Format of the start of the orbit counts in orca output COUNT_START_STR = "orbit counts: \n"
[docs] def edge_list_reindexed(G: nx.Graph) -> List[Tuple[int, int]]: """Reindex the nodes of a graph to be contiguous integers starting from 0. Args: G (nx.Graph): input graph Returns: List[Tuple[int, int]]: list of edges (index_u, index_v) """ idx = 0 id2idx = dict() for u in G.nodes(): id2idx[str(u)] = idx idx += 1 edges = [] for u, v in G.edges(): edges.append((id2idx[str(u)], id2idx[str(v)])) return edges
[docs] def orca(graph: nx.Graph, orca_dir: str) -> np.ndarray: """Compute the orbit counts of a graph using orca. Args: graph (nx.Graph): input graph orca_dir (str): path to the orca directory where the executable are Returns: np.ndarray: orbit counts """ tmp_file_path = os.path.join(orca_dir, f"tmp-{random.random():.4f}.txt") f = open(tmp_file_path, "w") f.write(str(graph.number_of_nodes()) + " " + str(graph.number_of_edges()) + "\n") for u, v in edge_list_reindexed(graph): f.write(str(u) + " " + str(v) + "\n") f.close() output = sp.check_output( [os.path.join(orca_dir, "orca"), "node", "4", tmp_file_path, "std"] ) output = output.decode("utf8").strip() idx = output.find(COUNT_START_STR) + len(COUNT_START_STR) output = output[idx:] node_orbit_counts = np.array( [ list(map(int, node_cnts.strip().split(" "))) for node_cnts in output.strip("\n").split("\n") ] ) try: os.remove(tmp_file_path) except OSError: pass return node_orbit_counts
[docs] def orbit_stats_all( graph_ref_list: List[nx.Graph], graph_pred_list: List[nx.Graph], kernel: Callable[[np.ndarray, np.ndarray], float] = gaussian, folder: str = "./", ) -> float: """Compute the MMD distance between the orbits of two unordered sets of graphs. Args: graph_ref_list (List[nx.Graph]): reference list of networkx graphs to be evaluated graph_pred_list (List[nx.Graph]): target list of networkx graphs to be evaluated kernel (Callable[[np.ndarray, np.ndarray], float], optional): kernel function. Defaults to gaussian. folder (str, optional): path to the main folder where the ccsd/src/evaluation folders are to locate the orca executable. Defaults to "./". Returns: float: mmd distance """ total_counts_ref = [] total_counts_pred = [] prev = datetime.now() orca_dir = os.path.join( *[folder, "ccsd", "src", "evaluation", "orca"] ) # path to the orca dir for G in graph_ref_list: try: orbit_counts = orca(G, orca_dir) except Exception as e: print(e) continue orbit_counts_graph = np.sum(orbit_counts, axis=0) / G.number_of_nodes() total_counts_ref.append(orbit_counts_graph) for G in graph_pred_list: try: orbit_counts = orca(G, orca_dir) except: print("orca failed") continue orbit_counts_graph = np.sum(orbit_counts, axis=0) / G.number_of_nodes() total_counts_pred.append(orbit_counts_graph) total_counts_ref = np.array(total_counts_ref) total_counts_pred = np.array(total_counts_pred) mmd_dist = compute_mmd( total_counts_ref, total_counts_pred, kernel=kernel, is_hist=False, sigma=30.0 ) elapsed = datetime.now() - prev if PRINT_TIME: print("Time computing orbit mmd: ", elapsed) return mmd_dist
[docs] def nspdk_stats( graph_ref_list: List[nx.Graph], graph_pred_list: List[nx.Graph] ) -> float: """Compute the MMD distance between the NSPDK kernel of two unordered sets of graphs. Adapted from https://github.com/idea-iitd/graphgen/blob/master/metrics/stats.py Args: graph_ref_list (List[nx.Graph]): reference list of networkx graphs to be evaluated graph_pred_list (nx.Graph): target list of networkx graphs to be evaluated Returns: float: mmd distance """ graph_pred_list_remove_empty = [ G for G in graph_pred_list if not G.number_of_nodes() == 0 ] prev = datetime.now() mmd_dist = compute_nspdk_mmd( graph_ref_list, graph_pred_list_remove_empty, metric="nspdk", is_hist=False, n_jobs=20, ) elapsed = datetime.now() - prev if PRINT_TIME: print("Time computing degree mmd: ", elapsed) return mmd_dist
# Dictionary mapping method names to functions to compute different MMD distances METHOD_NAME_TO_FUNC = { "degree": degree_stats, "cluster": clustering_stats, "orbit": orbit_stats_all, "spectral": spectral_stats, "nspdk": nspdk_stats, }
[docs] def eval_graph_list( graph_ref_list: List[nx.Graph], graph_pred_list: List[nx.Graph], methods: Optional[List[str]] = None, kernels: Optional[ Dict[ str, Callable[[np.ndarray, np.ndarray], float], ] ] = None, folder: str = "./", ) -> Dict[str, float]: """Evaluate generated generic graphs against a reference set of graphs using a set of methods and their corresponding kernels. Args: graph_ref_list (List[nx.Graph]): reference list of networkx graphs to be evaluated graph_pred_list (List[nx.Graph]): target list of networkx graphs to be evaluated methods (Optional[List[str]], optional): methods to be evaluated. Defaults to None. kernels (Optional[Dict[str, Callable[[np.ndarray, np.ndarray], float]]], optional): kernels to be used for each methods. Defaults to None. folder (str, optional): path to the main folder where the ccsd/src/evaluation folders are to locate the orca executable. Defaults to "./". Returns: Dict[str, float]: dictionary mapping method names to their corresponding scores """ if ( methods is None ): # by default, evaluate the methods ["degree", "cluster", "orbit"] methods = ["degree", "cluster", "orbit"] results = {} for method_id, method in enumerate(methods): print(f"Evaluating method: {method} ({method_id+1}/{len(methods)}) ...") top = perf_counter() if ( method == "nspdk" ): # nspdk requires a different function signature as there is no kernel as input results[method] = METHOD_NAME_TO_FUNC[method]( graph_ref_list, graph_pred_list ) elif ( method == "orbit" ): # orbit requires a different function signature with the folder provided results[method] = round( METHOD_NAME_TO_FUNC[method]( graph_ref_list, graph_pred_list, kernels[method], folder=folder ), 6, ) else: results[method] = round( METHOD_NAME_TO_FUNC[method]( graph_ref_list, graph_pred_list, kernels[method] ), 6, ) print( "\033[91m" + f"{method:9s}" + "\033[0m" + " : " + "\033[94m" + f"{results[method]:.6f}" + "\033[0m" ) print(f"Time elapsed: {round(perf_counter() - top, 3)}s") return results
[docs] def eval_torch_batch( ref_batch: torch.Tensor, pred_batch: torch.Tensor, methods: Optional[List[str]] = None, folder: str = "./", ) -> Dict[str, float]: """Evaluate generated generic graphs against a reference set of graphs using a set of methods and their corresponding kernels, with the input graphs in torch.Tensor format (adjacency matrices). Args: ref_batch (torch.Tensor): reference batch of adjacency matrices pred_batch (torch.Tensor): target batch of adjacency matrices methods (Optional[List[str]], optional): methods to be evaluated. Defaults to None. folder (str, optional): path to the main folder where the ccsd/src/evaluation folders are to locate the orca executable. Defaults to "./". Returns: Dict[str, float]: dictionary mapping method names to their corresponding scores """ graph_ref_list = adjs_to_graphs(ref_batch.detach().cpu().numpy()) graph_pred_list = adjs_to_graphs(pred_batch.detach().cpu().numpy()) results = eval_graph_list( graph_ref_list, graph_pred_list, methods=methods, folder=folder ) return results