Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions stumpy/sdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

from . import config

try: # pragma: no cover
import pyfftw

PYFFTW_IS_AVAILABLE = True
except ImportError: # pragma: no cover
PYFFTW_IS_AVAILABLE = False


Copy link
Copy Markdown
Collaborator Author

@NimaSarajpoor NimaSarajpoor May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment here that says: "The name of any callable object that computes the sliding dot product should end with sliding_dot_product"

@njit(fastmath=config.STUMPY_FASTMATH_TRUE)
def _njit_sliding_dot_product(Q, T):
Expand Down Expand Up @@ -109,6 +116,186 @@ def _pocketfft_sliding_dot_product(Q, T):
return c2r(False, np.multiply(fft_2d[0], fft_2d[1]), n=next_fast_n)[m - 1 : n]


def _make_pyfftw_sliding_dot_product(max_n=2**20, real_dtype="float64"):
"""
A closure to compute the sliding dot product using FFTW via pyfftw.

This closure uses FFTW (via pyfftw) to efficiently compute the sliding dot product
between a query sequence, Q, and a time series, T. It preallocates arrays and caches
FFTW objects to optimize repeated computations with similar-sized inputs.

Parameters
----------
max_n : int, default=2**20
Maximum length to preallocate arrays for. This will be the size of the
real-valued array. A complex-valued array of size 1 + (max_n // 2)
will also be preallocated. If inputs exceed this size, arrays will be
reallocated to accommodate larger sizes.

real_dtype : str, default="float64"
The real data type to use for the preallocated arrays. Must be either
"float64" or "longdouble". The complex data type will be set to
"complex128" or "clongdouble", respectively.

Returns
-------
sliding_dot_product : callable
A callable that computes the sliding dot product between Q and T using FFTW
via pyfftw, and caches FFTW objects if not already cached.

Notes
-----
The closure maintains internal caches of FFTW objects to avoid redundant planning
operations when called multiple times with similar-sized inputs and parameters.
Comment thread
NimaSarajpoor marked this conversation as resolved.
When planning_flag == "FFTW_ESTIMATE", there will be no planning operation.
However, caching FFTW objects is still beneficial as the overhead of creating
those objects can be avoided in subsequent calls.

References
----------
FFTW documentation: http://www.fftw.org/
pyfftw documentation: https://pyfftw.readthedocs.io/
"""
REAL_TO_COMPLEX_MAP = {
"float64": "complex128",
"longdouble": "clongdouble",
}
if real_dtype not in ["float64", "longdouble"]: # pragma: no cover
raise ValueError(
f"Invalid real_dtype: {real_dtype}. Must be 'float64' or 'longdouble'."
)
complex_dtype = REAL_TO_COMPLEX_MAP[real_dtype]

# Preallocate arrays
real_arr = pyfftw.empty_aligned(max_n, dtype=real_dtype)
complex_arr = pyfftw.empty_aligned(1 + (max_n // 2), dtype=complex_dtype)

# Store FFTW objects, keyed by (next_fast_n, n_threads, planning_flag)
rfft_objects = {}
irfft_objects = {}

def sliding_dot_product(Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE"):
"""
Compute the sliding dot product between Q and T using FFTW via pyfftw,
and cache FFTW objects if not already cached.

Parameters
----------
Q : numpy.ndarray
Query array or subsequence.

T : numpy.ndarray
Time series or sequence.

n_threads : int, default=1
Number of threads to use for FFTW computations.

planning_flag : str, default="FFTW_ESTIMATE"
The planning flag that will be used in FFTW for planning.
See pyfftw documentation for details. Current options, ordered
ascendingly by the level of aggressiveness in planning, are:
"FFTW_ESTIMATE", "FFTW_MEASURE", "FFTW_PATIENT", and "FFTW_EXHAUSTIVE".
The more aggressive the planning, the longer the planning time, but
the faster the execution time.

Returns
-------
out : numpy.ndarray
Sliding dot product between Q and T.

Notes
-----
The planning_flag is defaulted to "FFTW_ESTIMATE" to be aligned with
MATLAB's FFTW usage (as of version R2025b)
See: https://www.mathworks.com/help/matlab/ref/fftw.html

This implementation is inspired by the answer on StackOverflow:
https://stackoverflow.com/a/30615425/2955541
"""
nonlocal real_arr, complex_arr

m = Q.shape[0]
n = T.shape[0]
next_fast_n = pyfftw.next_fast_len(n)

# Update preallocated arrays if needed
if next_fast_n > len(real_arr):
real_arr = pyfftw.empty_aligned(next_fast_n, dtype=real_arr.dtype)
complex_arr = pyfftw.empty_aligned(
1 + (next_fast_n // 2), dtype=complex_arr.dtype
)

real_view = real_arr[:next_fast_n]
complex_view = complex_arr[: 1 + (next_fast_n // 2)]

# Get or create FFTW objects
key = (next_fast_n, n_threads, planning_flag)

rfft_obj = rfft_objects.get(key, None)
irfft_obj = irfft_objects.get(key, None)

if rfft_obj is None or irfft_obj is None:
rfft_obj = pyfftw.FFTW(
input_array=real_view,
output_array=complex_view,
direction="FFTW_FORWARD",
flags=(planning_flag,),
threads=n_threads,
)
irfft_obj = pyfftw.FFTW(
input_array=complex_view,
output_array=real_view,
direction="FFTW_BACKWARD",
flags=(planning_flag, "FFTW_DESTROY_INPUT"),
threads=n_threads,
)
rfft_objects[key] = rfft_obj
irfft_objects[key] = irfft_obj
else:
# Update the input and output arrays of the cached FFTW objects
# in case their original input and output arrays were reallocated
# in a previous call
rfft_obj.update_arrays(real_view, complex_view)
irfft_obj.update_arrays(complex_view, real_view)

# Compute the (circular) convolution between T and Q[::-1],
# each zero-padded to length next_fast_n by performing
# the following three steps:

# Step 1
# Compute RFFT of T (zero-padded)
# Must make a copy of output to avoid losing it when the array is
# overwritten when computing the RFFT of Q in the next step
rfft_obj.input_array[:n] = T
rfft_obj.input_array[n:] = 0.0
rfft_obj.execute()
rfft_T = rfft_obj.output_array.copy()

# Step 2
# Compute RFFT of Q (reversed, scaled, and zero-padded)
# Scaling is required because the thin wrapper execute
# that will be called below does not perform normalization
np.multiply(Q[::-1], 1.0 / next_fast_n, out=rfft_obj.input_array[:m])
rfft_obj.input_array[m:] = 0.0
rfft_obj.execute()
rfft_Q = rfft_obj.output_array

# Step 3
# Convert back to time domain by taking the inverse RFFT
np.multiply(rfft_T, rfft_Q, out=irfft_obj.input_array)
irfft_obj.execute()

return irfft_obj.output_array[m - 1 : n] # valid portion

return sliding_dot_product


if PYFFTW_IS_AVAILABLE: # pragma: no cover
_pyfftw_sliding_dot_product = _make_pyfftw_sliding_dot_product(
max_n=2**20, real_dtype="float64"
)


def _sliding_dot_product(Q, T):
"""
Compute the sliding dot product between `Q` and `T`
Expand Down
49 changes: 40 additions & 9 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -154,27 +154,57 @@ gen_ray_coveragerc()
# Generate a .coveragerc_override file that excludes Ray functions and tests
gen_coveragerc_boilerplate
echo " def .*_ray_*" >> .coveragerc_override
echo " def ,*_ray\(*" >> .coveragerc_override
echo " def .*_ray\(*" >> .coveragerc_override
echo " def ray_.*" >> .coveragerc_override
echo " def test_.*_ray*" >> .coveragerc_override
}

set_ray_coveragerc()
check_fftw_pyfftw()
{
# If `ray` command is not found then generate a .coveragerc_override file
if ! command -v ray &> /dev/null
if ! python -c "import pyfftw" &> /dev/null;
then
echo "Ray Not Installed"
echo "pyFFTW cannot be imported"
else
echo "pyFFTW was successfully imported"
fi
}

gen_pyfftw_coveragerc()
{
gen_coveragerc_boilerplate
echo " def .*pyfftw.*" >> .coveragerc_override
echo " def test_.*pyfftw.*" >> .coveragerc_override
}

set_coveragerc()
{
fcoveragerc=""

if ! command -v ray &> /dev/null;
then
echo "Ray not installed"
gen_ray_coveragerc
fcoveragerc="--rcfile=.coveragerc_override"
else
echo "Ray Installed"
echo "Ray installed"
fi

if ! command -v fftw-wisdom &> /dev/null \
|| ! python -c "import pyfftw" &> /dev/null;
then
echo "FFTW and/or pyFFTW not Installed"
gen_pyfftw_coveragerc
else
echo "FFTW and pyFFTW Installed"
fi

if [ -f .coveragerc_override ]; then
fcoveragerc="--rcfile=.coveragerc_override"
fi
}

show_coverage_report()
{
set_ray_coveragerc
set_coveragerc
coverage report --show-missing --fail-under=100 --skip-covered --omit=fastmath.py,docstring.py,versions.py $fcoveragerc
check_errs $?
}
Expand Down Expand Up @@ -361,6 +391,7 @@ check_print
check_pkg_imports
check_naive
check_ray
check_fftw_pyfftw


if [[ -z $NUMBA_DISABLE_JIT || $NUMBA_DISABLE_JIT -eq 0 ]]; then
Expand Down Expand Up @@ -405,4 +436,4 @@ else
test_coverage
fi

clean_up
clean_up
87 changes: 85 additions & 2 deletions tests/test_sdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@

def get_sdp_function_names():
out = []
for func_name, func in inspect.getmembers(sdp, inspect.isfunction):
if func_name.endswith("sliding_dot_product"):
for func_name, func in inspect.getmembers(sdp, callable):
if func_name.endswith("sliding_dot_product") and inspect.signature(
func
).parameters.keys() >= {"Q", "T"}:
out.append(func_name)

return out
Expand Down Expand Up @@ -152,3 +154,84 @@ def test_sdp_power2():
raise e

return


def test_pyfftw_sdp_max_n():
if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover
pytest.skip("Skipping Test pyFFTW Not Installed")

# When `len(T)` larger than `real_arr` in pyfftw_sdp,
# the internal preallocated arrays should be resized.
# This test checks that functionality.

max_n = 2**10
_pyfftw_sliding_dot_product = sdp._make_pyfftw_sliding_dot_product(
max_n=max_n, real_dtype="float64"
)

# len(T) > max_n to trigger array resizing
T = np.random.rand(max_n + 1)
Q = np.random.rand(2**2)

comp = _pyfftw_sliding_dot_product(Q, T)
ref = naive.rolling_window_dot_product(Q, T)

np.testing.assert_allclose(comp, ref)

return


def test_pyfftw_sdp_longdoube():
if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover
pytest.skip("Skipping Test pyFFTW Not Installed")

# This test checks that the pyfftw_sdp can be initialized with longdouble data type
max_n = 2**10
sdp_func = sdp._make_pyfftw_sliding_dot_product(max_n, real_dtype="longdouble")

T = np.random.rand(max_n)
Q = np.random.rand(2**8)

comp = sdp_func(Q, T)
ref = naive.rolling_window_dot_product(Q, T)

np.testing.assert_allclose(comp, ref)

return


def test_pyfftw_sdp_multithreaded():
if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover
pytest.skip("Skipping Test pyFFTW Not Installed")

# This test checks that the pyfftw_sdp can be initialized
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# This test checks that the pyfftw_sdp can be initialized
# This test checks that the pyfftw_sdp instantiated object
can be called in a multithreaded mode.

# with multiple threads
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# with multiple threads

T = np.random.rand(2**5)
Q = np.random.rand(2**4)

comp = sdp._pyfftw_sliding_dot_product(Q, T, n_threads=2)
ref = naive.rolling_window_dot_product(Q, T)

np.testing.assert_allclose(comp, ref)

return


def test_pyfftw_sdp_multithreaded_longdouble():
if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover
pytest.skip("Skipping Test pyFFTW Not Installed")

# This test checks that the pyfftw_sdp can be initialized
# with multiple threads and longdouble data type
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# with multiple threads and longdouble data type
# longdouble data type, and then called in a multithreaded mode.

Note that the number of threads is not set when the instance is created

max_n = 2**10
sdp_func = sdp._make_pyfftw_sliding_dot_product(max_n, real_dtype="longdouble")

T = np.random.rand(max_n)
Q = np.random.rand(2**8)

comp = sdp_func(Q, T, n_threads=2)
ref = naive.rolling_window_dot_product(Q, T)

np.testing.assert_allclose(comp, ref)

return
Loading