"""Collection of functions that compute tapered estimates of the structure factor :math:`S(k)` of a stationary point process given one realization.
Some tapers (tapering function) are available in :ref:`tapers`.
"""
import warnings
import numpy as np
from structure_factor.tapers import BartlettTaper
from structure_factor.utils import meshgrid_to_column_matrix
[docs]def scattering_intensity(k, point_pattern, debiased=False, direct=True):
r"""Compute the scattering intensity estimator :math:`\widehat{S}_{\mathrm{SI}}` of the structure factor evaluated at at ``k``, from one realization of a stationary point process encapsulated in ``point_pattern``.
Args:
k (numpy.ndarray): Array of size :math:`n \times d`, where :math:`d` is the ambient dimension and :math:`n` the number of points where the estimator is evaluated.
point_pattern (:py:class:`~structure_factor.point_pattern.PointPattern`): Realization of the underlying stationary point process.
debiased (bool, optional): trigger the use of debiased tapered estimators. Defaults to False.
direct (bool, optional): if ``debiased`` is True, trigger the use of the tapered direct/undirect debiased tapered estimators. Defaults to True.
Returns:
numpy.ndarray: Vector of size :math:`n` containing the evaluation of scattering intensity estimator :math:`\widehat{S}_{\mathrm{SI}}(k)`.
.. proof:definition::
The scattering intensity estimator :math:`\widehat{S}_{\mathrm{SI}}, constructed from a realization :math:`\{x_1, \dots, x_N\} \subset \mathbb{R}^d`, is defined by,
.. math::
\widehat{S}_{\mathrm{SI}}(\mathbf{k}) =
\frac{1}{N}\left\lvert
\sum_{j=1}^N
\exp(- i \left\langle \mathbf{k}, \mathbf{x_j} \right\rangle)
\right\rvert^2 .
For more details we refer to :cite:`HGBLR:22`, (Section 3.1).
.. seealso::
- :py:func:`~structure_factor.tapered_estimators.allowed_k_scattering_intensity`
"""
n, d = k.shape
if d != point_pattern.dimension:
raise ValueError(
f"k must be of size (n, d) where d=point_pattern.dimension. Given k {k.shape} and d d = {point_pattern.dimension}"
)
estimator = select_tapered_estimator(debiased, direct)
taper = BartlettTaper()
estimated_sf_k = estimator(k, point_pattern, taper=taper)
return estimated_sf_k
[docs]def allowed_k_scattering_intensity(d, L, k_max=5, meshgrid_shape=None):
r"""Return a subset of the d-dimensional allowed wavevectors corresponding to a cubic window of length ``L``.
Args:
d (int): Dimension of the space containing the point process.
L (numpy.ndarray): 1d array of size ``d``, where each element correspond to the length of a side of the BoxWindow containing the point process realization.
k_max (float, optional): Supremum of the components of the allowed wavevectors on which the scattering intensity to be evaluated; i.e., for any allowed wavevector :math:`\mathbf{k}=(k_1,...,k_d)`, :math:`k_i \leq k\_max` for all i. This implies that the maximum of the output vector ``k_norm`` will be approximately equal to the norm of the vector :math:`(k\_max, ... k\_max)`. Defaults to 5.
meshgrid_shape (tuple, optional): Tuple of length `d`, where each element specifies the number of components over an axis. These axes are crossed to form a subset of :math:`\mathbb{Z}^d` used to construct a set of allowed wavevectors. i.g., if d=2, setting meshgid_shape=(2,3) will construct a meshgrid of allowed wavevectors formed by a vector of 2 values over the x-axis and a vector of 3 values over the y-axis. Defaults to None, which will run the calculation over **all** the allowed wavevectors. Defaults to None.
Returns:
tuple (numpy.ndarray, list):
- k : np.array with ``d`` columns where each row is an allowed wavevector.
.. proof:definition::
The set of the allowed wavevectors :math:`\{\mathbf{k}_i\}_i` is defined by
.. math::
\{\mathbf{k}_i\}_i = \{\frac{2 \pi}{L} \mathbf{n} ~ ; ~ \mathbf{n} \in (\mathbb{Z}^d)^\ast \}.
Note that the maximum ``n`` and the number of output allowed wavevectors returned by :py:func:`~structure_factor.tapered_estimators.allowed_k_scattering_intensity`, are specified by the input parameters ``k_max`` and ``meshgrid_shape``.
.. seealso::
- :py:func:`~structure_factor.tapered_estimators.scattering_intensity`
"""
assert isinstance(k_max, (float, int))
n_max = np.floor(k_max * L / (2 * np.pi)) # maximum of ``n``
#! todo refactoring needed, too complex and duplicated code
# warnings
if meshgrid_shape is None:
warnings.warn(
message="The computation on all allowed wavevectors may be time-consuming."
)
elif (np.array(meshgrid_shape) > (2 * n_max)).any():
warnings.warn(
message="Each component of the argument 'meshgrid_shape' should be less than or equal to the cardinality of the (total) set of allowed wavevectors."
)
# meshgrid_shape = np.fmin(meshgrid_shape, 2 * n_max)
# case d=1
if d == 1:
if meshgrid_shape is None or (meshgrid_shape > (2 * n_max)):
n = np.arange(-n_max, n_max + 1, step=1)
n = n[n != 0]
else:
n = np.linspace(-n_max, n_max, num=meshgrid_shape, dtype=int, endpoint=True)
if np.count_nonzero(n == 0) != 0:
n = np.linspace(
-n_max, n_max, num=meshgrid_shape + 1, dtype=int, endpoint=True
)
k = 2 * np.pi * n / L
k = k.reshape(-1, 1)
# case d>1
else:
if meshgrid_shape is None or (np.array(meshgrid_shape) > (2 * n_max)).any():
ranges = []
for n in n_max:
n_i = np.arange(-n, n + 1, step=1)
n_i = n_i[n_i != 0]
ranges.append(n_i)
n = meshgrid_to_column_matrix(np.meshgrid(*ranges, copy=False))
else:
ranges = []
i = 0
for s in meshgrid_shape:
n_i = np.linspace(-n_max[i], n_max[i], num=s, dtype=int, endpoint=True)
if np.count_nonzero(n_i == 0) != 0:
n_i = np.linspace(
-n_max[i], n_max[i], num=s + 1, dtype=int, endpoint=True
)
i += 1
n_i = n_i[n_i != 0]
ranges.append(n_i)
n = meshgrid_to_column_matrix(np.meshgrid(*ranges, copy=False))
k = 2 * np.pi * n / L.T
return k
[docs]def select_tapered_estimator(debiased, direct):
"""Select the tapered estimator of the structure factor.
Args:
debiased (bool): Trigger the use of a debiased tapered estimator.
direct (bool): If ``debiased`` is True, trigger the use of the direct/undirect debiased tapered estimator.
Returns:
callable: According to ``debiased`` and ``direct`` return
- :py:func:`~structure_factor.tapered_estimators.tapered_estimator_debiased_direct`
- :py:func:`~structure_factor.tapered_estimators.tapered_estimator_debiased_undirect`
- :py:func:`~structure_factor.tapered_estimators.tapered_estimator_core`
"""
if debiased:
if direct:
return tapered_estimator_debiased_direct
return tapered_estimator_debiased_undirect
return tapered_estimator_core
[docs]def tapered_estimator_core(k, point_pattern, taper):
r"""Compute the tapered estimator :math:`S_{TP}(t, k)` of the structure factor :math:`S(k)` evaluated at ``k`` w.r.t. the ``taper`` :math:`t` and the realization ``point_pattern`` of the underlying stationary point process.
Args:
k (numpy.ndarray): Array of size :math:`n \times d`, where :math:`d` is the ambient dimension and :math:`n` the number of points where the estimator is evaluated.
point_pattern (:py:class:`~structure_factor.point_pattern.PointPattern`): Realization of the underlying stationary point process.
taper (object): Class with static method or instance with method ``.taper(x, window)`` corresponding to :math:`t(x, W)`.
Returns:
numpy.ndarray: Evaluation of the tapered estimator :math:`S_{TP}(t, k)` at ``k``.
.. proof:definition::
The tapered estimator :math:`S_{TP}(t, k)`, constructed from a realization :math:`\{x_1, \dots, x_N\} \subset \mathbb{R}^d`, is defined by,
.. math::
S_{TP}(t, k) =
\frac{1}{\rho}
\left\lvert
\sum_{j=1}^N
t(x_j, W)
\exp(- i \left\langle k, x_j \right\rangle)
\right\rvert^2,
for :math:`k \in \mathbb{R}^d`.
"""
rho = point_pattern.intensity
dft = tapered_dft(k, point_pattern, taper)
estimated_sf_k = periodogram_from_dft(dft)
estimated_sf_k /= rho
return estimated_sf_k
[docs]def tapered_estimator_debiased_direct(k, point_pattern, taper):
r"""Compute the direct debiased tapered estimator :math:`S_{DDTP}(t, k)` of the structure factor :math:`S(k)` evaluated at ``k`` w.r.t. the ``taper`` :math:`t` and the realization ``point_pattern`` of the underlying stationary point process.
.. math::
S_{DDTP}(t, k) =
\frac{1}{\rho} \left\lvert
\sum_{j=1}^N
t(x_j, W)
\exp(- i \left\langle k, x_j \right\rangle)
- \rho F[t(\cdot, W)](k)
\right\rvert^2
where :math:`x_{1}, \dots, x_{N}` corresponds to ``point_pattern.points``, :math:`W` corresponds to the window ``point_pattern.window`` and :math:`\rho` is the intensity of the underlying stationary point process.
The direct debiased estimator :math:`S_{DDTP}(t, k)` is positive and asymptotically unbiased as the observation window :math:`W` grows to :math:`\mathbb{R}^d`.
Args:
k (numpy.ndarray): Array of size :math:`n \times d`, where :math:`d` is the ambient dimension and :math:`n` the number of points where the estimator is evaluated.
point_pattern (:py:class:`~structure_factor.point_pattern.PointPattern`): Realization of the underlying stationary point process.
taper (object): class instance with two methods:
- ``.taper(x, window)`` corresponding to the taper function :math:`t(x, W)` such that :math:`\|t(\cdot, W)\|_2 = 1`.
- ``.ft_taper(k, window)`` corresponding to the Fourier transform :math:`\mathcal{F}[t(\cdot, W)](k)` of the taper function.
Returns:
numpy.ndarray: Vector of size :math:`n` containing the evaluation of the direct debiased estimator :math:`S_{DDTP}(t, k)`.
"""
rho = point_pattern.intensity
window = point_pattern.window
# Debiased dft
dft = tapered_dft(k, point_pattern, taper)
dft -= rho * taper.ft_taper(k, window)
estimated_sf_k = periodogram_from_dft(dft)
estimated_sf_k /= rho
return estimated_sf_k
[docs]def tapered_estimator_debiased_undirect(k, point_pattern, taper):
r"""Compute the undirect debiased tapered estimator :math:`S_{UDTP}(t, k)` of the structure factor :math:`S(k)` evaluated at ``k`` w.r.t. the ``taper`` :math:`t` and the realization ``point_pattern`` of the underlying stationary point process.
.. math::
S_{UDTP}(t, k) =
\frac{1}{\rho}
\left\lvert
\sum_{j=1}^N
t(x_j, W)
\exp(- i \left\langle k, x_j \right\rangle)
\right\rvert^2
- \rho
\left\lvert
F[t(\cdot, W)](k))
\right\rvert^2
where :math:`x_{1}, \dots, x_{N}` corresponds to ``point_pattern.points``, :math:`W` corresponds to the window ``point_pattern.window`` and :math:`\rho` is the intensity of the underlying stationary point process.
The undirect debiased estimator :math:`S_{UDTP}(t, k)` is not guaranteed to be positive but is asymptotically unbiased as the observation window :math:`W` grows to :math:`\mathbb{R}^d`.
Args:
k (numpy.ndarray): Array of size :math:`n \times d`, where :math:`d` is the ambient dimension and :math:`n` the number of points where the estimator is evaluated.
point_pattern (:py:class:`~structure_factor.point_pattern.PointPattern`): Realization of the underlying stationary point process.
taper (object): class instance with two methods:
- ``.taper(x, window)`` corresponding to the taper function :math:`t(x, W)` such that :math:`\|t(\cdot, W)\|_2 = 1`.
- ``.ft_taper(k, window)`` corresponding to the Fourier transform :math:`\mathcal{F}[t(\cdot, W)](k)` of the taper function.
Returns:
numpy.ndarray: Vector of size :math:`n` containing the evaluation of the direct debiased estimator :math:`S_{UDTP}(t, k)`.
"""
window = point_pattern.window
rho = point_pattern.intensity
estimated_sf_k = tapered_estimator_core(k, point_pattern, taper)
Hk_2 = np.abs(taper.ft_taper(k, window))
np.square(Hk_2, out=Hk_2)
estimated_sf_k -= rho * Hk_2
return estimated_sf_k
[docs]def tapered_dft(k, point_pattern, taper):
r"""Compute the tapered discrete Fourier transform (tapered DFT) associated with ``point_pattern`` evaluated at ``k``, using ``taper`` :math:`t`.
.. math::
\sum_{j=1}^N t(x_j, W) \exp(- i \langle k, x_j \rangle),
where :math:`x_{1}, \dots, x_{N}` corresponds to ``point_pattern.points`` and :math:`W` corresponds to the window ``point_pattern.window``.
Args:
k (numpy.ndarray): Array of size :math:`n \times d`, where :math:`d` is the ambient dimension and :math:`n` the number of points where the tapered DFT is evaluated.
point_pattern (:py:class:`~structure_factor.point_pattern.PointPattern`): Realization of the underlying stationary point process.
taper (object): class with static method or instance with method ``.taper(x, window)`` corresponding to :math:`t(x, W)` such that :math:`\|t(\cdot, W)\|_2 = 1`.
Returns:
numpy.ndarray: Evaluation of the DFT of the taper h
"""
points = point_pattern.points
window = point_pattern.window
K = np.atleast_2d(k)
X = np.atleast_2d(points)
nb_k, _ = K.shape
nb_x, _ = X.shape
# dft = sum_x t(x) exp(- i <k, x>)
hx_exp_ikx = np.zeros((nb_k, nb_x), dtype=complex)
# i <k, x>
hx_exp_ikx.imag = np.dot(K, X.T)
# - i <k, x>
np.conj(hx_exp_ikx, out=hx_exp_ikx)
# exp(- i <k, x>)
np.exp(hx_exp_ikx, out=hx_exp_ikx)
# t(x) exp(- i <k, x>)
hx = taper.taper(X, window)
hx_exp_ikx *= hx
# sum_x t(x) exp(- i <k, x>)
dft = np.sum(hx_exp_ikx, axis=1)
return dft
[docs]def periodogram_from_dft(dft):
r"""Compute the square absolute value ``abs(dft)**2`` of the discrete Fourier transform ``dft``.
.. math::
\left\lvert
\sum_{j=1}^N t(x_j) \exp(- i \langle k, x_j \rangle)
\right\rvert ^ 2,
Args:
dft (numpy.ndarray): Discrete Fourier transform computed via :py:func:`~structure_factor.tapered_estimators.tapered_dft`.
Returns:
numpy.ndarray: ``abs(dft)**2``.
"""
periodogram = np.zeros_like(dft, dtype=float)
np.abs(dft, out=periodogram)
np.square(periodogram, out=periodogram)
return periodogram