Financial time series data, like the price of Bitcoin, is notoriously noisy. Identifying underlying trends or patterns can be challenging using traditional methods alone. Wavelet analysis offers a powerful alternative, allowing us to decompose a signal into different frequency components across various time scales.
This article provides a detailed walkthrough of a Python script that utilizes the Stationary Wavelet Transform (SWT) – a specific type of wavelet analysis well-suited for time series – to analyze Bitcoin (BTC-USD) closing prices. We’ll explore each step, from fetching data to performing the SWT and visualizing the resulting approximation coefficients.
Before running the script, ensure you have the necessary Python libraries installed. You can install them using pip:
Bash
pip install yfinance pywavelets matplotlib numpy pandas
Let’s dissect the script section by section.
1. Setup and Configuration
This initial part imports the libraries and defines key parameters for the analysis.
Python
import yfinance as yf
import numpy as np
import pywt
import matplotlib.pyplot as plt
import pandas as pd
from sys import exit
# --- Configuration ---
TICKER = 'BTC-USD'
# Let's explicitly fetch 2020-01-01 to 2022-12-31 for 1096 points
START_DATE = '2020-01-01'
END_DATE = '2022-12-31'  # 3 full years = 1096 days
WAVELET = 'haar'  # Using 'haar' to allow more levels
MAX_LEVEL = 3  # Maximum SWT decomposition level
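As a quick sanity check on these settings, PyWavelets can report how many SWT levels a given input length supports. The sketch below assumes the 1096-point length mentioned in the comments above; `n_points`, `filter_len` and `max_supported` are illustrative names, not part of the script.

```python
import pywt

WAVELET = "haar"
MAX_LEVEL = 3
n_points = 1096  # assumed length: daily closes for 2020-01-01 through 2022-12-31

# The Haar wavelet has the shortest decomposition filter (2 taps).
filter_len = pywt.Wavelet(WAVELET).dec_len

# swt_max_level counts how many times the length can be halved evenly.
max_supported = pywt.swt_max_level(n_points)

print(f"{WAVELET}: decomposition filter length {filter_len}")
print(f"{n_points} points support SWT up to level {max_supported}")
if MAX_LEVEL > max_supported:
    print("Padding would be needed before calling pywt.swt.")
```

Since 1096 = 8 x 137, the length is divisible by 2^3 but not by 2^4, which is consistent with choosing a maximum level of 3 for this date range.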
2. Fetching the Financial Data
This section downloads the historical price data using yfinance.
Python
# --- Fetch Data ---
print(f"Fetching data for {TICKER} from {START_DATE} to {END_DATE}...")
try:
    # yfinance treats `end` as exclusive, so end='2023-01-01' includes 2022-12-31.
    # Fetching start='2020-01-01', end='2023-01-01' gave 1096 points last time.
    data_df = yf.download(TICKER, start=START_DATE, end='2023-01-01',
                          auto_adjust=True)
    if data_df.empty:
        print("Error: No data downloaded. Check ticker or date range.")
        exit()
    # yfinance sometimes returns multi-level columns; remove the ticker level
    if isinstance(data_df.columns, pd.MultiIndex):
        data_df.columns = data_df.columns.droplevel(1)
    prices = data_df['Close'].values  # Extract closing prices as a NumPy array
    dates = data_df.index  # Keep the dates for plotting
    original_len = len(prices)  # Store the original length for trimming later
    print(f"Data fetched successfully: {original_len} data points.")
except Exception as e:
    print(f"Error fetching data: {e}")
    exit()
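The column-flattening step can be tried offline. The sketch below builds a small synthetic frame that mimics two-level (field, ticker) columns; the values are made up and no network call is involved, so treat it as an illustration of the pandas mechanics rather than of yfinance itself.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for a downloaded frame (values are made up).
dates = pd.date_range("2020-01-01", periods=5, freq="D")
columns = pd.MultiIndex.from_product([["Close", "Volume"], ["BTC-USD"]])
data_df = pd.DataFrame(np.arange(10.0).reshape(5, 2), index=dates, columns=columns)

# Drop the ticker level only when the columns really have two levels.
if isinstance(data_df.columns, pd.MultiIndex):
    data_df.columns = data_df.columns.droplevel(1)

prices = data_df["Close"].to_numpy()  # 1D array of closing prices
print(list(data_df.columns))
print(prices.ndim, len(prices))
```

Checking for a `MultiIndex` first means the same code also works when the columns are already flat, in which case calling `droplevel(1)` directly would raise an error.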
3. Preparing Data for SWT - Padding
pywt.swt requires the input signal length to be divisible by 2^L, where L is the maximum decomposition level. This section pads the data if necessary.
Python
# --- Pad Data for SWT ---
required_divisor = 2**MAX_LEVEL  # Divisor needed based on max level (e.g., 2^3 = 8)
if original_len % required_divisor != 0:
    # Calculate the next length that IS divisible
    target_len = int(np.ceil(original_len / required_divisor)) * required_divisor
    # Calculate how many points to add
    pad_width = target_len - original_len
    # Pad the array at the end using 'symmetric' mode
    padded_prices = np.pad(prices, (0, pad_width), mode='symmetric')
    print(f"Padded data from {original_len} to {target_len} points (added {pad_width}) for SWT.")
else:
    # No padding needed if length is already suitable
    padded_prices = prices
    target_len = original_len
    print(f"Data length ({original_len}) is suitable for SWT level {MAX_LEVEL}.")
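To see what the padding does on a small scale, here is a minimal sketch that wraps the same idea in a helper. `pad_for_swt` is a hypothetical name introduced for illustration, and the 10-point input is made up.

```python
import numpy as np

def pad_for_swt(signal, level):
    """Right-pad `signal` symmetrically to a multiple of 2**level.

    Hypothetical helper: returns the padded array and the original length.
    """
    signal = np.asarray(signal, dtype=float)
    divisor = 2 ** level
    pad_width = (-len(signal)) % divisor  # 0 when already divisible
    padded = np.pad(signal, (0, pad_width), mode="symmetric")
    return padded, len(signal)

prices = np.arange(1.0, 11.0)  # 10 made-up points: 1, 2, ..., 10
padded, original_len = pad_for_swt(prices, level=3)
print(len(padded))   # 16, the next multiple of 8
print(padded[10:])   # mirrored tail: 10, 9, 8, 7, 6, 5
```

Symmetric padding mirrors the last observations, so the padded tail stays close to the final price level instead of jumping to zero. Keeping `original_len` around makes it straightforward to slice the coefficients back afterwards.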
4. Performing the Stationary Wavelet Transform (SWT)
This is where the actual wavelet decomposition happens using pywt.swt.
Python
# --- Perform Stationary Wavelet Transform (SWT) ---
print(f"Performing SWT with wavelet '{WAVELET}' up to level {MAX_LEVEL} on padded data...")
try:
    # With trim_approx=False, pywt.swt returns (cA, cD) pairs for every level:
    # [(cA_L, cD_L), ..., (cA_1, cD_1)] where L=MAX_LEVEL.
    # (trim_approx=True would keep only the last approximation: [cA_L, cD_L, ..., cD_1].)
    # norm=True normalizes the coefficients.
    swt_pairs = pywt.swt(padded_prices, wavelet=WAVELET, level=MAX_LEVEL,
                         trim_approx=False, norm=True)
    # Keep only the approximation arrays: [cA_L, cA_{L-1}, ..., cA_1]
    coeffs = [cA for cA, _cD in swt_pairs]
    print("SWT calculation complete.")
except Exception as e:
    print(f"Error during SWT calculation: {e}")
    exit()
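The layout of the list returned by pywt.swt depends on `trim_approx`. The toy check below, run on a made-up 16-point random walk rather than the Bitcoin data, compares the two layouts as described in the PyWavelets documentation; the variable names are illustrative.

```python
import numpy as np
import pywt

rng = np.random.default_rng(0)
signal = rng.normal(size=16).cumsum()  # made-up random walk, length 16 = 2**4
level = 3

# trim_approx=True: a flat list [cA_3, cD_3, cD_2, cD_1].
trimmed = pywt.swt(signal, "haar", level=level, trim_approx=True, norm=True)

# trim_approx=False: a list of pairs [(cA_3, cD_3), (cA_2, cD_2), (cA_1, cD_1)].
pairs = pywt.swt(signal, "haar", level=level, trim_approx=False, norm=True)
approximations = [cA for cA, _cD in pairs]  # [cA_3, cA_2, cA_1]

print(len(trimmed), len(pairs))
print(np.allclose(trimmed[0], pairs[0][0]))  # both hold cA_3
print(np.allclose(trimmed[1], pairs[0][1]))  # second entry is cD_3, not cA_2
print([c.shape for c in approximations])     # every array keeps the input length
```

Only the pair layout exposes an approximation array for every level. Because the SWT does not downsample, each coefficient array has the same length as the input, which is what allows the coefficients to be plotted against the original dates.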
5. Visualizing the Decomposition - Plotting
This extensive section sets up the plot and visualizes both the original signal and the calculated approximation coefficients.
Python
# --- Plotting ---
print("Preparing plot...")
num_plots = MAX_LEVEL + 1  # One plot for original, one for each level's cA
fig, axes = plt.subplots(num_plots, 1, figsize=(12, 2 * num_plots), sharex=True)
# Plot Original Price
axes[0].plot(dates, prices, label=f'{TICKER} Close Price', color='black')
axes[0].set_title(f'{TICKER} Closing Price (Original)')
axes[0].set_ylabel('Price')
axes[0].grid(True)
axes[0].legend()
# Plot Approximation Coefficients (trimmed)
print("\n--- Processing Wavelet Coefficients for Plotting ---")
for i in range(MAX_LEVEL):
    level = i + 1  # Current level (1, 2, ..., MAX_LEVEL)
    # Access coeffs list in reverse order: cA_3 is at index 0, cA_2 at 1,
    # cA_1 at 2 for MAX_LEVEL=3
    level_index_in_coeffs = MAX_LEVEL - level
    ax_idx = i + 1  # Index for the subplot axes (1, 2, ...)
    print(f"\nProcessing Level={level} (Index in coeffs list={level_index_in_coeffs})")
    try:
        # Get the approximation coefficient array for this level
        cA_k_padded = coeffs[level_index_in_coeffs]
        print(f"  Coeffs array type: {type(cA_k_padded)}")
        if hasattr(cA_k_padded, 'shape'):
            print(f"  Coeffs array shape: {cA_k_padded.shape}")  # Should be (target_len,)
        # Ensure it's a 1D numpy array before trimming
        if isinstance(cA_k_padded, np.ndarray) and cA_k_padded.ndim == 1:
            print("  Treating this 1D array as cA_k.")
            # CRITICAL STEP: Trim the coefficients back to the ORIGINAL data length
            cA_k_trimmed = cA_k_padded[:original_len]
            # Plotting against the original dates
            if len(cA_k_trimmed) == len(dates):
                axes[ax_idx].plot(dates, cA_k_trimmed,
                                  label=f'Approx Level {level} (cA{level})',
                                  color=plt.cm.viridis(i / MAX_LEVEL))
            else:
                # Fallback if lengths somehow mismatch after trimming (shouldn't happen here)
                print(f"Warning: Trimmed length mismatch for Level {level} "
                      f"({len(cA_k_trimmed)}) vs Dates ({len(dates)}). Plotting against index.")
                axes[ax_idx].plot(cA_k_trimmed,
                                  label=f'Approx Level {level} (cA{level}) (index x-axis)',
                                  color=plt.cm.viridis(i / MAX_LEVEL))
            axes[ax_idx].set_title(f'SWT Approximation Coefficients - Level {level} ({WAVELET} wavelet)')
            axes[ax_idx].set_ylabel(f'cA{level}')
            axes[ax_idx].grid(True)
            axes[ax_idx].legend()
        else:
            print(f"  Skipping plot for Level {level} - coefficients array is not a 1D np.ndarray.")
            axes[ax_idx].set_title(f'SWT Approx Level {level} - SKIPPED (Not 1D Array)')
            continue  # Skip to next level
    # Handle potential errors during processing/plotting for a specific level
    except IndexError:
        print(f"Error: Could not retrieve coefficients array for level index {level_index_in_coeffs}.")
        axes[ax_idx].set_title(f'SWT Approx Level {level} - FAILED (Index Error)')
    except Exception as e:
        print(f"Error processing/plotting level {level}: {e}")
        axes[ax_idx].set_title(f'SWT Approx Level {level} - FAILED ({type(e).__name__})')
# Improve layout and show plot
plt.xlabel('Date')  # Set common X-axis label
plt.tight_layout()  # Adjust subplot params for a tight layout
plt.show()  # Display the plot window
print("Plot displayed.")
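The generated plot will show the original closing price in the top panel, followed by one panel per decomposition level (1 through MAX_LEVEL) containing that level's approximation coefficients, trimmed back to the original length.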
By comparing these plots, you can visually identify trends operating on different time scales, potentially filtering out short-term noise present in the original price series.
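If the goal is an explicitly smoothed series rather than a visual comparison, one possible approach is to zero the finest detail coefficients and invert the transform with pywt.iswt. This is a minimal sketch on a made-up noisy signal, not part of the original script; which detail bands to discard is an assumption to tune for real data.

```python
import numpy as np
import pywt

rng = np.random.default_rng(42)
n = 64  # divisible by 2**3, so no padding is needed
t = np.arange(n)
noisy = 100.0 + 10.0 * np.sin(2 * np.pi * t / n) + rng.normal(scale=1.0, size=n)

level = 3
# Flat layout: [cA_3, cD_3, cD_2, cD_1]
coeffs = pywt.swt(noisy, "haar", level=level, trim_approx=True, norm=True)

# Copy the coefficients and zero the two finest detail bands.
filtered = [c.copy() for c in coeffs]
filtered[-1][:] = 0.0  # cD_1
filtered[-2][:] = 0.0  # cD_2

# norm must match the setting used for the forward transform.
smoothed = pywt.iswt(filtered, "haar", norm=True)
roundtrip = pywt.iswt(coeffs, "haar", norm=True)

print(np.allclose(roundtrip, noisy))  # unmodified coefficients reproduce the input
print(smoothed.shape)
```

Inverting the untouched coefficients returns the input, so any difference between `smoothed` and `noisy` comes only from the discarded detail bands.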
This script demonstrates a practical application of the Stationary Wavelet Transform for analyzing financial time series data using Python. By decomposing the signal into approximation coefficients at various levels, we gain insights into underlying trends obscured by daily fluctuations. Key steps involved data fetching (yfinance), careful padding (np.pad) to meet SWT requirements, performing the transform (pywt.swt) and extracting the approximation coefficients for each level, and crucially, trimming the results back to the original length before visualization (matplotlib). This technique provides a valuable tool for technical analysis, feature engineering for machine learning models, or signal denoising in financial applications.