try:  # pragma: no cover
    import pyfftw

    PYFFTW_IS_AVAILABLE = True
except ImportError:  # pragma: no cover
    PYFFTW_IS_AVAILABLE = False


class _PYFFTW_SLIDING_DOT_PRODUCT:
    """
    A class to compute the sliding dot product using FFTW via pyfftw.

    This class uses FFTW (via pyfftw) to efficiently compute the sliding dot
    product between a query sequence, `Q`, and a time series, `T`. It
    preallocates arrays and caches FFTW objects to optimize repeated
    computations with similar-sized inputs.

    Parameters
    ----------
    max_n : int, default=2**20
        Maximum length to preallocate arrays for. This will be the size of the
        real-valued array. A complex-valued array of size `1 + (max_n // 2)`
        will also be preallocated. If inputs exceed this size, arrays will be
        reallocated to accommodate larger sizes.

    Attributes
    ----------
    real_arr : pyfftw.empty_aligned
        Preallocated real-valued array for FFTW computations.

    complex_arr : pyfftw.empty_aligned
        Preallocated complex-valued array for FFTW computations.

    rfft_objects : dict
        Cache of FFTW forward transform objects with
        (next_fast_n, n_threads, planning_flag) as lookup keys.

    irfft_objects : dict
        Cache of FFTW inverse transform objects with
        (next_fast_n, n_threads, planning_flag) as lookup keys.

    Methods
    -------
    __call__(Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE")
        Compute the sliding dot product between `Q` and `T` using FFTW
        via pyfftw, and cache FFTW objects if not already cached.

    Notes
    -----
    The class maintains internal caches of FFTW objects to avoid redundant
    planning operations when called multiple times with similar-sized inputs
    and parameters. When `planning_flag=="FFTW_ESTIMATE"`, there will be no
    planning operation. However, caching FFTW objects is still beneficial as
    the overhead of creating those objects can be avoided in subsequent calls.

    Examples
    --------
    >>> import numpy as np
    >>> sdp_obj = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=1000)
    >>> Q = np.array([1, 2, 3])
    >>> T = np.array([4, 5, 6, 7, 8])
    >>> result = sdp_obj(Q, T)

    References
    ----------
    `FFTW documentation <https://www.fftw.org/fftw3_doc/>`__

    `pyfftw documentation <https://pyfftw.readthedocs.io/en/latest/>`__
    """

    def __init__(self, max_n=2**20):
        """
        Initialize the `_PYFFTW_SLIDING_DOT_PRODUCT` object, which can be
        called to compute the sliding dot product using FFTW via pyfftw.

        Parameters
        ----------
        max_n : int, default=2**20
            Maximum length to preallocate arrays for. This will be the size of
            the real-valued array. A complex-valued array of size
            `1 + (max_n // 2)` will also be preallocated.

        Returns
        -------
        None
        """
        # Preallocate arrays
        self.real_arr = pyfftw.empty_aligned(max_n, dtype="float64")
        self.complex_arr = pyfftw.empty_aligned(1 + (max_n // 2), dtype="complex128")

        # Store FFTW objects, keyed by (next_fast_n, n_threads, planning_flag)
        self.rfft_objects = {}
        self.irfft_objects = {}

    def __call__(self, Q, T, n_threads=1, planning_flag="FFTW_ESTIMATE"):
        """
        Compute the sliding dot product between `Q` and `T` using FFTW via
        pyfftw, and cache FFTW objects if not already cached.

        Parameters
        ----------
        Q : numpy.ndarray
            Query array or subsequence.

        T : numpy.ndarray
            Time series or sequence.

        n_threads : int, default=1
            Number of threads to use for FFTW computations.

        planning_flag : str, default="FFTW_ESTIMATE"
            The planning flag that will be used in FFTW for planning.
            See pyfftw documentation for details. Current options, ordered
            ascendingly by the level of aggressiveness in planning, are:
            "FFTW_ESTIMATE", "FFTW_MEASURE", "FFTW_PATIENT", and
            "FFTW_EXHAUSTIVE". The more aggressive the planning, the longer
            the planning time, but the faster the execution time.

        Returns
        -------
        out : numpy.ndarray
            Sliding dot product between `Q` and `T`.

        Notes
        -----
        The planning_flag is defaulted to "FFTW_ESTIMATE" to be aligned with
        MATLAB's FFTW usage (as of version R2025b)
        See: https://www.mathworks.com/help/matlab/ref/fftw.html

        This implementation is inspired by the answer on StackOverflow:
        https://stackoverflow.com/a/30615425/2955541
        """
        m = Q.shape[0]
        n = T.shape[0]
        next_fast_n = pyfftw.next_fast_len(n)

        # Update preallocated arrays if needed
        if next_fast_n > len(self.real_arr):
            self.real_arr = pyfftw.empty_aligned(next_fast_n, dtype="float64")
            self.complex_arr = pyfftw.empty_aligned(
                1 + (next_fast_n // 2), dtype="complex128"
            )

        real_arr = self.real_arr[:next_fast_n]
        complex_arr = self.complex_arr[: 1 + (next_fast_n // 2)]

        # Get or create FFTW objects
        key = (next_fast_n, n_threads, planning_flag)

        rfft_obj = self.rfft_objects.get(key, None)
        irfft_obj = self.irfft_objects.get(key, None)

        if rfft_obj is None or irfft_obj is None:
            rfft_obj = pyfftw.FFTW(
                input_array=real_arr,
                output_array=complex_arr,
                direction="FFTW_FORWARD",
                flags=(planning_flag,),
                threads=n_threads,
            )
            irfft_obj = pyfftw.FFTW(
                input_array=complex_arr,
                output_array=real_arr,
                direction="FFTW_BACKWARD",
                flags=(planning_flag, "FFTW_DESTROY_INPUT"),
                threads=n_threads,
            )
            self.rfft_objects[key] = rfft_obj
            self.irfft_objects[key] = irfft_obj
        else:
            # Update the input and output arrays of the cached FFTW objects
            # in case their original input and output arrays were reallocated
            # in a previous call
            rfft_obj.update_arrays(real_arr, complex_arr)
            irfft_obj.update_arrays(complex_arr, real_arr)

        # Compute the (circular) convolution between T and Q[::-1],
        # each zero-padded to length `next_fast_n`, by performing
        # the following three steps:

        # Step 1
        # Compute RFFT of T (zero-padded)
        # Must make a copy of output to avoid losing it when the array is
        # overwritten when computing the RFFT of Q in the next step
        rfft_obj.input_array[:n] = T
        rfft_obj.input_array[n:] = 0.0
        rfft_obj.execute()
        rfft_T = rfft_obj.output_array.copy()

        # Step 2
        # Compute RFFT of Q (reversed, scaled, and zero-padded)
        # Scaling is required because the thin wrapper `execute`
        # that will be called below does not perform normalization
        np.multiply(Q[::-1], 1.0 / next_fast_n, out=rfft_obj.input_array[:m])
        rfft_obj.input_array[m:] = 0.0
        rfft_obj.execute()
        rfft_Q = rfft_obj.output_array

        # Step 3
        # Convert back to time domain by taking the inverse RFFT
        # (rfft_Q aliases irfft_obj.input_array, but the multiply is
        # elementwise so in-place output is safe)
        np.multiply(rfft_T, rfft_Q, out=irfft_obj.input_array)
        irfft_obj.execute()

        return irfft_obj.output_array[m - 1 : n]  # valid portion


if PYFFTW_IS_AVAILABLE:  # pragma: no cover
    _pyfftw_sliding_dot_product = _PYFFTW_SLIDING_DOT_PRODUCT(max_n=2**20)
check_fftw_pyfftw()
{
    # Report whether both the FFTW command-line tools (fftw-wisdom) and the
    # pyfftw Python bindings are present in the current environment
    if command -v fftw-wisdom &> /dev/null && python -c "import pyfftw" &> /dev/null; then
        echo "FFTW and pyFFTW Installed"
    else
        echo "FFTW and/or pyFFTW Not Installed"
    fi
}

gen_pyfftw_coveragerc()
{
    # Extend the .coveragerc_override exclusions so that pyfftw-dependent
    # classes and tests are not counted against coverage
    gen_coveragerc_boilerplate
    echo "    class .*PYFFTW*" >> .coveragerc_override
    echo "    def test_.*pyfftw*" >> .coveragerc_override
}

set_coveragerc()
{
    # Generate a .coveragerc_override (when optional dependencies are absent)
    # and point $fcoveragerc at it for the coverage invocation
    fcoveragerc=""

    if command -v ray &> /dev/null; then
        echo "Ray installed"
    else
        echo "Ray not installed"
        gen_ray_coveragerc
    fi

    if command -v fftw-wisdom &> /dev/null && python -c "import pyfftw" &> /dev/null; then
        echo "FFTW and pyFFTW Installed"
    else
        echo "FFTW and/or pyFFTW not Installed"
        gen_pyfftw_coveragerc
    fi

    # NOTE(review): when both generators run, each one calls
    # gen_coveragerc_boilerplate first — confirm that the boilerplate step
    # appends (or is idempotent) so the second generator does not clobber
    # the first generator's exclusions.
    if [ -f .coveragerc_override ]; then
        fcoveragerc="--rcfile=.coveragerc_override"
    fi
}

show_coverage_report()
{
    set_coveragerc
    coverage report --show-missing --fail-under=100 --skip-covered --omit=fastmath.py,docstring.py,versions.py $fcoveragerc
    check_errs $?
}
def test_pyfftw_sdp_max_n():
    """
    Check that `_PYFFTW_SLIDING_DOT_PRODUCT` transparently reallocates its
    preallocated internal arrays when `len(T)` exceeds `max_n`, and that the
    cached FFTW objects still produce correct results on a repeated call.
    """
    if not sdp.PYFFTW_IS_AVAILABLE:  # pragma: no cover
        pytest.skip("Skipping Test pyFFTW Not Installed")

    # When `len(T)` is larger than `max_n`, the internal preallocated arrays
    # should be resized. This test checks that functionality.
    max_n = 2**10
    sdp_func = sdp._PYFFTW_SLIDING_DOT_PRODUCT(max_n)

    # Seed the generator so any failure is reproducible
    np.random.seed(79)

    # len(T) > max_n to trigger array resizing
    T = np.random.rand(max_n + 1)
    Q = np.random.rand(2**8)

    comp = sdp_func(Q, T)
    ref = naive.rolling_window_dot_product(Q, T)

    np.testing.assert_allclose(comp, ref)

    # A second call with the same shapes exercises the cached-FFTW-object
    # path (i.e., `update_arrays`) and must produce the same result
    np.testing.assert_allclose(sdp_func(Q, T), ref)