Source code for hypergas.plumeline

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Fitting the polynomial centerline for the CSF method."""

import numpy as np
import xarray as xr
from scipy.interpolate import interp1d
from shapely.geometry import LineString, Point
from pyproj import Transformer
import gc


[docs] def feature(x, order=3): """Generate polynomial feature matrix [1, x, x^2, ..., x^order]""" x = x.reshape(-1, 1) return np.power(x, np.arange(order+1).reshape(1, -1))
[docs] def fit_polynomial_centerline(x, y, weights, x_source, y_source, order=3, alpha=0.01): """ Fit a polynomial curve through plume data using principal direction. Parameters ---------- x, y : array-like Coordinates of points to fit weights : array-like Weights for each point (e.g., gas concentrations) x_source, y_source : float Source location order : int Polynomial order (3 = cubic) alpha : float Regularization parameter Returns ------- w : array Polynomial coefficients rotation_angle : float Rotation angle used x_offset, y_offset : float Translation offsets """ # Prepare data X = np.asarray(x).flatten() Y = np.asarray(y).flatten() w_vals = np.asarray(weights).flatten() # Normalize weights w_vals = w_vals / np.max(w_vals) # Find principal direction (source to weighted centroid) w_norm = w_vals / np.sum(w_vals) x_centroid = np.sum(X * w_norm) y_centroid = np.sum(Y * w_norm) # Principal direction vector dx_principal = x_centroid - x_source dy_principal = y_centroid - y_source rotation_angle = np.arctan2(dy_principal, dx_principal) # Rotate coordinates so principal direction aligns with x-axis cos_theta = np.cos(-rotation_angle) sin_theta = np.sin(-rotation_angle) # Translate to source as origin, then rotate x_translated = X - x_source y_translated = Y - y_source x_rotated = x_translated * cos_theta - y_translated * sin_theta y_rotated = x_translated * sin_theta + y_translated * cos_theta # Now fit polynomial in rotated space: y_rotated = f(x_rotated) W = np.diag(w_vals) A = feature(x_rotated, order) # Weighted ridge regression poly_coeffs = np.linalg.pinv( A.T.dot(W).dot(A) + alpha * np.eye(A.shape[1]) ).dot(A.T).dot(W).dot(y_rotated) return poly_coeffs, rotation_angle, x_source, y_source
[docs] def evaluate_polynomial_curve(w, rotation_angle, x_offset, y_offset, x_rotated): """ Evaluate polynomial curve and transform back to original coordinates. Parameters ---------- w : array Polynomial coefficients rotation_angle : float Rotation angle x_offset, y_offset : float Translation offsets (source location) x_rotated : array x coordinates in rotated space Returns ------- x_original, y_original : arrays Coordinates in original space """ # Evaluate polynomial in rotated space X_rot = feature(x_rotated.reshape(-1), len(w)-1) y_rotated = X_rot.dot(w) # Rotate back to original space cos_theta = np.cos(rotation_angle) sin_theta = np.sin(rotation_angle) x_original = x_rotated * cos_theta - y_rotated * sin_theta + x_offset y_original = x_rotated * sin_theta + y_rotated * cos_theta + y_offset return x_original, y_original
[docs] def get_tangent_vector(w, x): """ Calculate unit tangent vector at point x for polynomial curve in rotated space. Parameters ---------- w : array Polynomial coefficients [w0, w1, w2, ..., wd] x : float x-coordinate in rotated space Returns ------- tx, ty : float Unit tangent vector components in rotated space """ # dy/dx = w1 + 2*w2*x + 3*w3*x^2 + ... derivative = sum(i * w[i] * x**(i-1) for i in range(1, len(w))) # Tangent vector in rotated space: (1, dy/dx) tx = 1.0 ty = derivative # Normalize norm = np.sqrt(tx**2 + ty**2) return tx / norm, ty / norm
[docs] def compute_curve_arc_length_fine(w, x_start, x_end, n_samples=10000): """ Compute arc length of polynomial curve using fine numerical integration. Parameters ---------- w : array Polynomial coefficients x_start, x_end : float x-coordinate range in rotated space n_samples : int Number of samples for numerical integration Returns ------- cumulative_arc : array Cumulative arc length at each x sample x_samples : array x coordinates of samples """ x_samples = np.linspace(x_start, x_end, n_samples) # Compute derivative dy/dx at each point derivatives = np.array([sum(i * w[i] * x**(i-1) for i in range(1, len(w))) for x in x_samples]) # Arc length element: sqrt(1 + (dy/dx)^2) * dx dx = (x_end - x_start) / (n_samples - 1) arc_elements = np.sqrt(1 + derivatives**2) * dx # Cumulative arc length cumulative_arc = np.concatenate([[0], np.cumsum(arc_elements[:-1])]) return cumulative_arc, x_samples
[docs] def get_x_at_arc_length(cumulative_arc, x_samples, target_arc_length): """ Find x coordinate corresponding to a specific arc length along the curve. Parameters ---------- cumulative_arc : array Cumulative arc lengths x_samples : array Corresponding x coordinates target_arc_length : float Desired arc length from start Returns ------- x : float x coordinate at target arc length """ if target_arc_length >= cumulative_arc[-1]: return x_samples[-1] if target_arc_length <= 0: return x_samples[0] # Interpolate to find exact x return np.interp(target_arc_length, cumulative_arc, x_samples)
[docs] def calculate_perpendicular_extent(x_data, y_data, weights, w, rotation_angle, x_offset, y_offset, x_start, x_end, n_samples=100): """ Calculate the maximum perpendicular distance of plume points from the centerline. Parameters ---------- x_data, y_data : array-like Coordinates of plume points in the original space. weights : array-like Weights for each point. w : array-like Polynomial coefficients defining the plume centerline in rotated space. rotation_angle : float Rotation angle of the coordinate transformation. x_offset, y_offset : float Source location offsets. x_start, x_end : float Range of x-coordinates (in rotated space) for evaluating the centerline. n_samples : int, optional Number of sample points to generate along the centerline. Returns ------- max_perp_distance : float Maximum perpendicular distance from any sufficiently weighted point to the plume centerline. """ # Generate centerline points in original space x_rot_samples = np.linspace(x_start, x_end, n_samples) x_orig, y_orig = evaluate_polynomial_curve( w, rotation_angle, x_offset, y_offset, x_rot_samples ) # Construct LineString for centerline centerline = LineString(np.column_stack((x_orig, y_orig))) # Only consider points with weight > 10% of maximum weight_threshold = 0.1 * np.max(weights) valid_points = (weights > weight_threshold) # Compute perpendicular distances for valid points max_distance = max( Point(x, y).distance(centerline) for x, y in zip(x_data[valid_points], y_data[valid_points]) ) return max_distance
[docs] def create_perpendicular_lines_equal_arc(w, rotation_angle, x_offset, y_offset, x_start, x_end, n_lines, line_width): """ Create perpendicular lines at equal arc-length intervals in original coordinate system. Parameters ---------- w : array Polynomial coefficients rotation_angle : float Rotation angle x_offset, y_offset : float Source location x_start, x_end : float x-coordinate range in rotated space n_lines : int Number of perpendicular lines line_width : float Total width of perpendicular lines Returns ------- csf_lines : list of LineString Perpendicular lines in original coordinate system """ csf_lines = [] half_width = line_width / 2.0 # Compute cumulative arc length with fine sampling cumulative_arc, x_samples = compute_curve_arc_length_fine(w, x_start, x_end, n_samples=10000) total_arc_length = cumulative_arc[-1] # Generate positions at equal arc-length intervals if n_lines == 1: arc_lengths = np.array([total_arc_length / 2.0]) else: arc_lengths = np.linspace(0, total_arc_length, n_lines) # Rotation matrices cos_theta = np.cos(rotation_angle) sin_theta = np.sin(rotation_angle) for arc_len in arc_lengths: # Find x coordinate in rotated space at this arc length x_rot = get_x_at_arc_length(cumulative_arc, x_samples, arc_len) # Get tangent vector in rotated space tx_rot, ty_rot = get_tangent_vector(w, x_rot) # Perpendicular vector in rotated space (rotate tangent by 90°) px_rot = -ty_rot py_rot = tx_rot # Transform center point to original space X_rot = feature(np.array([x_rot]), len(w)-1) y_rot = float(X_rot.dot(w)) x_center = x_rot * cos_theta - y_rot * sin_theta + x_offset y_center = x_rot * sin_theta + y_rot * cos_theta + y_offset # Transform perpendicular vector to original space px_orig = px_rot * cos_theta - py_rot * sin_theta py_orig = px_rot * sin_theta + py_rot * cos_theta # Normalize (should already be normalized, but just to be safe) p_norm = np.sqrt(px_orig**2 + py_orig**2) px_orig /= p_norm py_orig /= p_norm # Create perpendicular line endpoints x1 = x_center - half_width * px_orig y1 = y_center - half_width * py_orig x2 = x_center + half_width * px_orig y2 = y_center + half_width * py_orig line = LineString([[x1, y1], [x2, y2]]) csf_lines.append(line) return csf_lines