AtomicCorrPlan
Compute time-lagged correlation functions for atomic positions.
Constructor
from warp_md import AtomicCorrPlan
plan = AtomicCorrPlan(
selection,
max_lag=1000,
normalize=True
)
Atoms to compute correlations for
Maximum time lag in frames
Normalize correlation to range [0, 1]
run() Method
Returns: 2D array of shape (max_lag, n_atoms) containing autocorrelation functions
Example
import warp_md as wmd
import numpy as np
import matplotlib.pyplot as plt
system = wmd.System("protein.pdb")
traj = wmd.open_trajectory("md.xtc", system)
# Compute position autocorrelation for C-alpha atoms
ca_atoms = system.select("name CA")
plan = wmd.AtomicCorrPlan(ca_atoms, max_lag=500)
corr = plan.run(traj, system)
# Plot average correlation
avg_corr = corr.mean(axis=1)
plt.figure(figsize=(8, 5))
plt.plot(avg_corr)
plt.xlabel("Lag (frames)")
plt.ylabel("Position Autocorrelation")
plt.title("Average Atomic Correlation Decay")
plt.grid(alpha=0.3)
plt.show()
# Find decorrelation time (where correlation drops to 1/e)
import numpy as np
decorr_idx = np.where(avg_corr < np.exp(-1))[0]
if len(decorr_idx) > 0:
decorr_time = decorr_idx[0]
print(f"Decorrelation time: {decorr_time} frames")
Position autocorrelation reveals the time scale of atomic motions. Fast decay indicates flexible regions, slow decay indicates rigid regions.
VelocityAutoCorrPlan
Compute velocity autocorrelation function (VACF).
Constructor
plan = VelocityAutoCorrPlan(
selection,
max_lag=1000,
normalize=True
)
run() Method
Returns: 1D array of VACF values
Example
# Velocity autocorrelation for diffusion analysis
water_o = system.select("resname WAT and name O")
plan = wmd.VelocityAutoCorrPlan(water_o, max_lag=200)
vacf = plan.run(traj, system)
# Plot VACF
import matplotlib.pyplot as plt
plt.plot(vacf)
plt.xlabel("Lag (frames)")
plt.ylabel("VACF")
plt.axhline(0, color='k', linestyle='--', alpha=0.3)
plt.title("Velocity Autocorrelation Function")
plt.show()
# Compute diffusion coefficient via Green-Kubo relation
# D = (1/3) * integral of VACF
import numpy as np
dt = 0.002 # Time step in ps
integral = np.trapz(vacf, dx=dt)
diffusion_coeff = integral / 3.0
print(f"Diffusion coefficient: {diffusion_coeff:.2e} cm²/s")
VACF is used to compute transport properties:
- Diffusion coefficient (Green-Kubo)
- Viscosity (via stress tensor autocorrelation)
- Thermal conductivity
XcorrPlan
Compute cross-correlation between two time series or selections.
Constructor
plan = XcorrPlan(
selection_a,
selection_b,
max_lag=500,
normalize=True
)
Normalize cross-correlation
Example
# Analyze correlated motion between two domains
domain1 = system.select("resid 1:50 and name CA")
domain2 = system.select("resid 100:150 and name CA")
plan = wmd.XcorrPlan(domain1, domain2, max_lag=200)
xcorr = plan.run(traj, system)
import matplotlib.pyplot as plt
lags = np.arange(-200, 201)
plt.plot(lags, xcorr)
plt.xlabel("Lag (frames)")
plt.ylabel("Cross-correlation")
plt.axhline(0, color='k', linestyle='--', alpha=0.3)
plt.axvline(0, color='r', linestyle='--', alpha=0.3)
plt.title("Domain-Domain Cross-Correlation")
plt.show()
# Find lag of maximum correlation
max_idx = np.argmax(np.abs(xcorr))
max_lag = lags[max_idx]
max_corr = xcorr[max_idx]
print(f"Maximum correlation: {max_corr:.3f} at lag {max_lag} frames")
if max_lag > 0:
print(f"Domain 2 lags behind domain 1 by {max_lag} frames")
elif max_lag < 0:
print(f"Domain 1 lags behind domain 2 by {-max_lag} frames")
else:
print("Domains move synchronously")
Python Helper Functions
from warp_md.analysis import (
timecorr,
velocity_autocorrelation,
xcorr,
acorr
)
# Time correlation of any observable
observable = rg_values # Some 1D time series
corr = timecorr(observable, max_lag=1000)
# Velocity autocorrelation
vacf = velocity_autocorrelation(
traj,
system,
mask="resname WAT and name O",
max_lag=500
)
# Cross-correlation
xcorr_data = xcorr(
observable1,
observable2,
max_lag=500
)
# Autocorrelation (convenience wrapper)
acorr_data = acorr(observable, max_lag=1000)
Advanced: Dynamic Cross-Correlation Matrix
import warp_md as wmd
import numpy as np
import matplotlib.pyplot as plt
system = wmd.System("protein.pdb")
traj = wmd.open_trajectory("md.xtc", system)
# Select C-alpha atoms
ca_atoms = system.select("name CA")
n_residues = len(ca_atoms.indices)
# Compute pairwise cross-correlations
corr_matrix = np.zeros((n_residues, n_residues))
for i in range(n_residues):
for j in range(i, n_residues):
# Create single-atom selections
sel_i = system.select_indices([ca_atoms.indices[i]])
sel_j = system.select_indices([ca_atoms.indices[j]])
# Compute cross-correlation
plan = wmd.XcorrPlan(sel_i, sel_j, max_lag=0)
xcorr = plan.run(traj, system)
# Zero-lag correlation
corr_matrix[i, j] = xcorr[0]
corr_matrix[j, i] = xcorr[0]
# Plot correlation matrix
plt.figure(figsize=(10, 8))
im = plt.imshow(corr_matrix, cmap='RdBu_r', vmin=-1, vmax=1)
plt.colorbar(im, label='Correlation')
plt.xlabel("Residue Index")
plt.ylabel("Residue Index")
plt.title("Dynamic Cross-Correlation Matrix")
plt.tight_layout()
plt.show()
# Identify highly correlated pairs
threshold = 0.7
high_corr = np.where((corr_matrix > threshold) &
(corr_matrix < 0.999)) # Exclude diagonal
print(f"\nHighly correlated residue pairs (r > {threshold}):")
for i, j in zip(*high_corr):
if i < j: # Print each pair once
print(f" Residue {i+1} - Residue {j+1}: r = {corr_matrix[i,j]:.3f}")
Correlation Decay Fitting
import numpy as np
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
def exponential_decay(t, A, tau):
"""Exponential decay function."""
return A * np.exp(-t / tau)
def stretched_exponential(t, A, tau, beta):
"""Stretched exponential (Kohlrausch-Williams-Watts)."""
return A * np.exp(-(t / tau)**beta)
# Compute correlation
ca_atoms = system.select("name CA")
plan = wmd.AtomicCorrPlan(ca_atoms, max_lag=1000)
corr = plan.run(traj, system)
avg_corr = corr.mean(axis=1)
# Time axis
dt = 0.01 # ps per frame
time = np.arange(len(avg_corr)) * dt
# Fit exponential decay
popt_exp, _ = curve_fit(exponential_decay, time, avg_corr, p0=[1.0, 10.0])
A_exp, tau_exp = popt_exp
# Fit stretched exponential
popt_str, _ = curve_fit(stretched_exponential, time, avg_corr,
p0=[1.0, 10.0, 0.8])
A_str, tau_str, beta = popt_str
# Plot
plt.figure(figsize=(10, 5))
plt.plot(time, avg_corr, 'o', alpha=0.5, label='Data')
plt.plot(time, exponential_decay(time, *popt_exp),
label=f'Exp: τ={tau_exp:.2f} ps')
plt.plot(time, stretched_exponential(time, *popt_str),
label=f'Stretched: τ={tau_str:.2f} ps, β={beta:.2f}')
plt.xlabel("Time (ps)")
plt.ylabel("Correlation")
plt.legend()
plt.title("Correlation Decay Fitting")
plt.grid(alpha=0.3)
plt.show()
print(f"Exponential decay: τ = {tau_exp:.2f} ps")
print(f"Stretched exponential: τ = {tau_str:.2f} ps, β = {beta:.2f}")
if beta < 0.9:
print("Non-exponential decay suggests heterogeneous dynamics")
WaveletPlan
Perform wavelet transform for time-frequency analysis of trajectories.
Constructor:
from warp_md import WaveletPlan
plan = WaveletPlan(
selection, # Selection: atoms for analysis
wavelet="morlet", # str: wavelet type
scales=None, # array: scale parameters
device="auto" # str: execution device
)
Atoms for wavelet analysis
Wavelet type (e.g., “morlet”, “mexican_hat”)
Array of scale parameters for the transform
Returns: ndarray — Wavelet coefficients
Example:
from warp_md import System, Trajectory, WaveletPlan
import numpy as np
system = System.from_pdb("protein.pdb")
traj = Trajectory.open_xtc("traj.xtc", system)
ca_atoms = system.select("name CA")
scales = np.arange(1, 128)
plan = WaveletPlan(ca_atoms, wavelet="morlet", scales=scales)
coefficients = plan.run(traj, system)
# Analyze frequency content over time
print(f"Coefficient shape: {coefficients.shape}")