diff --git a/stumpy/sdp.py b/stumpy/sdp.py index bd1d8fea7..6b0a663dd 100644 --- a/stumpy/sdp.py +++ b/stumpy/sdp.py @@ -6,6 +6,13 @@ from . import config +try: # pragma: no cover + import pyfftw + + PYFFTW_IS_AVAILABLE = True +except ImportError: # pragma: no cover + PYFFTW_IS_AVAILABLE = False + @njit(fastmath=config.STUMPY_FASTMATH_TRUE) def _njit_sliding_dot_product(Q, T): @@ -109,6 +116,186 @@ def _pocketfft_sliding_dot_product(Q, T): return c2r(False, np.multiply(fft_2d[0], fft_2d[1]), n=next_fast_n)[m - 1 : n] +def _make_pyfftw_sliding_dot_product(max_n=2**20, real_dtype="float64"): + """ + A closure to compute the sliding dot product using FFTW via pyfftw. + + This closure uses FFTW (via pyfftw) to efficiently compute the sliding dot product + between a query sequence, Q, and a time series, T. It preallocates arrays and caches + FFTW objects to optimize repeated computations with similar-sized inputs. + + Parameters + ---------- + max_n : int, default=2**20 + Maximum length to preallocate arrays for. This will be the size of the + real-valued array. A complex-valued array of size 1 + (max_n // 2) + will also be preallocated. If inputs exceed this size, arrays will be + reallocated to accommodate larger sizes. + + real_dtype : str, default="float64" + The real data type to use for the preallocated arrays. Must be either + "float64" or "longdouble". The complex data type will be set to + "complex128" or "clongdouble", respectively. + + Returns + ------- + sliding_dot_product : callable + A callable that computes the sliding dot product between Q and T using FFTW + via pyfftw, and caches FFTW objects if not already cached. + + Notes + ----- + The closure maintains internal caches of FFTW objects to avoid redundant planning + operations when called multiple times with similar-sized inputs and parameters. + When planning_flag == "FFTW_ESTIMATE", there will be no planning operation. + However, caching FFTW objects is still beneficial as the overhead of creating + those objects can be avoided in subsequent calls. + + References + ---------- + FFTW documentation: http://www.fftw.org/ + pyfftw documentation: https://pyfftw.readthedocs.io/ + """ + REAL_TO_COMPLEX_MAP = { + "float64": "complex128", + "longdouble": "clongdouble", + } + if real_dtype not in ["float64", "longdouble"]: # pragma: no cover + raise ValueError( + f"Invalid real_dtype: {real_dtype}. Must be 'float64' or 'longdouble'." + ) + complex_dtype = REAL_TO_COMPLEX_MAP[real_dtype] + + # Preallocate arrays + real_arr = pyfftw.empty_aligned(max_n, dtype=real_dtype) + complex_arr = pyfftw.empty_aligned(1 + (max_n // 2), dtype=complex_dtype) + + # Store FFTW objects, keyed by (next_fast_n, n_threads, planning_flag) + rfft_objects = {} + irfft_objects = {} + + def sliding_dot_product(Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE"): + """ + Compute the sliding dot product between Q and T using FFTW via pyfftw, + and cache FFTW objects if not already cached. + + Parameters + ---------- + Q : numpy.ndarray + Query array or subsequence. + + T : numpy.ndarray + Time series or sequence. + + n_threads : int, default=1 + Number of threads to use for FFTW computations. + + planning_flag : str, default="FFTW_ESTIMATE" + The planning flag that will be used in FFTW for planning. + See pyfftw documentation for details. Current options, ordered + ascendingly by the level of aggressiveness in planning, are: + "FFTW_ESTIMATE", "FFTW_MEASURE", "FFTW_PATIENT", and "FFTW_EXHAUSTIVE". + The more aggressive the planning, the longer the planning time, but + the faster the execution time. + + Returns + ------- + out : numpy.ndarray + Sliding dot product between Q and T. + + Notes + ----- + The planning_flag is defaulted to "FFTW_ESTIMATE" to be aligned with + MATLAB's FFTW usage (as of version R2025b) + See: https://www.mathworks.com/help/matlab/ref/fftw.html + + This implementation is inspired by the answer on StackOverflow: + https://stackoverflow.com/a/30615425/2955541 + """ + nonlocal real_arr, complex_arr + + m = Q.shape[0] + n = T.shape[0] + next_fast_n = pyfftw.next_fast_len(n) + + # Update preallocated arrays if needed + if next_fast_n > len(real_arr): + real_arr = pyfftw.empty_aligned(next_fast_n, dtype=real_arr.dtype) + complex_arr = pyfftw.empty_aligned( + 1 + (next_fast_n // 2), dtype=complex_arr.dtype + ) + + real_view = real_arr[:next_fast_n] + complex_view = complex_arr[: 1 + (next_fast_n // 2)] + + # Get or create FFTW objects + key = (next_fast_n, n_threads, planning_flag) + + rfft_obj = rfft_objects.get(key, None) + irfft_obj = irfft_objects.get(key, None) + + if rfft_obj is None or irfft_obj is None: + rfft_obj = pyfftw.FFTW( + input_array=real_view, + output_array=complex_view, + direction="FFTW_FORWARD", + flags=(planning_flag,), + threads=n_threads, + ) + irfft_obj = pyfftw.FFTW( + input_array=complex_view, + output_array=real_view, + direction="FFTW_BACKWARD", + flags=(planning_flag, "FFTW_DESTROY_INPUT"), + threads=n_threads, + ) + rfft_objects[key] = rfft_obj + irfft_objects[key] = irfft_obj + else: + # Update the input and output arrays of the cached FFTW objects + # in case their original input and output arrays were reallocated + # in a previous call + rfft_obj.update_arrays(real_view, complex_view) + irfft_obj.update_arrays(complex_view, real_view) + + # Compute the (circular) convolution between T and Q[::-1], + # each zero-padded to length next_fast_n by performing + # the following three steps: + + # Step 1 + # Compute RFFT of T (zero-padded) + # Must make a copy of output to avoid losing it when the array is + # overwritten when computing the RFFT of Q in the next step + rfft_obj.input_array[:n] = T + rfft_obj.input_array[n:] = 0.0 + rfft_obj.execute() + rfft_T = rfft_obj.output_array.copy() + + # Step 2 + # Compute RFFT of Q (reversed, scaled, and zero-padded) + # Scaling is required because the thin wrapper execute + # that will be called below does not perform normalization + np.multiply(Q[::-1], 1.0 / next_fast_n, out=rfft_obj.input_array[:m]) + rfft_obj.input_array[m:] = 0.0 + rfft_obj.execute() + rfft_Q = rfft_obj.output_array + + # Step 3 + # Convert back to time domain by taking the inverse RFFT + np.multiply(rfft_T, rfft_Q, out=irfft_obj.input_array) + irfft_obj.execute() + + return irfft_obj.output_array[m - 1 : n] # valid portion + + return sliding_dot_product + + +if PYFFTW_IS_AVAILABLE: # pragma: no cover + _pyfftw_sliding_dot_product = _make_pyfftw_sliding_dot_product( + max_n=2**20, real_dtype="float64" + ) + + def _sliding_dot_product(Q, T): """ Compute the sliding dot product between `Q` and `T` diff --git a/test.sh b/test.sh index 194288574..7d1b30c1e 100755 --- a/test.sh +++ b/test.sh @@ -154,27 +154,57 @@ gen_ray_coveragerc() # Generate a .coveragerc_override file that excludes Ray functions and tests gen_coveragerc_boilerplate echo " def .*_ray_*" >> .coveragerc_override - echo " def ,*_ray\(*" >> .coveragerc_override + echo " def .*_ray\(*" >> .coveragerc_override echo " def ray_.*" >> .coveragerc_override echo " def test_.*_ray*" >> .coveragerc_override } -set_ray_coveragerc() +check_fftw_pyfftw() { - # If `ray` command is not found then generate a .coveragerc_override file - if ! command -v ray &> /dev/null + if ! python -c "import pyfftw" &> /dev/null; then - echo "Ray Not Installed" + echo "pyFFTW cannot be imported" + else + echo "pyFFTW was successfully imported" + fi +} + +gen_pyfftw_coveragerc() +{ + gen_coveragerc_boilerplate + echo " def .*pyfftw.*" >> .coveragerc_override + echo " def test_.*pyfftw.*" >> .coveragerc_override +} + +set_coveragerc() +{ + fcoveragerc="" + + if ! command -v ray &> /dev/null; + then + echo "Ray not installed" gen_ray_coveragerc - fcoveragerc="--rcfile=.coveragerc_override" else - echo "Ray Installed" + echo "Ray installed" + fi + + if ! command -v fftw-wisdom &> /dev/null \ + || ! python -c "import pyfftw" &> /dev/null; + then + echo "FFTW and/or pyFFTW not Installed" + gen_pyfftw_coveragerc + else + echo "FFTW and pyFFTW Installed" + fi + + if [ -f .coveragerc_override ]; then + fcoveragerc="--rcfile=.coveragerc_override" fi } show_coverage_report() { - set_ray_coveragerc + set_coveragerc coverage report --show-missing --fail-under=100 --skip-covered --omit=fastmath.py,docstring.py,versions.py $fcoveragerc check_errs $? } @@ -361,6 +391,7 @@ check_print check_pkg_imports check_naive check_ray +check_fftw_pyfftw if [[ -z $NUMBA_DISABLE_JIT || $NUMBA_DISABLE_JIT -eq 0 ]]; then @@ -405,4 +436,4 @@ else test_coverage fi -clean_up +clean_up \ No newline at end of file diff --git a/tests/test_sdp.py b/tests/test_sdp.py index b8d98eeb9..0c1021a3b 100644 --- a/tests/test_sdp.py +++ b/tests/test_sdp.py @@ -70,8 +70,10 @@ def get_sdp_function_names(): out = [] - for func_name, func in inspect.getmembers(sdp, inspect.isfunction): - if func_name.endswith("sliding_dot_product"): + for func_name, func in inspect.getmembers(sdp, callable): + if func_name.endswith("sliding_dot_product") and inspect.signature( + func + ).parameters.keys() >= {"Q", "T"}: out.append(func_name) return out @@ -152,3 +154,84 @@ def test_sdp_power2(): raise e return + + +def test_pyfftw_sdp_max_n(): + if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover + pytest.skip("Skipping Test pyFFTW Not Installed") + + # When `len(T)` larger than `real_arr` in pyfftw_sdp, + # the internal preallocated arrays should be resized. + # This test checks that functionality. + + max_n = 2**10 + _pyfftw_sliding_dot_product = sdp._make_pyfftw_sliding_dot_product( + max_n=max_n, real_dtype="float64" + ) + + # len(T) > max_n to trigger array resizing + T = np.random.rand(max_n + 1) + Q = np.random.rand(2**2) + + comp = _pyfftw_sliding_dot_product(Q, T) + ref = naive.rolling_window_dot_product(Q, T) + + np.testing.assert_allclose(comp, ref) + + return + + +def test_pyfftw_sdp_longdoube(): + if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover + pytest.skip("Skipping Test pyFFTW Not Installed") + + # This test checks that the pyfftw_sdp can be initialized with longdouble data type + max_n = 2**10 + sdp_func = sdp._make_pyfftw_sliding_dot_product(max_n, real_dtype="longdouble") + + T = np.random.rand(max_n) + Q = np.random.rand(2**8) + + comp = sdp_func(Q, T) + ref = naive.rolling_window_dot_product(Q, T) + + np.testing.assert_allclose(comp, ref) + + return + + +def test_pyfftw_sdp_multithreaded(): + if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover + pytest.skip("Skipping Test pyFFTW Not Installed") + + # This test checks that the pyfftw_sdp can be initialized + # with multiple threads + T = np.random.rand(2**5) + Q = np.random.rand(2**4) + + comp = sdp._pyfftw_sliding_dot_product(Q, T, n_threads=2) + ref = naive.rolling_window_dot_product(Q, T) + + np.testing.assert_allclose(comp, ref) + + return + + +def test_pyfftw_sdp_multithreaded_longdouble(): + if not sdp.PYFFTW_IS_AVAILABLE: # pragma: no cover + pytest.skip("Skipping Test pyFFTW Not Installed") + + # This test checks that the pyfftw_sdp can be initialized + # with multiple threads and longdouble data type + max_n = 2**10 + sdp_func = sdp._make_pyfftw_sliding_dot_product(max_n, real_dtype="longdouble") + + T = np.random.rand(max_n) + Q = np.random.rand(2**8) + + comp = sdp_func(Q, T, n_threads=2) + ref = naive.rolling_window_dot_product(Q, T) + + np.testing.assert_allclose(comp, ref) + + return