# Source code for structure_factor.tapered_estimators

"""Collection of functions that compute tapered estimates of the structure factor :math:S(k) of a stationary point process given one realization.

Some tapers (tapering function) are available in :ref:tapers.
"""

import warnings

import numpy as np

from structure_factor.tapers import BartlettTaper
from structure_factor.utils import meshgrid_to_column_matrix

[docs]def scattering_intensity(k, point_pattern, debiased=False, direct=True):
r"""Compute the scattering intensity estimator :math:\widehat{S}_{\mathrm{SI}} of the structure factor evaluated at at k, from one realization of a stationary point process encapsulated in point_pattern.

Args:
k (numpy.ndarray): Array of size :math:n \times d, where :math:d is the ambient dimension and :math:n the number of points where the estimator is evaluated.

point_pattern (:py:class:~structure_factor.point_pattern.PointPattern): Realization of the underlying stationary point process.

debiased (bool, optional): trigger the use of debiased tapered estimators. Defaults to False.

direct (bool, optional): if debiased is True, trigger the use of the tapered direct/undirect debiased tapered estimators. Defaults to True.

Returns:
numpy.ndarray: Vector of size :math:n containing the evaluation of scattering intensity estimator :math:\widehat{S}_{\mathrm{SI}}(k).

.. proof:definition::

The scattering intensity estimator :math:\widehat{S}_{\mathrm{SI}}, constructed from a realization :math:\{x_1, \dots, x_N\} \subset \mathbb{R}^d, is defined by,

.. math::
\widehat{S}_{\mathrm{SI}}(\mathbf{k}) =
\frac{1}{N}\left\lvert
\sum_{j=1}^N
\exp(- i \left\langle \mathbf{k}, \mathbf{x_j} \right\rangle)
\right\rvert^2 .

For more details we refer to :cite:HGBLR:22, (Section 3.1).

.. seealso::

- :py:func:~structure_factor.tapered_estimators.allowed_k_scattering_intensity
"""
n, d = k.shape
if d != point_pattern.dimension:
raise ValueError(
f"k must be of size (n, d) where d=point_pattern.dimension. Given k {k.shape} and d d = {point_pattern.dimension}"
)
estimator = select_tapered_estimator(debiased, direct)
taper = BartlettTaper()
estimated_sf_k = estimator(k, point_pattern, taper=taper)
return estimated_sf_k

[docs]def allowed_k_scattering_intensity(d, L, k_max=5, meshgrid_shape=None):
r"""Return a subset of the d-dimensional allowed wavevectors corresponding to a cubic window of length L.

Args:
d (int): Dimension of the space containing the point process.

L (numpy.ndarray): 1d array of size d, where each element correspond to the length of a side of the BoxWindow containing the point process realization.

k_max (float, optional): Supremum of the components of the allowed wavevectors on which the scattering intensity to be evaluated; i.e., for any allowed wavevector :math:\mathbf{k}=(k_1,...,k_d), :math:k_i \leq k\_max for all i. This implies that the maximum of the output vector k_norm will be approximately equal to the norm of the vector :math:(k\_max, ... k\_max). Defaults to 5.

meshgrid_shape (tuple, optional): Tuple of length d, where each element specifies the number of components over an axis. These axes are crossed to form a subset of :math:\mathbb{Z}^d used to construct a set of allowed wavevectors. i.g., if d=2, setting meshgid_shape=(2,3) will construct a meshgrid of allowed wavevectors formed by a vector of 2 values over the x-axis and a vector of 3 values over the y-axis. Defaults to None, which will run the calculation over **all** the allowed wavevectors. Defaults to None.

Returns:
tuple (numpy.ndarray, list):
- k : np.array with d columns where each row is an allowed wavevector.

.. proof:definition::

The set of the allowed wavevectors :math:\{\mathbf{k}_i\}_i is defined by

.. math::

\{\mathbf{k}_i\}_i = \{\frac{2 \pi}{L} \mathbf{n} ~ ; ~ \mathbf{n} \in (\mathbb{Z}^d)^\ast \}.

Note that the maximum n and the number of output allowed wavevectors returned by :py:func:~structure_factor.tapered_estimators.allowed_k_scattering_intensity, are specified by the input parameters k_max and meshgrid_shape.

.. seealso::

- :py:func:~structure_factor.tapered_estimators.scattering_intensity
"""
assert isinstance(k_max, (float, int))

n_max = np.floor(k_max * L / (2 * np.pi))  # maximum of n

#! todo refactoring needed, too complex and duplicated code
# warnings
if meshgrid_shape is None:
warnings.warn(
message="The computation on all allowed wavevectors may be time-consuming."
)
elif (np.array(meshgrid_shape) > (2 * n_max)).any():
warnings.warn(
message="Each component of the argument 'meshgrid_shape' should be less than or equal to the cardinality of the (total) set of allowed wavevectors."
)

# meshgrid_shape = np.fmin(meshgrid_shape, 2 * n_max)
# case d=1
if d == 1:
if meshgrid_shape is None or (meshgrid_shape > (2 * n_max)):
n = np.arange(-n_max, n_max + 1, step=1)
n = n[n != 0]
else:
n = np.linspace(-n_max, n_max, num=meshgrid_shape, dtype=int, endpoint=True)
if np.count_nonzero(n == 0) != 0:
n = np.linspace(
-n_max, n_max, num=meshgrid_shape + 1, dtype=int, endpoint=True
)
k = 2 * np.pi * n / L
k = k.reshape(-1, 1)
# case d>1
else:
if meshgrid_shape is None or (np.array(meshgrid_shape) > (2 * n_max)).any():
ranges = []
for n in n_max:
n_i = np.arange(-n, n + 1, step=1)
n_i = n_i[n_i != 0]
ranges.append(n_i)
n = meshgrid_to_column_matrix(np.meshgrid(*ranges, copy=False))

else:
ranges = []
i = 0
for s in meshgrid_shape:
n_i = np.linspace(-n_max[i], n_max[i], num=s, dtype=int, endpoint=True)
if np.count_nonzero(n_i == 0) != 0:
n_i = np.linspace(
-n_max[i], n_max[i], num=s + 1, dtype=int, endpoint=True
)
i += 1
n_i = n_i[n_i != 0]
ranges.append(n_i)
n = meshgrid_to_column_matrix(np.meshgrid(*ranges, copy=False))

k = 2 * np.pi * n / L.T
return k

[docs]def select_tapered_estimator(debiased, direct):
"""Select the tapered estimator of the structure factor.

Args:
debiased (bool): Trigger the use of a debiased tapered estimator.
direct (bool): If debiased is True, trigger the use of the direct/undirect debiased tapered estimator.

Returns:
callable: According to debiased and direct return

- :py:func:~structure_factor.tapered_estimators.tapered_estimator_debiased_direct
- :py:func:~structure_factor.tapered_estimators.tapered_estimator_debiased_undirect
- :py:func:~structure_factor.tapered_estimators.tapered_estimator_core
"""
if debiased:
if direct:
return tapered_estimator_debiased_direct
return tapered_estimator_debiased_undirect
return tapered_estimator_core

[docs]def tapered_estimator_core(k, point_pattern, taper):
r"""Compute the tapered estimator :math:S_{TP}(t, k) of the structure factor :math:S(k) evaluated at k w.r.t. the taper :math:t and the realization point_pattern of the underlying stationary point process.

Args:
k (numpy.ndarray): Array of size :math:n \times d, where :math:d is the ambient dimension and :math:n the number of points where the estimator is evaluated.

point_pattern (:py:class:~structure_factor.point_pattern.PointPattern): Realization of the underlying stationary point process.

taper (object): Class with static method or instance with method .taper(x, window) corresponding to :math:t(x, W).

Returns:
numpy.ndarray: Evaluation of the tapered estimator :math:S_{TP}(t, k) at k.

.. proof:definition::

The tapered estimator :math:S_{TP}(t, k), constructed from a realization :math:\{x_1, \dots, x_N\} \subset \mathbb{R}^d, is defined by,

.. math::

S_{TP}(t, k) =
\frac{1}{\rho}
\left\lvert
\sum_{j=1}^N
t(x_j, W)
\exp(- i \left\langle k, x_j \right\rangle)
\right\rvert^2,

for :math:k \in \mathbb{R}^d.
"""
rho = point_pattern.intensity
dft = tapered_dft(k, point_pattern, taper)
estimated_sf_k = periodogram_from_dft(dft)
estimated_sf_k /= rho
return estimated_sf_k

[docs]def tapered_estimator_debiased_direct(k, point_pattern, taper):
r"""Compute the direct debiased tapered estimator :math:S_{DDTP}(t, k) of the structure factor :math:S(k) evaluated at k w.r.t. the taper :math:t and the realization point_pattern of the underlying stationary point process.

.. math::

S_{DDTP}(t, k) =
\frac{1}{\rho} \left\lvert
\sum_{j=1}^N
t(x_j, W)
\exp(- i \left\langle k, x_j \right\rangle)
- \rho F[t(\cdot, W)](k)
\right\rvert^2

where :math:x_{1}, \dots, x_{N} corresponds to point_pattern.points,  :math:W corresponds to the window point_pattern.window and :math:\rho is the intensity of the underlying stationary point process.

The direct debiased estimator :math:S_{DDTP}(t, k) is positive and asymptotically unbiased as the observation window :math:W grows to :math:\mathbb{R}^d.

Args:
k (numpy.ndarray): Array of size :math:n \times d, where :math:d is the ambient dimension and :math:n the number of points where the estimator is evaluated.

point_pattern (:py:class:~structure_factor.point_pattern.PointPattern): Realization of the underlying stationary point process.

taper (object): class instance with two methods:

- .taper(x, window) corresponding to the taper function :math:t(x, W) such that :math:\|t(\cdot, W)\|_2 = 1.

- .ft_taper(k, window) corresponding to the Fourier transform :math:\mathcal{F}[t(\cdot, W)](k) of the taper function.

Returns:
numpy.ndarray: Vector of size :math:n containing the evaluation of the direct debiased estimator :math:S_{DDTP}(t, k).
"""
rho = point_pattern.intensity
window = point_pattern.window

# Debiased dft
dft = tapered_dft(k, point_pattern, taper)
dft -= rho * taper.ft_taper(k, window)

estimated_sf_k = periodogram_from_dft(dft)
estimated_sf_k /= rho
return estimated_sf_k

[docs]def tapered_estimator_debiased_undirect(k, point_pattern, taper):
r"""Compute the undirect debiased tapered estimator :math:S_{UDTP}(t, k) of the structure factor :math:S(k) evaluated at k w.r.t. the taper :math:t and the realization point_pattern of the underlying stationary point process.

.. math::

S_{UDTP}(t, k) =
\frac{1}{\rho}
\left\lvert
\sum_{j=1}^N
t(x_j, W)
\exp(- i \left\langle k, x_j \right\rangle)
\right\rvert^2
- \rho
\left\lvert
F[t(\cdot, W)](k))
\right\rvert^2

where :math:x_{1}, \dots, x_{N} corresponds to point_pattern.points, :math:W corresponds to the window point_pattern.window and :math:\rho is the intensity of the underlying stationary point process.

The undirect debiased estimator :math:S_{UDTP}(t, k) is not guaranteed to be positive but is asymptotically unbiased as the observation window :math:W grows to :math:\mathbb{R}^d.

Args:
k (numpy.ndarray): Array of size :math:n \times d, where :math:d is the ambient dimension and :math:n the number of points where the estimator is evaluated.

point_pattern (:py:class:~structure_factor.point_pattern.PointPattern): Realization of the underlying stationary point process.

taper (object): class instance with two methods:

- .taper(x, window) corresponding to the taper function :math:t(x, W) such that :math:\|t(\cdot, W)\|_2 = 1.
- .ft_taper(k, window) corresponding to the Fourier transform :math:\mathcal{F}[t(\cdot, W)](k) of the taper function.

Returns:
numpy.ndarray: Vector of size :math:n containing the evaluation of the direct debiased estimator :math:S_{UDTP}(t, k).
"""
window = point_pattern.window
rho = point_pattern.intensity

estimated_sf_k = tapered_estimator_core(k, point_pattern, taper)
Hk_2 = np.abs(taper.ft_taper(k, window))
np.square(Hk_2, out=Hk_2)
estimated_sf_k -= rho * Hk_2
return estimated_sf_k

[docs]def tapered_dft(k, point_pattern, taper):
r"""Compute the tapered discrete Fourier transform (tapered DFT) associated with point_pattern evaluated at k, using taper :math:t.

.. math::

\sum_{j=1}^N t(x_j, W) \exp(- i \langle k, x_j \rangle),

where :math:x_{1}, \dots, x_{N} corresponds to point_pattern.points and :math:W corresponds to the window point_pattern.window.

Args:
k (numpy.ndarray): Array of size :math:n \times d, where :math:d is the ambient dimension and :math:n the number of points where the tapered DFT is evaluated.

point_pattern (:py:class:~structure_factor.point_pattern.PointPattern): Realization of the underlying stationary point process.

taper (object): class with static method or instance with method .taper(x, window) corresponding to :math:t(x, W) such that :math:\|t(\cdot, W)\|_2 = 1.

Returns:
numpy.ndarray: Evaluation of the DFT of the taper h
"""
points = point_pattern.points
window = point_pattern.window
K = np.atleast_2d(k)
X = np.atleast_2d(points)
nb_k, _ = K.shape
nb_x, _ = X.shape

# dft = sum_x t(x) exp(- i <k, x>)
hx_exp_ikx = np.zeros((nb_k, nb_x), dtype=complex)
# i <k, x>
hx_exp_ikx.imag = np.dot(K, X.T)
# - i <k, x>
np.conj(hx_exp_ikx, out=hx_exp_ikx)
# exp(- i <k, x>)
np.exp(hx_exp_ikx, out=hx_exp_ikx)
# t(x) exp(- i <k, x>)
hx = taper.taper(X, window)
hx_exp_ikx *= hx
# sum_x t(x) exp(- i <k, x>)
dft = np.sum(hx_exp_ikx, axis=1)
return dft

[docs]def periodogram_from_dft(dft):
r"""Compute the square absolute value abs(dft)**2 of the discrete Fourier transform dft.

.. math::

\left\lvert
\sum_{j=1}^N t(x_j) \exp(- i \langle k, x_j \rangle)
\right\rvert ^ 2,

Args:
dft (numpy.ndarray): Discrete Fourier transform computed via :py:func:~structure_factor.tapered_estimators.tapered_dft.

Returns:
numpy.ndarray: abs(dft)**2.
"""
periodogram = np.zeros_like(dft, dtype=float)
np.abs(dft, out=periodogram)
np.square(periodogram, out=periodogram)
return periodogram
`