Source code for hypergas.plume_utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""Some utils used for creating plume mask and gas emission rates."""

import base64
import gc
import logging
import math
import os
import warnings

import geopandas as gpd
import numpy as np
import pyresample
import xarray as xr
from branca.element import MacroElement
from jinja2 import Template
from pyresample.geometry import SwathDefinition
from scipy import ndimage
from shapely.geometry import Point, Polygon

from hypergas.folium_map import Map
from hypergas.landmask import Land_mask

LOG = logging.getLogger(__name__)

warnings.filterwarnings('ignore')

# calculate IME (kg m-2)
mass = {'ch4': 16.04e-3, 'co2': 44.01e-3}  # molar mass [kg/mol]
mass_dry_air = 28.964e-3  # molas mass dry air [kg/mol]
grav = 9.8  # gravity (m s-2)


[docs] class CustomControl(MacroElement): """Put any HTML on the map as a Leaflet Control. Adopted from https://github.com/python-visualization/folium/pull/1662. """ _template = Template( """ {% macro script(this, kwargs) %} L.Control.CustomControl = L.Control.extend({ onAdd: function(map) { let div = L.DomUtil.create('div'); div.innerHTML = `{{ this.html }}`; return div; }, onRemove: function(map) { // Nothing to do here } }); L.control.customControl = function(opts) { return new L.Control.CustomControl(opts); } L.control.customControl( { position: "{{ this.position }}" } ).addTo({{ this._parent.get_name() }}); {% endmacro %} """ ) def __init__(self, html, position="bottomleft"): def escape_backticks(text): """Escape backticks so text can be used in a JS template.""" import re return re.sub(r"(?<!\\)`", r"\`", text) super().__init__() self.html = escape_backticks(html) self.position = position
[docs] def plot_wind(m, wdir, wspd, arrow_img='./imgs/arrow.png'): """Plot the wind arrow png by rotate the north arrow.""" with open(arrow_img, "rb") as f: image_data = f.read() image_base64 = base64.b64encode(image_data).decode("utf-8") html = f""" <img src="data:image/png;base64,{image_base64}" style="transform:rotate({wdir+180}deg);"> <h5 style="color:white;">{wspd:.1f} m/s</h5> """ widget = CustomControl(html, position='bottomright') widget.add_to(m)
[docs] def get_wind_azimuth(u, v): """ Calculate the wind azimuth angle based on the horizontal wind components. The function computes the azimuth (direction) of the wind vector from its u (east-west) and v (north-south) components. The azimuth is measured in radians and degrees, following meteorological convention. Parameters ---------- u : float Zonal wind component (positive toward the east). v : float Meridional wind component (positive toward the north). Returns ------- azim_rad : float Wind azimuth in radians, measured clockwise from the north. azim : float Wind azimuth in degrees, measured clockwise from the north. """ if (u > 0): azim_rad = (np.pi)/2. - np.arctan(v/u) elif (u == 0.): if (v > 0.): azim_rad = 0. elif (v == 0.): azim_rad = 0. # arbitrary elif (v < 0.): azim_rad = np.pi elif (u < 0.): azim_rad = 3*np.pi/2. + np.arctan(-v/u) azim = azim_rad*180./np.pi return azim_rad, azim
def _azimuth(point1, point2): '''azimuth between 2 points (interval 0 - 180) # https://stackoverflow.com/a/66118219/7347925 ''' angle = np.arctan2(point2[0] - point1[0], point2[1] - point1[1]) return np.degrees(angle) if angle > 0 else np.degrees(angle) + 180 def _dist(a, b): '''distance between points''' return math.hypot(b[0] - a[0], b[1] - a[1])
[docs] def azimuth_mrr(mrr): '''Azimuth of plume's `minimum_rotated_rectangle <https://geopandas.org/en/latest/docs/reference/api/geopandas.GeoSeries.minimum_rotated_rectangle.html>`_.''' bbox = list(mrr.exterior.coords) axis1 = _dist(bbox[0], bbox[3]) axis2 = _dist(bbox[0], bbox[1]) if axis1 <= axis2: az = _azimuth(bbox[0], bbox[1]) else: az = _azimuth(bbox[0], bbox[3]) return az
[docs] def get_index_nearest(lons, lats, lon_target, lat_target): """Get the pixel index closest to the target. Parameters ---------- lons : :class:`numpy.ndarray` 2D longitudes of pixels. lats : :class:`numpy.ndarray` 2D latitudes of pixels. lon_target : float The longitude of target. lat_target : float The latitude of target. Returns ------- y_nearest : int The nearest y index. x_nearest : int The nearest x index. """ # define the areas for data and source point area_source = SwathDefinition(lons=lons, lats=lats) area_target = SwathDefinition(lons=np.array([lon_target]), lats=np.array([lat_target])) # Determine nearest (w.r.t. great circle distance) neighbour in the grid. _, _, index_array, distance_array = pyresample.kd_tree.get_neighbour_info( source_geo_def=area_source, target_geo_def=area_target, radius_of_influence=50, neighbours=1) # get_neighbour_info() returns indices in the flattened lat/lon grid. Compute the 2D grid indices: y_target, x_target = np.unravel_index(index_array, area_source.shape) return y_target[0], x_target[0]
[docs] def target_inside_mask(ds, gas_mask_varname, y_target, x_target, lon_target, lat_target): """Move the target if it is not inside the mask. Parameters ---------- ds : :class:`~xarray.Dataset` The dataset includes the gas field and geolocations. gas_mask_varname : str The mask name of gas. y_target : int The y index of target. x_target : int The x index of target. lon_target : float The longitude of target. lat_target : float The latitude of target. Returns ------- y_target : int x_target : int If the input target index is not included in the mask, it will be updated using the mask pixel closest to the target. """ if ds[gas_mask_varname][y_target, x_target] == 0: LOG.info('Picking the nearest mask pixel because the target is in the background.') lon_mask = ds['longitude'].where(ds[gas_mask_varname] > 0).data.flatten() lat_mask = ds['latitude'].where(ds[gas_mask_varname] > 0).data.flatten() lon_mask = lon_mask[~np.isnan(lon_mask)] lat_mask = lat_mask[~np.isnan(lat_mask)] # Get the closest mask pixel location min_index = gpd.points_from_xy(lon_mask, lat_mask).distance(Point(lon_target, lat_target)).argmin() y_target, x_target = get_index_nearest(ds['longitude'], ds['latitude'], lon_mask[min_index], lat_mask[min_index]) return y_target, x_target
[docs] def plot_mask(filename, ds, gas, mask, lon_target, lat_target, pick_plume_name, only_plume=True): """Plot masked data and export to L3 HTML file. Parameters ---------- filename : str The L2 HTML file name. ds : :class:`~xarray.Dataset` The dataset includes the gas field and RGB data. gas : str The gas name to be plotted. mask : :class:`numpy.ndarray` The plume boolean mask. lon_target : float The longitude of target. lat_target : float The latitude of target. pick_plume_name : str Which plume to be plotted (e.g., "plume0", "plume1", ...). Returns ------- plume_html_filename : str The plume html filename. """ # read gas data da_gas = ds[gas] # get masked plume data if mask.all(): da_gas_mask = da_gas else: da_gas_mask = da_gas.where(xr.DataArray(mask, dims=list(da_gas.dims))) ds[f'{gas}_plume'] = da_gas_mask if only_plume: # only plot plume for quick check m = Map(ds, varnames=[f'{gas}_plume'], center_map=[lat_target, lon_target]) m.initialize() m.plot(show_layers=[True], opacities=[0.9], marker=[lat_target, lon_target], export_dir=os.path.dirname(filename), draw_polygon=False) else: # plot all important data m = Map(ds, varnames=['rgb', gas, f'{gas}_comb', f'{gas}_comb_denoise', f'{gas}_plume'], center_map=[lat_target, lon_target]) m.initialize() m.plot(show_layers=[False, False, False, False, True], opacities=[0.9, 0.8, 0.8, 0.8, 0.8], marker=[lat_target, lon_target], export_dir=os.path.dirname(filename), draw_polygon=False) # export to html file if 'plume' in os.path.basename(filename): if pick_plume_name == 'plume0': plume_html_filename = filename else: # rename the filenames if there are more than one plume in the file plume_html_filename = filename.replace('plume0', pick_plume_name) else: plume_html_filename = filename.replace('L2', 'L3').replace('.html', f'_{pick_plume_name}.html') m.export(plume_html_filename) return plume_html_filename
[docs] def select_connect_masks(masks, y_target, x_target, az_max=30, dist_max=180): """Select connected masks by dilation and limit the minimum rectangle angle difference Parameters ---------- masks : :class:`~xarray.DataArray` 2D a priori mask from L2 data. y_target : float yindex of source target. x_target : float xindex of source target. az_max : float maximum of azimuth of minimum rotated rectangle. (Default: 30). dist_max : float maximum of dilation distance (meter). Returns ------- Connected plume mask : :class:`~xarray.DataArray`. """ # get the source label of original mask mask_target = masks[y_target, x_target].item() # dilation mask struct = ndimage.generate_binary_structure(2, 2) dxy = abs(masks.coords['y'].diff('y')[0]) if dxy == 1: # the projection is not UTM but EPSG:4326 # we use the 2d lat and lon array to calculate the distance R = 6371e3 # meters lat_1 = masks.coords['latitude'][0, 0] lat_2 = masks.coords['latitude'][0, 1] lon_1 = masks.coords['longitude'][0, 0] lon_2 = masks.coords['longitude'][0, 1] phi_1 = lat_1 * np.pi / 180 phi_2 = lat_2 * np.pi / 180 delta_phi = (lat_2 - lat_1) * np.pi / 180 delta_lambda = (lon_2 - lon_1) * np.pi / 180 a = np.sin(delta_phi / 2) * np.sin(delta_phi / 2) + np.cos(phi_1) * \ np.cos(phi_2) * np.sin(delta_lambda / 2) * np.sin(delta_lambda / 2) c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) dxy = R * c # meters niter = int(dist_max/dxy) if niter > 0: masks_dilation = masks.copy(deep=True, data=ndimage.binary_dilation( masks.fillna(0), iterations=niter, structure=struct)) else: masks_dilation = masks.copy(deep=True, data=masks.fillna(0)) # Label connected components in the dilated array labeled_array, num_features = ndimage.label(masks_dilation) masks_dilation = masks.copy(deep=True, data=labeled_array).where(masks.notnull()) # get the dilation mask which contains mask including the target mask_dilation_target = masks_dilation[y_target, x_target].values mask_dilation_target = masks_dilation.where(masks_dilation == mask_dilation_target) # mask in the dilation mask masks_in_dilation = masks.where((masks > 0) & (mask_dilation_target > 0)) # unique mask labels within the dilation mask connect_labels = np.unique(masks_in_dilation.data.flatten()) # create mask polygons df_mask = masks.to_dataframe().reset_index() df_mask = df_mask[df_mask[masks.name] > 0] gdf_polygon = gpd.GeoDataFrame(geometry=df_mask.groupby(masks.name) .apply(lambda g: Polygon(gpd.points_from_xy(g['longitude'], g['latitude']))) ) # calculate mrr and azimuth angle gdf_polygon['mrrs'] = gdf_polygon.geometry.apply(lambda geom: geom.minimum_rotated_rectangle) gdf_polygon['az'] = gdf_polygon['mrrs'].apply(azimuth_mrr) # get the polygons inside the dilation mask which includes the target mask gdf_polygon_connect = gdf_polygon[gdf_polygon.index.isin(connect_labels)] if len(gdf_polygon_connect) > 1: # calculate polygon distance gdf_polygon_connect['distance'] = gdf_polygon_connect.geometry.apply( lambda g: gdf_polygon_connect[gdf_polygon_connect.index == mask_target]['geometry'].distance(g, align=False)) # sort masks by distance gdf_polygon_connect.sort_values('distance', inplace=True) # calcualte differences of az gdf_polygon_connect.loc[:, 'az_diff'] = gdf_polygon_connect['az'].diff().abs().fillna(0) index_name = gdf_polygon_connect.index.name gdf_polygon_connect = gdf_polygon_connect.reset_index() # Iterate through the DataFrame to drop rows where az_diff is higher than az_max index = 0 while index < len(gdf_polygon_connect) - 1: if (gdf_polygon_connect['az_diff'].iloc[index + 1] > az_max) and (gdf_polygon_connect['distance'].iloc[index+1] > 0): gdf_polygon_connect = gdf_polygon_connect.drop(index + 1) # drop the next row and recheck gdf_polygon_connect = gdf_polygon_connect.reset_index(drop=True) gdf_polygon_connect['az_diff'] = gdf_polygon_connect['az'].diff().abs().fillna(0) else: index += 1 # Set the index back to the original index values gdf_polygon_connect = gdf_polygon_connect.set_index(index_name) # get final mask mask = masks_in_dilation.isin(gdf_polygon_connect.index) return mask
[docs] def a_priori_mask_data(ds, gas, lon_target, lat_target, pick_plume_name, wind_source, only_plume=True, az_max=30, dist_max=180, filename=None ): '''Read a priori plume masks and connect them by conditions. Parameters ---------- ds : :class:`~xarray.Dataset` L2 dataset. gas : str The gas field to be masked. lon_target : float The longitude of plume source. lat_target : float The latitude of plume source. pick_plume_name : str The plume name (plume0, plume1, ....). wind_source : str "ERA5" or "GEOS-FP". az_max : float Maximum of azimuth of minimum rotated rectangle. (Default: 30). dist_max : float Maximum of dilation distance (meter). filename : str The L2 HTML filename. Returns ------- mask : :class:`~xarray.DataArray` The Boolean mask of pixels. lon_mask : :class:`~xarray.DataArray` Plume longitude. lat_mask : :class:`~xarray.DataArray` Plume latitude. lon_target : float Longitude of target. lat_target : float Latitude of target. plume_html_filename : str Exported plume html filename. ''' LOG.info('Selecting connected plume masks') # get the y/x index of the source location y_target, x_target = get_index_nearest(ds['longitude'], ds['latitude'], lon_target, lat_target) # check if target is inside the masks y_target, x_target = target_inside_mask(ds, f'{gas}_mask', y_target, x_target, lon_target, lat_target) # update target lon_target = ds['longitude'].isel(y=y_target, x=x_target).item() lat_target = ds['latitude'].isel(y=y_target, x=x_target).item() # select connected masks mask = select_connect_masks(ds[f'{gas}_mask'], y_target, x_target, az_max, dist_max) # get the masked lon and lat lon_mask = xr.DataArray(ds['longitude'], dims=['y', 'x']).where(mask).rename('longitude') lat_mask = xr.DataArray(ds['latitude'], dims=['y', 'x']).where(mask).rename('latitude') if filename is not None: # plot the mask (png and html) plume_html_filename = plot_mask(filename, ds, gas, mask, lon_target, lat_target, pick_plume_name, only_plume=only_plume) return mask, lon_mask, lat_mask, lon_target, lat_target, plume_html_filename else: return mask, lon_mask, lat_mask, lon_target, lat_target
[docs] def crop_to_valid_region(da, y_target, x_target, data_crop_length, pixel_res): """Crop a DataArray around a target location to ensure a square result, centered on the target, and adjusted to fit within bounds if necessary. Parameters ---------- da : :class:`~xarray.DataArray` The to be cropped DataArray. y_target : int The yindex of target. x_target : int The xindex of target. data_crop_length : float The crop radius (m). pixel_res : float The pixel resolution (m). Returns ------- The cropping index : ymin, ymax, xmin, xmax. """ crop_pixels = data_crop_length//pixel_res y_len = da.sizes['y'] x_len = da.sizes['x'] ymin = max(0, y_target - crop_pixels) xmin = max(0, x_target - crop_pixels) ymax = min(y_len, y_target + crop_pixels) xmax = min(x_len, x_target + crop_pixels) crop_y = min(ymax - y_target, y_target - ymin) crop_x = min(xmax - x_target, x_target - xmin) crop_pixels = min(crop_y, crop_x)//2 ymin = y_target - crop_pixels xmin = x_target - crop_pixels ymax = y_target + crop_pixels xmax = x_target + crop_pixels return ymin, ymax, xmin, xmax
[docs] def cm_mask_data(ds, gas, lon_target, lat_target, data_crop_length=2500, limit_crop_length=1000, limit_percentile=90): """Create plume mask using `Carbon Mapper's method <https://assets.carbonmapper.org/documents/L3_L4%20Algorithm%20Theoretical%20Basis%20Document_formatted_10-24-24.pdf>`_. Parameters ---------- data_crop_length : float The length (m) for cropping data. limit_crop_length: float The length (m) for calculating the plume enhancement threshold. limit_percentile : float The percentile (%) for the plume enhancement threshold. """ LOG.info('Creating the CM plume mask') # get the y/x index of the source location y_target, x_target = get_index_nearest(ds['longitude'], ds['latitude'], lon_target, lat_target) pixel_res = int(abs(ds.coords['y'].diff(dim='y').mean(dim='y'))) # Step 1: Crop the data to data_crop_length (m) around the origin ymin_crop, ymax_crop, xmin_crop, xmax_crop = crop_to_valid_region(ds[gas], y_target, x_target, data_crop_length, pixel_res) cropped_data = ds[gas].isel(y=slice(ymin_crop, ymax_crop), x=slice(xmin_crop, xmax_crop)) # Step 2: Set concentration threshold by <limit_percentile> percent (<limit_crop_length> m around) crop_origin_y, crop_origin_x = get_index_nearest(cropped_data['longitude'], cropped_data['latitude'], lon_target, lat_target, ) ymin, ymax, xmin, xmax = crop_to_valid_region(cropped_data, crop_origin_y, crop_origin_x, limit_crop_length, pixel_res) small_crop = cropped_data.isel(y=slice(ymin, ymax), x=slice(xmin, xmax)) threshold = np.nanpercentile(small_crop, limit_percentile).item() # Create binary mask mask = (cropped_data > threshold).astype(int) # Step 3: Group connected pixels labeled, num_features = ndimage.label(mask) # Remove small clusters (less than 5 pixels) for i in np.unique(labeled): if np.sum(labeled == i) < 5: labeled[labeled == i] = 0 # Step 4: Enforce proximity metric x_coords, y_coords = np.meshgrid(range(cropped_data.sizes['x']), range(cropped_data.sizes['y'])) distance = np.sqrt((y_coords-cropped_data.sizes['y']/2)**2 + (x_coords-cropped_data.sizes['x']/2)**2) # unit: pixel for i in np.unique(labeled): if np.min(distance[labeled == i]) > 15: labeled[labeled == i] = 0 # Step 5: Create final binary mask # 0: background, >0: labels final_mask = xr.full_like(ds[gas], 0) final_mask.isel(x=slice(xmin_crop, xmax_crop), y=slice(ymin_crop, ymax_crop))[:] = (labeled > 0).astype(int) # clean attrs final_mask = final_mask.rename('cm_mask') final_mask.attrs = '' final_mask.attrs['description'] = 'Carbon Mapper plume mask using v2 method' return final_mask, threshold