Source code for pykbe.misc

from __future__ import annotations

import numba
import numpy as np
from numba import njit, prange


[docs] @njit(parallel=True) def parallel_set(arr, val): """Broadcast value to array. Set all elements of a contiguous array to a given value. Fails if the array is not contiguous. Parameters ---------- arr : array_like Array to set. val : scalar Value to set. """ assert arr.flags.contiguous if arr.flags.c_contiguous: arr1 = arr.ravel() for i in prange(arr1.size): arr1[i] = val elif arr.flags.f_contiguous: arr1 = arr.T.ravel() for i in prange(arr1.size): arr1[i] = val
[docs] @njit(parallel=True) def accum_mat_into_tens4(mat, tensor, p, q, alpha=1.0, conj=False, beta=0.0): """Accumulate a matrix into a tensor of rank 4. Performs the operation :code:`tensor[p, :, q, :] += alpha * mat`. Parameters ---------- mat : (N, M) array_like 2D array. tensor : (_, N, _, M) array_like 4D array. p : int First index of tens4. q : int Third index of tens4. alpha : scalar, optional Scalar multiplier. beta : scalar, optional Scalar to add entrywise. conj : bool, optional Whether to take the elementwise conjugate of mat. """ n, m = mat.shape a, b, c, d = tensor.shape assert b == n assert d == m assert p < a assert q < c if beta == 0.0: if conj: for i in prange(n): for j in prange(m): tensor[p, i, q, j] += alpha * np.conj(mat[i, j]) else: for i in prange(n): for j in prange(m): tensor[p, i, q, j] += alpha * mat[i, j] elif conj: for i in prange(n): for j in prange(m): tensor[p, i, q, j] += alpha * np.conj(mat[i, j]) + beta else: for i in prange(n): for j in prange(m): tensor[p, i, q, j] += alpha * mat[i, j] + beta
[docs] @njit(parallel=True) def accum_mat_into_tens3(mat, tensor, q, alpha=1.0, conj=False): """Accumulate a matrix into a tensor of rank 3. Performs the operation :code:`tensor[:, q, :] += alpha * mat`. Parameters ---------- mat : (N, M) array_like 2D array. tensor : (N, _, M) array_like 3D array. q : int Second index of tens4. alpha : scalar, optional Scalar multiplier. conj : bool, optional Whether to take the elementwise conjugate of mat. """ n, m = mat.shape b, c, d = tensor.shape assert b == n assert d == m assert q < c if conj: for i in prange(n): for j in prange(m): tensor[i, q, j] += alpha * np.conj(mat[i, j]) else: for i in prange(n): for j in prange(m): tensor[i, q, j] += alpha * mat[i, j]
[docs] @njit(parallel=True) def unpack_dim3_into_dim2(dest, src, q): """Unpack a slice of a 3D array into a 2D array. Unpacks a slice of a 3D array into a 2D array. Make sure everything is row-major. Parameters ---------- dest : (N, M) array_like 2D array. src : (N, _, M) array_like 3D array. q : int Second index of src. """ n, m = dest.shape a, b, c = src.shape assert a == n assert c == m assert q < b for i in prange(n): for j in prange(m): dest[i, j] = src[i, q, j]
[docs] @njit(parallel=True) def unpack_tril(tril, symm, filltriu=True): """Unpack matrix lower triangle. Unpacks the lower triangle of a matrix; optionally fills the upper triangle symmetrically. Parameters ---------- tril : (N*(N+1)/2,) ndarray Packed lower triangle. symm : (N, N) ndarray Unpacked matrix. filltriu : bool, optional Fill the upper triangle. """ n = symm.shape[0] assert symm.shape[1] == n assert tril.size == n * (n + 1) // 2 for i in prange(n): nii = i * (i + 1) // 2 for j in range(i + 1): symm[i, j] = tril[nii + j] if filltriu: for i in prange(n): for j in prange(i + 1, n): symm[i, j] = symm[j, i]
[docs] def fast_matexp(eigvals, eigvecs, out=None): """Compute the matrix exponential of a diagonalized matrix. Parameters ---------- eigvals : (N,) array_like Eigenvalues. eigvecs : (N, N) array_like Eigenvectors. Returns ------- matexp : (N, N) ndarray Matrix exponential. """ eigvals_exp = np.exp(eigvals) tmp = eigvecs * eigvals_exp.reshape(1, -1) return np.matmul(tmp, eigvecs.conj().T, out=out)
if __name__ == "__main__": n = 1024 import argparse parser = argparse.ArgumentParser() parser.add_argument("-n", type=int, default=8) args = parser.parse_args() numba.set_num_threads(args.n) import timeit X = np.zeros((4, n, 10, n)) rng = np.random.default_rng() A = rng.random((n, n)) t = timeit.timeit(lambda: accum_mat_into_tens3(A, X[0], 4), number=1000) t = timeit.timeit(lambda: accum_mat_into_tens3(A, X[0], 4), number=1000) print("accum_mat_into_tens3:", t, "ms") t = timeit.timeit(lambda: accum_mat_into_tens4(A, X, 0, 4), number=1000) t = timeit.timeit(lambda: accum_mat_into_tens4(A, X, 0, 4), number=1000) print("accum_mat_into_tens4:", t, "ms")