Skip to content

Quantum Graph API

The quantum graph module provides a complete implementation of k-means clustering on metric graphs where points can exist anywhere on edges.

Quantum Graph Space

The main class representing a quantum graph as a metric space.

kmeanssa_ng.quantum_graph.space.QuantumGraph

QuantumGraph(incoming_graph_data=None, **attr)

Bases: Graph, Space

A quantum graph is a metric graph where points can lie on edges.

This class extends the NetworkX Graph to provide:

- Distance computation between points on edges
- Sampling of random points and centers
- k-means clustering support

Each edge should have a 'length' attribute representing its metric length. Nodes and edges can have 'weight' and 'distribution' attributes for sampling.

Attributes: diameter: The diameter of the graph (max distance between nodes). node_position: Layout positions for visualization.

Example:

# Build a small graph; each edge carries its metric 'length' attribute.
graph = QuantumGraph()
graph.add_edge(0, 1, length=1.0)
graph.add_edge(1, 2, length=2.0)
graph.precomputing()  # Cache pairwise distances

# Draw 100 random points and 5 random centers (default sampling mode: "Node").
points = graph.sample_points(100)
centers = graph.sample_centers(5)

Initialize a quantum graph.

Args: incoming_graph_data: Input graph data (see networkx.Graph). **attr: Additional graph attributes.

Source code in src/kmeanssa_ng/quantum_graph/space.py
def __init__(self, incoming_graph_data=None, **attr) -> None:
    """Build the underlying graph and reset the quantum-graph caches.

    Args:
        incoming_graph_data: Data used to populate the graph; forwarded
            unchanged to the parent constructor (see networkx.Graph).
        **attr: Graph-level attributes, forwarded unchanged to the parent
            constructor.
    """
    super().__init__(incoming_graph_data, **attr)

    # Layout positions for visualization; nothing is computed at construction.
    self._node_position: dict | None = None
    # The diameter starts at zero; it is not computed in this constructor.
    self._diameter: float = 0.0
    # node -> node -> distance table; stays None until precomputing() fills it.
    self._pairwise_nodes_distance: dict[int, dict[int, float]] | None = None

distance

distance(p1: QGPoint, p2: QGPoint) -> float

Compute distance between two points on the graph.

Args: p1: First point. p2: Second point.

Returns: Geodesic distance.

Source code in src/kmeanssa_ng/quantum_graph/space.py
def distance(self, p1: QGPoint, p2: QGPoint) -> float:
    """Return the length of the quantum path joining two points.

    Args:
        p1: Start point.
        p2: End point.

    Returns:
        The "distance" entry of the path description returned by
        ``quantum_path(p1, p2)``.
    """
    path_info = self.quantum_path(p1, p2)
    geodesic_length = path_info["distance"]
    return geodesic_length

sample_points

sample_points(n: int, where: str = 'Node') -> list[QGPoint]

Sample n random points from the graph.

Args: n: Number of points to sample. where: Sampling mode ("Node" or "Edge").

Returns: List of n sampled points.

Source code in src/kmeanssa_ng/quantum_graph/space.py
def sample_points(self, n: int, where: str = "Node") -> list[QGPoint]:
    """Draw ``n`` random points from the graph.

    The 'nb_obs' attribute of every node is reset to 0 before sampling.

    Args:
        n: How many points to draw.
        where: Sampling mode ("Node" or "Edge"), forwarded to the
            single-point sampler.

    Returns:
        A list holding the ``n`` sampled points, in draw order.
    """
    # Start every node's observation counter from zero for this sample.
    nx.set_node_attributes(self, 0, "nb_obs")

    drawn: list[QGPoint] = []
    for _ in range(n):
        drawn.append(self._sample_point(where))
    return drawn

sample_centers

sample_centers(
    k: int, where: str = "Node"
) -> list[QGCenter]

Sample k random centers.

Args: k: Number of centers to sample. where: Sampling mode ("Node" or "Edge").

Returns: List of k sampled centers.

Source code in src/kmeanssa_ng/quantum_graph/space.py
def sample_centers(self, k: int, where: str = "Node") -> list[QGCenter]:
    """Draw ``k`` random centers from the graph.

    Args:
        k: How many centers to draw.
        where: Sampling mode ("Node" or "Edge"), forwarded to the
            single-center sampler.

    Returns:
        A list holding the ``k`` sampled centers, in draw order.
    """
    drawn: list[QGCenter] = []
    for _ in range(k):
        drawn.append(self._sample_center(where))
    return drawn

precomputing

precomputing() -> None

Precompute and cache all pairwise node distances.

This significantly speeds up distance queries. Should be called once after graph construction.

Raises: ValueError: If graph is not connected or has invalid edge lengths.

Source code in src/kmeanssa_ng/quantum_graph/space.py
def precomputing(self) -> None:
    """Fill the cache of pairwise node distances.

    Edge lengths are validated and connectivity is checked on every call.
    The distance table itself (Dijkstra path lengths weighted by the
    'length' edge attribute) is only computed when the cache is still empty.

    Raises:
        ValueError: If the graph has nodes but is not connected. Invalid
            edge lengths are reported by ``validate_edge_lengths``.
    """
    self.validate_edge_lengths()

    # An empty graph is never reported as disconnected.
    has_nodes = self.number_of_nodes() > 0
    if has_nodes and not nx.is_connected(self):
        num_components = nx.number_connected_components(self)
        message = (
            f"Graph must be connected for distance precomputing. "
            f"Found {num_components} connected components."
        )
        raise ValueError(message)

    # Keep an already computed table as it is.
    if self._pairwise_nodes_distance is not None:
        return

    shortest_lengths = nx.all_pairs_dijkstra_path_length(self, weight="length")
    self._pairwise_nodes_distance = dict(shortest_lengths)

compute_clusters

compute_clusters(centers: list[QGCenter]) -> None

Assign each node to its nearest center, where each center is located by the first node of its edge.

Updates node 'cluster' attribute.

Args: centers: List of cluster centers.

Source code in src/kmeanssa_ng/quantum_graph/space.py
def compute_clusters(self, centers: list[QGCenter]) -> None:
    """Assign each node to its nearest center.

    Every center is represented by the first endpoint of its edge
    (``center.edge[0]``); its position along the edge is not taken into
    account. The index (within ``centers``) of the closest center is stored
    as a built-in ``int`` in the node's 'cluster' attribute. When several
    centers are equally close, the lowest index wins.

    Args:
        centers: List of cluster centers.

    Raises:
        ValueError: If ``centers`` is empty while the graph has nodes.
    """
    # NOTE(review): measuring from edge[0] ignores center.position and
    # edge[1]; confirm this approximation is intended.
    # The anchor node of a center does not depend on the node being
    # labelled, so resolve the anchors once instead of once per node.
    anchors = [center.edge[0] for center in centers]
    if not anchors:
        if self.number_of_nodes() == 0:
            return
        raise ValueError("centers must contain at least one center")

    for node in self.nodes:
        distances = [self.distance_between_nodes(anchor, node) for anchor in anchors]
        # np.argmin returns a NumPy integer; store a built-in int so the
        # attribute stays JSON-serializable.
        self.nodes[node]["cluster"] = int(np.argmin(distances))

Points and Centers

Classes representing points and centers on quantum graphs.

kmeanssa_ng.quantum_graph.point.QGPoint

QGPoint(
    quantum_graph: QuantumGraph,
    edge: tuple[int, int],
    position: float,
)

Bases: Point

A point on a quantum graph.

A quantum graph point is located on an edge at a specific position. The edge is represented as a tuple (node1, node2) and the position is the distance from node1 along the edge.

Attributes: space: The quantum graph this point belongs to. edge: The edge (node1, node2) containing this point. position: Distance from node1 along the edge.

Example:

graph = QuantumGraph(...)
# Place a point on edge (0, 1), at distance 0.5 from node 0 along the edge.
point = QGPoint(graph, edge=(0, 1), position=0.5)

Initialize a point on a quantum graph.

Args: quantum_graph: The quantum graph containing this point. edge: Tuple (node1, node2) representing the edge. position: Distance from node1 along the edge.

Raises: ValueError: If quantum_graph is None, edge doesn't exist in graph, or position is outside [0, edge_length].

Source code in src/kmeanssa_ng/quantum_graph/point.py
def __init__(
    self,
    quantum_graph: QuantumGraph,
    edge: tuple[int, int],
    position: float,
) -> None:
    """Create a point located on an edge of a quantum graph.

    Args:
        quantum_graph: Graph that hosts the point.
        edge: Pair (node1, node2) naming the edge; either orientation of an
            existing edge is accepted.
        position: Distance from node1 along the edge.

    Raises:
        ValueError: If quantum_graph is None, edge is not a 2-tuple, or the
            edge is not found in the graph in either orientation. Position
            errors are reported by ``_validate_and_set_position``.
    """
    if quantum_graph is None:
        raise ValueError("quantum_graph cannot be None")

    well_formed = isinstance(edge, tuple) and len(edge) == 2
    if not well_formed:
        raise ValueError(f"edge must be a tuple of two nodes, got {edge}")

    # Pairs with identical endpoints (self-loops) skip the membership test.
    tail, head = edge
    if tail != head:
        graph_edges = quantum_graph.edges
        if edge not in graph_edges and (head, tail) not in graph_edges:
            raise ValueError(f"Edge {edge} does not exist in the graph")

    self._quantum_graph = quantum_graph
    self._edge = edge

    # The position is checked only after graph and edge are stored.
    self._validate_and_set_position(position)

__str__

__str__() -> str

String representation of the point.

Source code in src/kmeanssa_ng/quantum_graph/point.py
def __str__(self) -> str:
    """Return a readable description: graph name (if any), edge, position."""
    # A missing or empty graph name is simply left out of the text.
    space_name = getattr(self.space, "name", None)
    if space_name:
        name = f" '{space_name}'"
    else:
        name = ""
    return f"QGPoint on{name} edge {self.edge} at position {self.position:.3f}"

__repr__

__repr__() -> str

Detailed string representation.

Source code in src/kmeanssa_ng/quantum_graph/point.py
def __repr__(self) -> str:
    """Return a compact description showing the edge and the raw position."""
    edge, position = self.edge, self.position
    return f"QGPoint(edge={edge}, position={position})"

kmeanssa_ng.quantum_graph.center.QGCenter

QGCenter(point: QGPoint, rng: Generator | None = None)

Bases: QGPoint, Center

A movable cluster center on a quantum graph.

Centers can perform:

- Brownian motion: Random walk for exploration
- Drift: Directed movement toward target points

Attributes: space: The quantum graph this center belongs to. edge: The edge containing this center. position: Position along the edge.

Example:

graph = QuantumGraph(...)
point = QGPoint(graph, edge=(0, 1), position=0.5)
# Wrap the point in a movable center; with no rng given, a new default_rng() is created.
center = QGCenter(point)
# Random exploration step with time parameter 0.1 (distance ~ sqrt(time)).
center.brownian_motion(0.1)
# Directed move toward target_point, which is not defined in this snippet.
# NOTE(review): 0.5 is presumably the proportion of the way to travel; confirm against drift().
center.drift(target_point, 0.5)

Initialize a center from a point.

Args: point: The initial point location. rng: Random number generator. If None, creates a new default_rng().

Source code in src/kmeanssa_ng/quantum_graph/center.py
def __init__(self, point: QGPoint, rng: Generator | None = None) -> None:
    """Create a center at the location of an existing point.

    Args:
        point: Point whose space, edge and position are copied.
        rng: Random number generator to use; a new ``default_rng()`` is
            created when it is None.
    """
    super().__init__(point.space, point.edge, point.position)

    # Fall back to a fresh generator only when the caller supplied none.
    if rng is None:
        rng = default_rng()
    self._rng = rng

brownian_motion

brownian_motion(time_to_travel: float) -> None

Perform Brownian motion on the quantum graph.

The center performs a random walk with step size proportional to sqrt(time_to_travel).

Args: time_to_travel: Time parameter (distance ~ sqrt(time)).

Raises: ValueError: If time_to_travel is negative or not numeric.

Source code in src/kmeanssa_ng/quantum_graph/center.py
def brownian_motion(self, time_to_travel: float) -> None:
    """Perform Brownian motion on the quantum graph.

    The signed displacement is ``sqrt(time_to_travel)`` times a standard
    normal draw from the center's generator. A positive displacement moves
    the center toward ``edge[1]``, a negative one toward ``edge[0]``. Each
    time the displacement reaches a node, the walk continues on an edge
    leading to a randomly chosen neighbor of that node, starting at
    position 0 of the new edge.

    Args:
        time_to_travel: Time parameter (distance ~ sqrt(time)).

    Raises:
        ValueError: If time_to_travel is negative, not finite, or not numeric.
    """
    # Validate time_to_travel
    try:
        time_float = float(time_to_travel)
    except (TypeError, ValueError) as e:
        raise ValueError(
            f"time_to_travel must be a number, got {type(time_to_travel).__name__}"
        ) from e

    if time_float < 0:
        raise ValueError(f"time_to_travel must be non-negative, got {time_float}")
    # NaN would silently turn the position into NaN, and +inf would never
    # leave the edge-traversal loop below, so reject both up front.
    if not np.isfinite(time_float):
        raise ValueError(f"time_to_travel must be finite, got {time_float}")

    dist_to_travel = np.sqrt(time_float) * self._rng.standard_normal()
    edge_length = self.space.get_edge_length(*self.edge)

    forward = dist_to_travel > 0
    if forward:
        next_node = self.edge[1]
        dist_to_next_node = edge_length - self.position
    else:
        next_node = self.edge[0]
        dist_to_next_node = self.position

    remaining_dist = abs(dist_to_travel)

    # Traverse edges if motion exceeds current edge
    while remaining_dist >= dist_to_next_node:
        remaining_dist -= dist_to_next_node
        cur_node = next_node
        # NOTE(review): the neighbor is drawn with `rd` (presumably the stdlib
        # `random` module), not with self._rng, so seeding `rng` alone does not
        # make the walk reproducible. Left unchanged to keep existing streams.
        next_node = rd.choice(list(self.space.neighbors(cur_node)))
        self.edge = (cur_node, next_node)
        edge_length = self.space.get_edge_length(*self.edge)
        self.position = 0
        dist_to_next_node = edge_length
        # The new edge is oriented (cur_node, next_node) and the center sits
        # at its start, so the rest of the move increases the position,
        # whatever the initial direction was. Keeping the stale flag would
        # yield a negative position after a backward move crosses a node.
        forward = True

    # Final position update
    self.position += remaining_dist if forward else -remaining_dist

__repr__

__repr__() -> str

Detailed string representation.

Source code in src/kmeanssa_ng/quantum_graph/center.py
def __repr__(self) -> str:
    """Return a compact description with the position shown to 3 decimals."""
    edge = self.edge
    shown_position = f"{self.position:.3f}"
    return f"QGCenter(edge={edge}, position={shown_position})"

Quantum Graph Simulated Annealing

Specialized simulated annealing implementation with quantum graph-specific features.

kmeanssa_ng.quantum_graph.qg_simulated_annealing.QGSimulatedAnnealing

QGSimulatedAnnealing(
    observations: list[Point],
    k: int,
    lambda_param: int = 1,
    beta: float = 1.0,
    step_size: float = 0.1,
)

Bases: SimulatedAnnealing

Simulated annealing specialized for quantum graphs.

This class extends the core SimulatedAnnealing with quantum graph specific methods that rely on node proximity information.

Example:

from kmeanssa_ng import generate_sbm, QGSimulatedAnnealing

# NOTE(review): presumably a stochastic block model with two blocks of 50
# nodes, where p holds the within/between block edge probabilities; confirm
# against generate_sbm.
graph = generate_sbm(sizes=[50, 50], p=[[0.7, 0.1], [0.1, 0.7]])
points = graph.sample_points(100)

# Cluster the sampled points into k=2 groups. robust_prop=0.1 uses the final
# 10% of iterations for robustification; the result is a list of k node IDs.
sa = QGSimulatedAnnealing(points, k=2)
node_ids = sa.run_for_kmeans(robust_prop=0.1)

Source code in src/kmeanssa_ng/core/simulated_annealing.py
def __init__(
    self,
    observations: list[Point],
    k: int,
    lambda_param: int = 1,
    beta: float = 1.0,
    step_size: float = 0.1,
) -> None:
    """Set up the simulated annealing run.

    The observations are copied and the copy is shuffled in place; the
    caller's list is left untouched. The list of centers starts empty.

    Args:
        observations: Points to cluster; all must share the same space.
        k: Number of clusters (must be > 0).
        lambda_param: Intensity parameter for the Poisson process (must be > 0).
        beta: Inverse temperature parameter (must be > 0).
        step_size: Time step for updating centers (must be > 0).

    Raises:
        ValueError: If observations is empty, k <= 0, the points do not all
            share one space, or a hyperparameter is not a positive number.
    """

    def _positive_float(label: str, raw) -> float:
        """Convert ``raw`` to float and insist on a strictly positive value."""
        try:
            converted = float(raw)
        except (TypeError, ValueError) as e:
            raise ValueError(f"{label} must be a number, got {type(raw).__name__}") from e
        if converted <= 0:
            raise ValueError(f"{label} must be positive, got {converted}")
        return converted

    if not observations:
        raise ValueError("Observations must be a non-empty list of points.")
    if k <= 0:
        raise ValueError("Number of clusters 'k' must be greater than zero.")
    if any(obs.space != observations[0].space for obs in observations):
        raise ValueError("All observations must belong to the same metric space.")

    # Hyperparameters are checked in the order lambda_param, beta, step_size.
    lambda_float = _positive_float("lambda_param", lambda_param)
    beta_float = _positive_float("beta", beta)
    step_size_float = _positive_float("step_size", step_size)

    self._space = observations[0].space
    self._observations = observations.copy()
    self._k = k
    self._lambda = lambda_float
    self._beta = beta_float
    self._step_size = step_size_float

    # Shuffle the private copy only.
    rd.shuffle(self._observations)
    self._centers: list[Center] = []

run_for_kmeans

run_for_kmeans(robust_prop: float = 0.0) -> list

Run simulated annealing and return most frequent closest nodes for each center.

This method is specific to quantum graphs and returns the node IDs that appear most frequently as closest nodes for each center during the robustification period.

Args: robust_prop: Proportion of final iterations for robustification.

Returns: List of k node IDs (most frequent for each center).

Raises: ValueError: If robust_prop not in [0, 1].

Source code in src/kmeanssa_ng/quantum_graph/qg_simulated_annealing.py
def run_for_kmeans(self, robust_prop: float = 0.0) -> list:
    """Run the annealing and return one representative node per center.

    Observations are processed in order. Until the clock reaches the time
    attached to the current observation, every center takes a Brownian step
    and the center closest to the observation then drifts toward it. From
    observation index ``i0`` onward, the closest node of every center is
    recorded after each observation; the most frequent recorded node of
    each center is returned.

    Args:
        robust_prop: Proportion of final iterations for robustification.

    Returns:
        List of k node IDs (most frequent for each center).

    Raises:
        ValueError: If robust_prop not in [0, 1].
    """
    if robust_prop < 0 or robust_prop > 1:
        raise ValueError("The proportion must be in [0,1]")

    # Index of the first observation whose outcome is recorded.
    first_recorded = int(np.floor((self.n - 1) * (1 - robust_prop)))
    self._centers = self._initialize_kpp_centers()

    # Object dtype keeps both int and string node IDs intact.
    recorded = np.empty((self.n - first_recorded, self._k), dtype=object)
    arrival_times = self._initialize_times(self.n)
    clock = 0.0

    for idx, observation in enumerate(self._observations):
        deadline = arrival_times[idx]

        while clock <= deadline - self._step_size:
            h = min(clock + self._step_size, deadline) - clock
            drift_share = min(h * self._beta * np.log(1 + clock), 1)

            nearest, nearest_dist = None, float("inf")
            for center in self._centers:
                # Each center moves first; its distance is measured afterwards.
                center.brownian_motion(h)
                current_dist = self.space.distance(center, observation)
                if current_dist < nearest_dist:
                    nearest, nearest_dist = center, current_dist

            if nearest is not None:
                nearest.drift(observation, drift_share)

            clock += h

        if idx < first_recorded:
            continue
        for column, center in enumerate(self._centers):
            recorded[idx - first_recorded, column] = center._closest_node()

    return [
        Counter(list(recorded[:, column])).most_common(1)[0][0]
        for column in range(recorded.shape[1])
    ]

run_for_mean

run_for_mean(robust_prop: float = 0.0) -> int

Run simulated annealing for k=1 and return most frequent closest node.

This is a special case for computing a robust mean (single cluster) on a quantum graph. The result is the node ID that appears most frequently as the closest node during the robustification period.

Args: robust_prop: Proportion of final iterations for robustification.

Returns: ID of the node that was most frequently the closest node to the center during the robustification period.

Raises: ValueError: If robust_prop not in [0, 1] or k != 1.

Source code in src/kmeanssa_ng/quantum_graph/qg_simulated_annealing.py
def run_for_mean(self, robust_prop: float = 0.0) -> int:
    """Run the annealing with a single center and return its typical node.

    The procedure is the same as the k-means run, restricted to k=1. From
    observation index ``i0`` onward, the closest node of the first center is
    recorded after each observation, and the most frequent recorded node is
    returned.

    Args:
        robust_prop: Proportion of final iterations for robustification.

    Returns:
        ID of the node recorded most often during the robustification period.

    Raises:
        ValueError: If robust_prop not in [0, 1] or k != 1.
    """
    if robust_prop < 0 or robust_prop > 1:
        raise ValueError("The proportion must be in [0,1]")
    if self._k != 1:
        raise ValueError("run_for_mean only works with k=1")

    # Index of the first observation whose outcome is recorded.
    first_recorded = int(np.floor((self.n - 1) * (1 - robust_prop)))
    self._centers = self._initialize_kpp_centers()

    recorded = []
    arrival_times = self._initialize_times(self.n)
    clock = 0.0

    for idx, observation in enumerate(self._observations):
        deadline = arrival_times[idx]

        while clock <= deadline - self._step_size:
            h = min(clock + self._step_size, deadline) - clock
            drift_share = min(h * self._beta * np.log(1 + clock), 1)

            nearest, nearest_dist = None, float("inf")
            for center in self._centers:
                # Each center moves first; its distance is measured afterwards.
                center.brownian_motion(h)
                current_dist = self.space.distance(center, observation)
                if current_dist < nearest_dist:
                    nearest, nearest_dist = center, current_dist

            if nearest is not None:
                nearest.drift(observation, drift_share)

            clock += h

        if idx < first_recorded:
            continue
        recorded.append(self._centers[0]._closest_node())

    tally = Counter(recorded)
    return tally.most_common(1)[0][0]