Source code for tt_sketch.tensor

"""Implements various types of tensors"""
from __future__ import annotations

from abc import ABC, abstractmethod, abstractproperty
from copy import deepcopy
from dataclasses import dataclass
from functools import cached_property
from typing import (
    Iterable,
    List,
    Optional,
    Sequence,
    Dict,
    Tuple,
    Union,
    TypeVar,
    Generic,
)
import warnings

import numpy as np
from numpy.random import SeedSequence
import numpy.typing as npt

from tt_sketch.utils import ArrayList, TTRank, process_tt_rank, random_normal

TType = TypeVar("TType", bound="Tensor")


[docs]class Tensor(ABC): """Abstract base class for tensors.""" #: The shape of the tensor shape: Tuple[int, ...] @abstractproperty def T(self: TType) -> TType: """Transpose of the tensor. If a tensor has shape (n1,n2,...,nd), then transpose of the tensor has shape (nd,...,n2,n1). """ @abstractproperty def size(self) -> int: """Number of floating point elements used to store the tensor."""
[docs] @abstractmethod def to_numpy(self) -> npt.NDArray[np.float64]: """Converts the tensor to a (dense) numpy array of same shape."""
[docs] def error( self: TType, other: TType, relative: bool = False, rmse: bool = False, fast: bool = False, ) -> float: """L2 error of the tensor. If ``fast=True``, then error is computed using inner product formula. This is not numerically stable, and gives inaccurate results below relative errors of around 1e-8. """ if isinstance(other, np.ndarray): other = DenseTensor(other) other_norm = other.norm() if fast: self_norm = self.norm() dot = self.dot(other) norm_sum = self_norm**2 + other_norm**2 error = np.sqrt(norm_sum) * np.sqrt(np.abs(1 - 2 * dot / norm_sum)) else: error = np.linalg.norm(self.to_numpy() - other.to_numpy()) if relative: if other_norm == 0: return np.inf error /= other_norm if rmse: error /= np.sqrt(np.prod(self.shape)) return error
@property def ndim(self) -> int: """Number of modes of the tensor.""" return len(self.shape)
[docs] def dense(self) -> DenseTensor: """Converts to ``DenseTensor`` object""" dense_tensor = self.to_numpy() return DenseTensor(dense_tensor)
def __add__(self, other) -> TensorSum: """Addition of two tensors produces ``TensorSum`` object""" if isinstance(other, TensorSum): if not isinstance(self, TensorSum): return TensorSum([self] + other.tensors) else: return TensorSum(self.tensors + other.tensors) elif isinstance(self, TensorSum): return TensorSum(self.tensors + [other]) else: return TensorSum([self, other]) @abstractmethod def __mul__(self: TType, other: float) -> TType: """Multiplication by a scalar""" def __rmul__(self: TType, other: float) -> TType: return self.__mul__(other) def __truediv__(self, other: float): return self.__mul__(1 / other) def __sub__(self, other: Tensor) -> Tensor: return self + (-other) def __neg__(self) -> Tensor: return self * -1
[docs] def dot(self: TType, other: TType, reverse=False) -> float: """Dot product of two tensors""" if isinstance(other, TensorSum): return other.dot(self) if not reverse: # try first to see if other can dot self return other.dot(self, reverse=True) self_np = self.to_numpy().reshape(-1) other_np = other.to_numpy().reshape(-1) return np.dot(self_np, other_np)
[docs] def norm(self) -> float: """L2 norm of the tensor""" # np.abs because dot can be negative due to numerical errors return np.sqrt(np.abs(self.dot(self)))
def __matmul__(self: TType, other: TType) -> float: return self.dot(other)
[docs]class DenseTensor(Tensor): shape: Tuple[int, ...] data: npt.NDArray[np.float64] def __init__(self, data: npt.NDArray) -> None: self.shape = data.shape self.data = data
[docs] def to_sparse(self) -> SparseTensor: """ Converts to a sparse tensor. This is mainly used for testing sketching algorithms, as there is otherwise no reason to convert a dense tensor to a sparse tensor. """ X = self.data n_dims = len(X.shape) inds = np.indices(X.shape).reshape(n_dims, -1) entries = X.reshape(-1) return SparseTensor(X.shape, inds, entries)
@property def T(self) -> DenseTensor: permutation = tuple(range(len(self.shape))[::-1]) new_data = np.transpose(self.data, permutation) return self.__class__(new_data) @property def size(self) -> int: return np.prod(self.shape)
[docs] def to_numpy(self) -> npt.NDArray[np.float64]: return self.data
def __repr__(self) -> str: return f"<Dense tensor of shape {self.shape} at {hex(id(self))}>" def __mul__(self, other: float) -> DenseTensor: return self.__class__(self.data * other)
[docs] @classmethod def random(cls, shape: Tuple[int, ...]) -> DenseTensor: return cls(random_normal(shape))
[docs]@dataclass class SparseTensor(Tensor): #: The shape of the tensor shape: Tuple[int, ...] #: The indices of the non-zero entries indices: Union[npt.NDArray, Tuple[npt.NDArray, ...]] #: The entries of the non-zero entries entries: npt.NDArray[np.float64] def __post_init__(self): if isinstance(self.indices, tuple): self.indices = np.stack(self.indices) @property def T(self) -> SparseTensor: new_indices = self.indices[::-1] new_shape = self.shape[::-1] return self.__class__(new_shape, new_indices, self.entries) @property def size(self) -> int: return self.nnz * (self.ndim + 1) @property def nnz(self) -> int: """Number of non-zero entries.""" return len(self.entries)
[docs] def split(self, n_summands: int) -> TensorSum: """ Splits the tensor a ``TensorSum`` containing ``n_summands`` tensors. """ block_size = self.nnz // n_summands summands: List[Tensor] = [] for i in range(n_summands): if i < n_summands - 1: summand_slice = slice(i * block_size, (i + 1) * block_size) else: summand_slice = slice(i * block_size, self.nnz) summand_indices = tuple(ind[summand_slice] for ind in self.indices) summands.append( SparseTensor( self.shape, summand_indices, self.entries[summand_slice], ) ) return TensorSum(summands)
[docs] def to_numpy(self) -> npt.NDArray[np.float64]: X = np.zeros(self.shape) X[tuple(self.indices)] = self.entries return X
[docs] def norm(self) -> float: return float(np.linalg.norm(self.entries))
def __repr__(self) -> str: return ( f"<Sparse tensor of shape {self.shape} with {self.nnz} non-zero" f" entries at {hex(id(self))}>" )
[docs] def dot(self, other: Tensor, reverse=False) -> float: try: other_entries = other.gather(self.indices) return np.dot(other_entries, self.entries) except AttributeError: return super().dot(other, reverse=reverse)
[docs] @classmethod def random( cls, shape: Tuple[int, ...], nnz: int, seed: Optional[int] = None ) -> SparseTensor: """ Generates a random sparse tensor with `nnz` non-zero gaussian entries """ if seed is not None: np.random.seed(seed) total_size = np.prod(shape) indices_flat = np.random.choice(total_size, size=nnz, replace=False) indices = np.unravel_index(indices_flat, shape) entries = random_normal(shape=(nnz,), seed=seed) return cls(shape, indices, entries)
def __mul__(self, other: float) -> SparseTensor: return self.__class__(self.shape, self.indices, self.entries * other)
[docs] def gather( self, indices: Tuple[npt.NDArray, ...] ) -> npt.NDArray[np.float64]: """ Gathers the entries corresponding to the given indices. """ dense_indices = np.ravel_multi_index(indices, self.shape) out = np.zeros(len(dense_indices)) dic = self.dict for i, ind in enumerate(dense_indices): out[i] = dic.get(ind, 0.0) return out
@cached_property def dict(self) -> Dict[int, float]: dense_indices = np.ravel_multi_index(self.indices, self.shape) return {i: v for i, v in zip(dense_indices, self.entries)}
[docs]class TensorTrain(Tensor): #: The shape of the tensor shape: Tuple[int, ...] #: Tuple encoding the tensor train rank rank: Tuple[int, ...] #: A list containing the cores of the tensor train cores: ArrayList def __init__(self, cores: ArrayList) -> None: self.cores = cores self.shape = tuple(C.shape[1] for C in cores) self.rank = tuple(C.shape[0] for C in cores[1:]) @property def T(self) -> TensorTrain: new_cores = [np.transpose(C, (2, 1, 0)) for C in self.cores[::-1]] return self.__class__(new_cores)
[docs] def to_numpy(self) -> npt.NDArray: dense_tensor = self.cores[0] dense_tensor = dense_tensor.reshape(dense_tensor.shape[1:]) for C in self.cores[1:]: dense_tensor = np.einsum("...j,jkl->...kl", dense_tensor, C) dense_tensor = dense_tensor.reshape(dense_tensor.shape[:-1]) return dense_tensor
[docs] @classmethod def random( cls, shape: Tuple[int, ...], rank: TTRank, seed: Optional[int] = None, orthog: bool = False, trim: Optional[bool] = None, norm_goal: str = "norm-1", ) -> TensorTrain: """ Generate random orthogonal tensor train cores By default, a core of ``(r1, n, r2)`` has Gaussian entries with zero mean and variance ``1 / r1 * n * r2``, so that expected Frobenius norm is 1. If ``trim=True``, the ranks are trimmed; i.e. we enforce that r1*n>=r2 and r2*n>=r1a. If ``orthog`` is set to ``True``, all cores except the last are left-orthogonalized. Trim must be enabled in this case. """ d = len(shape) if trim is None: trim = True if orthog else False if orthog and not trim: raise ValueError( "Trimming must be enabled if orthogonalization is enabled." ) rank = process_tt_rank(rank, shape, trim=trim) rank_augmented = (1,) + tuple(rank) + (1,) cores = [] seq = SeedSequence(seed) seeds = seq.generate_state(d) for i in range(d): r1 = rank_augmented[i] r2 = rank_augmented[i + 1] n = shape[i] core = random_normal(shape=(r1 * n, r2), seed=seeds[i]) if orthog and i < d - 1: core, _ = np.linalg.qr(core, mode="reduced") elif norm_goal == "norm-1": core /= np.sqrt(r1 * n) elif norm_goal == "norm-preserve": core /= np.sqrt(r1) else: raise ValueError(f"Unknown norm goal: {norm_goal}") core = core.reshape(r1, n, r2) cores.append(core) return cls(cores)
[docs] @classmethod def zero(cls, shape: Tuple[int, ...], rank: TTRank) -> TensorTrain: d = len(shape) rank = process_tt_rank(rank, shape, trim=False) cores = [] for (r1, d, r2) in zip((1,) + rank, shape, rank + (1,)): cores.append(np.zeros((r1, d, r2))) return cls(cores)
[docs] def partial_dense(self, dir: str = "lr") -> ArrayList: """Do partial contractions to dense tensor; ``X[0].X[1]...X[mu]``""" if dir == "lr": partial_cores = [self.cores[0].reshape(-1, self.cores[0].shape[-1])] for core in self.cores[1:-1]: new_core = np.einsum("ij,jkl->ikl", partial_cores[-1], core) new_core = new_core.reshape(-1, new_core.shape[-1]) partial_cores.append(new_core) elif dir == "rl": partial_cores = [ self.cores[-1].reshape(self.cores[-1].shape[0], -1) ] for core in self.cores[-2:0:-1]: new_core = np.einsum("ijk,kl->ijl", core, partial_cores[-1]) new_core = new_core.reshape(new_core.shape[0], -1) partial_cores.append(new_core) return partial_cores
def __getitem__(self, index: int) -> npt.NDArray: return self.cores[index] def __setitem__(self, index: int, data: npt.NDArray) -> None: self.cores[index] = data
[docs] def gather( self, idx: Union[npt.NDArray[np.int64], Tuple[npt.NDArray[np.int64], ...]], ) -> npt.NDArray: """Gather entries of dense tensor according to indices. For each row of `idx` this returns one number. This number is obtained by multiplying the slices of each core corresponding to each index (in a left-to-right fashion). """ if not isinstance(idx, np.ndarray): idx_array = np.stack(idx) else: idx_array = np.array(idx) N = idx_array.shape[1] result = np.take( self[0].reshape(self[0].shape[1:]), idx_array[0], axis=0 ) for i in range(1, self.ndim): r = self[i].shape[2] next_step = np.zeros((N, r)) for j in range(self.shape[i]): idx_mask = np.where(idx_array[i] == j) mat = self[i][:, j, :] next_step[idx_mask] = result[idx_mask] @ mat result = next_step return result.reshape(-1)
[docs] def norm(self) -> float: self_orth = self.orthogonalize() return np.linalg.norm(self_orth.cores[-1])
[docs] def round( self, eps: Optional[float] = None, max_rank: Optional[TTRank] = None, orthogonalized: bool = False, ) -> TensorTrain: """Standard TT-SVD rounding scheme. First left orthogonalize in LR sweep, then apply SVD-based rounding in a RL sweep. Leaves the tensor right-orthogonalized. If the tensor is already orthogonalized, then pass ``orthogonalized=True`` to avoid unnecessary re-orthogonalization.""" if not orthogonalized: tt = self.orthogonalize() # left-orthogonalize else: tt = self if eps is None: eps = 0 if max_rank is None: max_rank = tt.rank max_rank = process_tt_rank(max_rank, tt.shape, trim=True) new_cores = [] US_trunc: npt.NDArray for mu, C in list(enumerate(tt.cores))[::-1]: if mu < tt.ndim - 1: C = np.einsum("ijk,kl->ijl", C, US_trunc) if mu > 0: C_mat = C.reshape(C.shape[0], C.shape[1] * C.shape[2]) U, S, Vt = np.linalg.svd(C_mat) r = max(1, min(np.sum(S > S[0] * eps), max_rank[mu - 1])) US_trunc = U[:, :r] @ np.diag(S[:r]) Vt_trunc = Vt[:r, :] new_cores.append(Vt_trunc.reshape(r, C.shape[1], C.shape[2])) else: new_cores.append(C) return self.__class__(new_cores[::-1])
[docs] def svdvals(self) -> List[npt.NDArray]: """Return singular value of each mode""" tt = self.orthogonalize() svdvals = [] U: npt.NDArray S: npt.NDArray for mu, C in list(enumerate(tt.cores))[::-1]: if mu < tt.ndim - 1: US = U @ np.diag(S) C = np.einsum("ijk,kl->ijl", C, US) if mu > 0: C_mat = C.reshape(C.shape[0], C.shape[1] * C.shape[2]) else: C_mat = C.reshape(C.shape[0] * C.shape[1], C.shape[2]) U, S, _ = np.linalg.svd(C_mat) svdvals.append(S) svdvals = svdvals[::-1] return svdvals
def __mul__(self, other: float) -> TensorTrain: new_cores = deepcopy(self.cores) # Absorb number into last core, since if we do orthogonalize, we left # orthogonalize. new_cores[-1] = new_cores[-1] * other return self.__class__(new_cores) __rmul__ = __mul__ @property def size(self) -> int: return sum(core.size for core in self.cores)
[docs] def add(self, other: TensorTrain) -> TensorTrain: """ Add two ``TensorTrain`` by taking direct sums of cores. Note, this does not overload the addition operator ``+``; the addition operator instead returns a lazy ``TensorSum`` object. """ new_cores = [np.concatenate((self.cores[0], other.cores[0]), axis=2)] for C1, C2 in zip(self.cores[1:-1], other.cores[1:-1]): r1, d, r2 = C1.shape r3, _, r4 = C2.shape zeros1 = np.zeros((r1, d, r4)) zeros2 = np.zeros((r3, d, r2)) row1 = np.concatenate((C1, zeros1), axis=2) row2 = np.concatenate((zeros2, C2), axis=2) new_cores.append(np.concatenate((row1, row2), axis=0)) new_cores.append( np.concatenate((self.cores[-1], other.cores[-1]), axis=0) ) new_tt = self.__class__(new_cores) return new_tt
[docs] def dot(self, other: Tensor, reverse=False) -> float: """ Compute the dot product of two tensor trains with the same shape. Result is computed in a left-to-right sweep. """ if isinstance(other, TensorTrain): result = np.einsum("ijk,ljm->km", self.cores[0], other.cores[0]) for core1, core2 in zip(self.cores[1:], other.cores[1:]): # optimize reduces complexity from r^4*n to r^3*n result = np.einsum( "ij,ika,jkb->ab", result, core1, core2, optimize="optimal" ) return np.sum(result) else: return super().dot(other, reverse=reverse)
[docs] def orthogonalize(self) -> TensorTrain: """Do QR sweep to left-orthogonalize""" new_cores = [] R: npt.NDArray for mu, C in enumerate(self.cores): if mu > 0: C = np.einsum("ij,jkl->ikl", R, C) if mu < self.ndim - 1: C_mat = C.reshape(C.shape[0] * C.shape[1], C.shape[2]) Q, R = np.linalg.qr(C_mat) new_cores.append(Q.reshape(C.shape[0], C.shape[1], -1)) else: new_cores.append(C) return self.__class__(new_cores)
def __repr__(self) -> str: return ( f"<Tensor train of shape {self.shape} with rank {self.rank}" f" at {hex(id(self))}>" )
[docs] def error( self: TType, other: TType, relative: bool = False, rmse: bool = False, fast: bool = False, ) -> float: """ L2 error of tensor, see docs for ``Tensor::error``. Overloads only the error between two Tensor Trains using much faster accurate method. """ try: # Try coercing to TensorTrain other = other.to_tt() except AttributeError: pass if isinstance(other, TensorTrain): error = self.add(-other).norm() if relative: other_norm = other.norm() if other_norm == 0: return np.inf error /= other_norm if rmse: error /= np.sqrt(np.prod(self.shape)) return error else: return super().error(other, relative=relative, rmse=rmse, fast=fast)
[docs]class TensorSum(Generic[TType], Tensor): """Container for sums of tensors""" shape: Tuple[int, ...] tensors: List[TType] def __init__(self, tensors: List[TType], shape=None) -> None: if shape is None: shape = tensors[0].shape self.shape = shape self.tensors = tensors @property def size(self) -> int: return sum(tensor.size for tensor in self.tensors) @property def T(self) -> TensorSum: new_tensors: List[TType] = [X.T for X in self.tensors] return self.__class__(new_tensors)
[docs] def to_numpy(self) -> npt.NDArray[np.float64]: s = np.zeros(self.shape) for X in self.tensors: s += X.to_numpy() return s
def __add__(self, other: TType) -> TensorSum: if isinstance(other, TensorSum): return self.__class__(self.tensors + other.tensors) else: return self.__class__(self.tensors + [other]) def __iadd__(self, other: TType) -> TensorSum: if isinstance(other, TensorSum): self.tensors.extend(other.tensors) else: self.tensors.append(other) return self def __repr__(self) -> str: return ( f"<Sum of {self.num_summands} tensors of shape {self.shape}" f" at {hex(id(self))}>" ) @property def num_summands(self) -> int: return len(self.tensors) def __mul__(self, other: Union[float, Iterable[float]]) -> TensorSum: try: return self.__class__( [X * c for X, c in zip(self.tensors, other, strict=True)] ) except TypeError: return self.__class__([X * other for X in self.tensors])
[docs] def dot(self, other: Tensor, reverse=False) -> float: return sum(X.dot(other, reverse) for X in self.tensors)
[docs]class CPTensor(Tensor): """Implements CP tensors. The cores are stored as list of shape ``(shape[i], rank)``.""" shape: Tuple[int, ...] rank: int cores: ArrayList def __init__(self, cores: ArrayList) -> None: self.cores = cores self.rank = cores[0].shape[1] self.shape = tuple(C.shape[0] for C in cores)
[docs] def size(self) -> int: return sum(C.size for C in self.cores)
@property def T(self) -> CPTensor: new_cores = self.cores[::-1] return self.__class__(new_cores)
[docs] def to_numpy(self) -> npt.NDArray: dense_tensor = self.cores[0] # shape (n0,r) for C in self.cores[1:]: dense_tensor = np.einsum( "...j,ij->...ij", dense_tensor, C ) # shape (n0,...,nk,r) dense_tensor = np.sum(dense_tensor, axis=-1) return dense_tensor
[docs] @classmethod def random( cls, shape: Tuple[int, ...], rank: int, seed: Optional[int] = None ) -> CPTensor: d = len(shape) seq = SeedSequence(seed) seeds = seq.generate_state(d) cores = [] for i in range(d): core = random_normal(shape=(shape[i], rank), seed=seeds[i]) core /= np.sqrt(shape[i]) cores.append(core) return cls(cores)
def __getitem__(self, index: int) -> npt.NDArray: return self.cores[index] def __setitem__(self, index: int, data: npt.NDArray) -> None: self.cores[index] = data
[docs] def gather(self, idx: Tuple[npt.NDArray[np.int64], ...]) -> npt.NDArray: """Obtain the values of the tensor at the given indices.""" res = 1 for C, id in zip(self.cores, idx): res *= C[id] return np.sum(res, axis=1)
def __repr__(self) -> str: return ( f"<CP tensor of shape {self.shape} and rank {self.rank} " f"at {hex(id(self))}>" ) def __mul__(self, other: float) -> CPTensor: new_cores = [c for c in self.cores] new_cores[0] = new_cores[0] * other return self.__class__(new_cores)
[docs]class TuckerTensor(Tensor): """Implements Tucker tensors. This consists of a core tensor of shape ``(s1, ..., sd)`` and ``d`` factor matrices of shape ``(si, ni)``, where ``(n1, ..., nd)`` is the overall shape of the tensor.""" def __init__(self, factors: ArrayList, core: npt.NDArray) -> None: self.core = core self.factors = factors self.shape = tuple(U.shape[1] for U in factors) self.rank = tuple(U.shape[0] for U in factors) @property def T(self) -> TuckerTensor: new_factors = self.factors[::-1] permutation = tuple(range(len(self.shape))[::-1]) new_core = np.transpose(self.core, permutation) return self.__class__(new_factors, new_core) @property def size(self) -> int: return self.core.size + sum(U.size for U in self.factors)
[docs] def to_numpy(self) -> npt.NDArray[np.float64]: core_contracted = self.core for i, U in enumerate(self.factors): left_dim = np.prod(self.shape[:i], dtype=np.int64) right_dim = np.prod(self.rank[i + 1 :], dtype=np.int64) core_mat = core_contracted.reshape( left_dim, self.rank[i], right_dim ) core_contracted = np.einsum("ijk,jl->ilk", core_mat, U) return core_contracted.reshape(self.shape)
def __mul__(self, other: float) -> TuckerTensor: new_core = self.core * other return self.__class__(self.factors, new_core) def __repr__(self) -> str: return ( f"<Tucker tensor of shape {self.shape} and rank {self.rank} " f"at {hex(id(self))}>" )
[docs] @classmethod def random( cls, shape: Tuple[int, ...], rank: Union[int, Tuple[int, ...]], seed: Optional[int] = None, ) -> TuckerTensor: d = len(shape) try: rank_tuple = tuple(rank) # type: ignore except TypeError: rank_tuple = (rank,) * d # type: ignore rank_tuple = tuple(min(r1, r2) for r1, r2 in zip(rank_tuple, shape)) seq = SeedSequence(seed) core_seed = seq.generate_state(1)[0] core = random_normal(shape=rank_tuple, seed=core_seed) factors = [] seeds = seq.generate_state(d) for r, n, seed in zip(rank_tuple, shape, seeds): U = random_normal(shape=(r, n), seed=seed) U = np.linalg.qr(U.T)[0].T factors.append(U) return cls(factors, core)