from __future__ import annotations
import numba
import numpy as np
from numba import njit, prange
[docs]
@njit(parallel=True)
def parallel_set(arr, val):
"""Broadcast value to array.
Set all elements of a contiguous array to a given value.
Fails if the array is not contiguous.
Parameters
----------
arr : array_like
Array to set.
val : scalar
Value to set.
"""
assert arr.flags.contiguous
if arr.flags.c_contiguous:
arr1 = arr.ravel()
for i in prange(arr1.size):
arr1[i] = val
elif arr.flags.f_contiguous:
arr1 = arr.T.ravel()
for i in prange(arr1.size):
arr1[i] = val
[docs]
@njit(parallel=True)
def accum_mat_into_tens4(mat, tensor, p, q, alpha=1.0, conj=False, beta=0.0):
"""Accumulate a matrix into a tensor of rank 4.
Performs the operation
:code:`tensor[p, :, q, :] += alpha * mat`.
Parameters
----------
mat : (N, M) array_like
2D array.
tensor : (_, N, _, M) array_like
4D array.
p : int
First index of tens4.
q : int
Third index of tens4.
alpha : scalar, optional
Scalar multiplier.
beta : scalar, optional
Scalar to add entrywise.
conj : bool, optional
Whether to take the elementwise conjugate of mat.
"""
n, m = mat.shape
a, b, c, d = tensor.shape
assert b == n
assert d == m
assert p < a
assert q < c
if beta == 0.0:
if conj:
for i in prange(n):
for j in prange(m):
tensor[p, i, q, j] += alpha * np.conj(mat[i, j])
else:
for i in prange(n):
for j in prange(m):
tensor[p, i, q, j] += alpha * mat[i, j]
elif conj:
for i in prange(n):
for j in prange(m):
tensor[p, i, q, j] += alpha * np.conj(mat[i, j]) + beta
else:
for i in prange(n):
for j in prange(m):
tensor[p, i, q, j] += alpha * mat[i, j] + beta
[docs]
@njit(parallel=True)
def accum_mat_into_tens3(mat, tensor, q, alpha=1.0, conj=False):
"""Accumulate a matrix into a tensor of rank 3.
Performs the operation
:code:`tensor[:, q, :] += alpha * mat`.
Parameters
----------
mat : (N, M) array_like
2D array.
tensor : (N, _, M) array_like
3D array.
q : int
Second index of tens4.
alpha : scalar, optional
Scalar multiplier.
conj : bool, optional
Whether to take the elementwise conjugate of mat.
"""
n, m = mat.shape
b, c, d = tensor.shape
assert b == n
assert d == m
assert q < c
if conj:
for i in prange(n):
for j in prange(m):
tensor[i, q, j] += alpha * np.conj(mat[i, j])
else:
for i in prange(n):
for j in prange(m):
tensor[i, q, j] += alpha * mat[i, j]
[docs]
@njit(parallel=True)
def unpack_dim3_into_dim2(dest, src, q):
"""Unpack a slice of a 3D array into a 2D array.
Unpacks a slice of a 3D array into a 2D array.
Make sure everything is row-major.
Parameters
----------
dest : (N, M) array_like
2D array.
src : (N, _, M) array_like
3D array.
q : int
Second index of src.
"""
n, m = dest.shape
a, b, c = src.shape
assert a == n
assert c == m
assert q < b
for i in prange(n):
for j in prange(m):
dest[i, j] = src[i, q, j]
[docs]
@njit(parallel=True)
def unpack_tril(tril, symm, filltriu=True):
"""Unpack matrix lower triangle.
Unpacks the lower triangle of a matrix;
optionally fills the upper triangle symmetrically.
Parameters
----------
tril : (N*(N+1)/2,) ndarray
Packed lower triangle.
symm : (N, N) ndarray
Unpacked matrix.
filltriu : bool, optional
Fill the upper triangle.
"""
n = symm.shape[0]
assert symm.shape[1] == n
assert tril.size == n * (n + 1) // 2
for i in prange(n):
nii = i * (i + 1) // 2
for j in range(i + 1):
symm[i, j] = tril[nii + j]
if filltriu:
for i in prange(n):
for j in prange(i + 1, n):
symm[i, j] = symm[j, i]
[docs]
def fast_matexp(eigvals, eigvecs, out=None):
"""Compute the matrix exponential of a diagonalized matrix.
Parameters
----------
eigvals : (N,) array_like
Eigenvalues.
eigvecs : (N, N) array_like
Eigenvectors.
Returns
-------
matexp : (N, N) ndarray
Matrix exponential.
"""
eigvals_exp = np.exp(eigvals)
tmp = eigvecs * eigvals_exp.reshape(1, -1)
return np.matmul(tmp, eigvecs.conj().T, out=out)
if __name__ == "__main__":
n = 1024
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-n", type=int, default=8)
args = parser.parse_args()
numba.set_num_threads(args.n)
import timeit
X = np.zeros((4, n, 10, n))
rng = np.random.default_rng()
A = rng.random((n, n))
t = timeit.timeit(lambda: accum_mat_into_tens3(A, X[0], 4), number=1000)
t = timeit.timeit(lambda: accum_mat_into_tens3(A, X[0], 4), number=1000)
print("accum_mat_into_tens3:", t, "ms")
t = timeit.timeit(lambda: accum_mat_into_tens4(A, X, 0, 4), number=1000)
t = timeit.timeit(lambda: accum_mat_into_tens4(A, X, 0, 4), number=1000)
print("accum_mat_into_tens4:", t, "ms")