Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/dfki-ric/uxo-dataset2024/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The optical_flow module provides functions for calculating optical flow between consecutive frames in ARIS sonar video. It supports both dense flow (Farneback) and sparse flow (Lucas-Kanade) methods, outputting overall motion magnitude for motion onset detection and velocity analysis. Module Path: scripts/common/optical_flow.py

Functions

calc_overall_flow

def calc_overall_flow(flow: np.ndarray) -> float
Calculates the overall magnitude of motion from a flow field.
flow
np.ndarray
required
Flow field array. For dense flow, shape is (height, width, 2). For sparse flow, shape is (n_points, 2) representing displacement vectors.
magnitude
float
Overall flow magnitude computed as the L2 norm of the mean displacement vector

Implementation

The function computes:
  1. Mean displacement in x-direction: dx = mean(flow[..., 0])
  2. Mean displacement in y-direction: dy = mean(flow[..., 1])
  3. Overall magnitude: sqrt(dx² + dy²)

calc_optical_flow_farnerback

def calc_optical_flow_farnerback(
    frame_iterator: Iterator,
    flow_params: dict
) -> np.ndarray
Calculates dense optical flow using the Farneback method across a sequence of frames.
frame_iterator
Iterator
required
Iterator yielding grayscale frames as numpy arrays. Each frame should have shape (height, width) with dtype uint8.
flow_params
dict
required
Parameters passed to cv2.calcOpticalFlowFarneback(). Common parameters:
{
    'pyr_scale': 0.5,      # Pyramid scale (< 1)
    'levels': 3,            # Number of pyramid levels
    'winsize': 15,          # Averaging window size
    'iterations': 3,        # Iterations at each pyramid level
    'poly_n': 5,            # Neighborhood size for polynomial expansion
    'poly_sigma': 1.2,      # Gaussian std for polynomial expansion
    'flags': 0              # Operation flags
}
overall_flow
np.ndarray
1D array of flow magnitudes, one per frame transition. Length is n_frames - 1.

Method Details

The Farneback method computes dense optical flow by:
  • Approximating image neighborhoods with quadratic polynomials
  • Computing displacement based on polynomial expansion
  • Producing a flow vector for every pixel
Advantages:
  • Captures complete motion field
  • No feature detection required
  • Good for smooth, continuous motion
Disadvantages:
  • Computationally expensive
  • Slower than sparse methods
  • May struggle with noisy sonar imagery

Example

import cv2
import numpy as np
from scripts.common.optical_flow import calc_optical_flow_farnerback

# Load frames
frame_paths = sorted(glob.glob('/data/aris_frames/*.pgm'))
frame_iterator = (cv2.imread(f, cv2.IMREAD_GRAYSCALE) for f in frame_paths)

# Configure Farneback parameters
params = {
    'pyr_scale': 0.5,
    'levels': 3,
    'winsize': 15,
    'iterations': 3,
    'poly_n': 5,
    'poly_sigma': 1.2,
    'flags': 0
}

# Calculate flow
flow_magnitudes = calc_optical_flow_farnerback(frame_iterator, params)

print(f"Flow calculated for {len(flow_magnitudes)} frame transitions")
print(f"Peak flow: {np.max(flow_magnitudes):.2f} pixels")

calc_optical_flow_lk

def calc_optical_flow_lk(
    frame_iterator: Iterator,
    flow_params: dict,
    feature_params: dict | None = None
) -> np.ndarray
Calculates sparse optical flow using the Lucas-Kanade method with feature tracking.
frame_iterator
Iterator
required
Iterator or iterable yielding grayscale frames as numpy arrays. Each frame should have shape (height, width) with dtype uint8.
flow_params
dict
required
Parameters passed to cv2.calcOpticalFlowPyrLK(). Common parameters:
{
    'winSize': (15, 15),         # Search window size
    'maxLevel': 2,                # Max pyramid levels
    'criteria': (                 # Termination criteria
        cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
        10,    # max iterations
        0.03   # epsilon
    )
}
feature_params
dict | None
Parameters passed to cv2.goodFeaturesToTrack(). If None, defaults are used. Common parameters:
{
    'maxCorners': 100,           # Max number of corners to detect
    'qualityLevel': 0.3,          # Quality threshold (0-1)
    'minDistance': 7,             # Minimum distance between corners
    'blockSize': 7                # Size of averaging block
}
overall_flow
np.ndarray
1D array of flow magnitudes, one per frame. First element is always 0. Length is n_frames.

Method Details

The Lucas-Kanade method computes sparse optical flow by:
  1. Detecting “good features to track” (corners) using Shi-Tomasi algorithm
  2. Tracking those features to the next frame
  3. Computing displacement for successfully tracked features
  4. Re-detecting features every 10 frames to maintain feature count
Advantages:
  • Computationally efficient
  • Robust to image noise
  • Good for real-time applications
Disadvantages:
  • Only tracks discrete points
  • May fail if no good features detected
  • Requires periodic feature re-detection

Behavior

  • If no features can be detected in a frame, flow magnitude is 0 for that frame
  • If all features fail to track, flow magnitude is 0 and features are re-detected
  • Features are automatically re-detected every 10 frames to prevent drift
  • The iterator is automatically converted if it’s not already an iterator

Example

import cv2
import numpy as np
from scripts.common.optical_flow import calc_optical_flow_lk

# Load frames
frame_paths = sorted(glob.glob('/data/aris_frames/*.pgm'))
frames = [cv2.imread(f, cv2.IMREAD_GRAYSCALE) for f in frame_paths]

# Configure Lucas-Kanade parameters
flow_params = {
    'winSize': (15, 15),
    'maxLevel': 2,
    'criteria': (
        cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
        10,
        0.03
    )
}

feature_params = {
    'maxCorners': 100,
    'qualityLevel': 0.3,
    'minDistance': 7,
    'blockSize': 7
}

# Calculate flow
flow_magnitudes = calc_optical_flow_lk(frames, flow_params, feature_params)

print(f"Flow calculated for {len(flow_magnitudes)} frames")
print(f"Mean flow: {np.mean(flow_magnitudes):.2f} pixels")

Output Format

Both flow calculation functions return a 1D numpy array where each element represents the overall motion magnitude between consecutive frames.
array([0.00, 2.34, 5.67, 8.91, 12.45, ...])  # Motion magnitudes in pixels
Interpretation:
  • Low values (< 1.0): Little to no motion, likely static scene
  • Medium values (1.0 - 5.0): Moderate motion, gradual movement
  • High values (> 5.0): Significant motion, rapid movement or motion onset

Motion Onset Detection

Optical flow is commonly used to detect when motion begins in a recording:
import numpy as np
import matplotlib.pyplot as plt
from scripts.common.optical_flow import calc_optical_flow_lk

# Calculate flow
flow = calc_optical_flow_lk(frames, flow_params, feature_params)

# Smooth the signal
from scipy.ndimage import gaussian_filter1d
flow_smooth = gaussian_filter1d(flow, sigma=3)

# Find onset using threshold
threshold = np.mean(flow_smooth) + 2 * np.std(flow_smooth)
onset_candidates = np.where(flow_smooth > threshold)[0]
onset_frame = onset_candidates[0] if len(onset_candidates) > 0 else None

print(f"Motion onset detected at frame {onset_frame}")

# Visualize
plt.figure(figsize=(12, 4))
plt.plot(flow, alpha=0.5, label='Raw flow')
plt.plot(flow_smooth, label='Smoothed flow')
plt.axhline(threshold, color='r', linestyle='--', label='Threshold')
if onset_frame is not None:
    plt.axvline(onset_frame, color='g', linestyle='--', label='Onset')
plt.xlabel('Frame')
plt.ylabel('Flow Magnitude (pixels)')
plt.legend()
plt.title('Optical Flow Motion Analysis')
plt.show()

Algorithm Comparison

FeatureFarneback (Dense)Lucas-Kanade (Sparse)
ComputationExpensiveFast
CoverageEvery pixelSelected points only
Noise SensitivityModerateLow
Best ForSmooth surfaces, detailed analysisReal-time, feature-rich scenes
Output SizeLarge (H×W×2 per frame)Small (N points × 2)
ARIS SuitabilityGood for high-quality framesBetter for noisy frames

Performance Tips

Optical flow calculation can be computationally intensive for large datasets.
For Farneback:
  • Reduce levels if frames are small or speed is critical
  • Increase pyr_scale (closer to 1.0) for faster computation
  • Decrease winsize for noisy images
For Lucas-Kanade:
  • Reduce maxCorners in feature_params for faster processing
  • Increase minDistance to spread features more evenly
  • Adjust qualityLevel if too few/many features are detected

Integration with MatchingContext

from scripts.common.matching_context import MatchingContext
from scripts.common.optical_flow import calc_optical_flow_lk
import cv2

# Initialize context
ctx = MatchingContext(
    aris_dir='/data/recordings/rec_001_aris',
    gantry_file='/data/recordings/gantry/rec_001.csv',
    gopro_file=None
)

# Create frame iterator
def aris_frame_generator():
    for frame_path in ctx.aris_frames_raw:
        yield cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)

# Calculate flow
flow_params = {
    'winSize': (15, 15),
    'maxLevel': 2,
    'criteria': (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
}

feature_params = {'maxCorners': 100, 'qualityLevel': 0.3, 'minDistance': 7, 'blockSize': 7}

flow = calc_optical_flow_lk(aris_frame_generator(), flow_params, feature_params)

# Find motion onset
onset_frame = np.argmax(flow > np.mean(flow) + 2 * np.std(flow))
print(f"Motion starts at frame {onset_frame} (ARIS time: {ctx.get_aris_frametime(onset_frame)}μs)")

Build docs developers (and LLMs) love