Source code for hypergas.ime_csf

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""IME/CSF method of calculating gas emission rates"""

import os
import gc
import yaml
import logging

import geopandas as gpd
import numpy as np
import pyresample
import xarray as xr
from pyproj import Transformer
from pyresample.geometry import SwathDefinition
from scipy.spatial import ConvexHull
from scipy.stats.mstats import trimmed_std
from shapely.geometry import LineString, Point
from shapely.strtree import STRtree

from hypergas.plumeline import fit_polynomial_centerline, calculate_perpendicular_extent, compute_curve_arc_length_fine, get_x_at_arc_length, evaluate_polynomial_curve, create_perpendicular_lines_equal_arc
from hypergas.landmask import Land_mask

# set the logger level
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S')
LOG = logging.getLogger(__name__)


# constants
mass = {'ch4': 16.04e-3, 'co2': 44.01e-3, 'co': 28.01e-3}  # molar mass [kg/mol]
mass_dry_air = 28.964e-3  # molas mass dry air [kg/mol]
grav = 9.8  # gravity (m s-2)
molar_volume = 22.4  # L/mol at STP


# load sensor and calibration settings
_dirname = os.path.dirname(__file__)
with open(os.path.join(_dirname, 'config.yaml')) as f:
    settings = yaml.safe_load(f)
    sensor_info = settings['sensor']
    ime_calibration_info = settings['ime_calibration']
    csf_calibration_info = settings['csf_calibration']


[docs] class IME_CSF(): def __init__(self, sensor, longitude_source, latitude_source, plume_nc_filename, plume_name, ipcc_sector, gas='ch4', wind_source='ERA5', wspd_manual=None, sp_manual=None, land_only=True, land_mask_source='OSM'): """Initialize IME_CSF. Parameters ---------- sensor : str satellite sensor name. longitude_source : float longitude of plume source. latitude_source : float latitude of plume source. plume_nc_filename : str plume NetCDF file generated by :meth:`hypergas.emiss.Emiss.export_plume_nc`. plume_name : str the plume name (e.g., "plume0"). ipcc_sector : str The IPCC sector name: "Electricity Generation (1A1)", "Coal Mining (1B1a)", "Oil & Gas (1B2)", "Livestock (4B)", "Solid Waste (6A)", "Other". gas : str the trace gas name. wind_source : str wind source name ("ERA5" or "GEOS-FP"). wspd_manual : float Default: ``None`` (using the wspd from wind_source data), units: m/s. sp_manual : float Default: ``None`` (using the surface pressure from wind_source data), units: Pa. land_only : bool whether only considering land pixels. land_mask_source : str the data source of land mask ("OSM", "GSHHS" or "Natural Earth"), Default: "OSM". """ self.gas = gas self.longitude_source = longitude_source self.latitude_source = latitude_source self.plume_nc_filename = plume_nc_filename self.plume_name = plume_name self.wspd_manual = wspd_manual self.sp_manual = sp_manual self.land_only = land_only self.land_mask_source = land_mask_source self.sensor_info = sensor_info[sensor] self.ds = xr.open_dataset(self.plume_nc_filename, decode_coords='all') self.unit = self.ds[self.gas].attrs['units'] # get the UTM pixel resolution self.pixel_res = abs(self.ds.y.diff('y').mean().item()) if self.wspd_manual: # set the wind source to 'OBS' if the wspd_manual is not None self.wind_source = 'OBS' else: self.wind_source = wind_source # get the crs if self.ds.rio.crs: self.crs = self.ds.rio.crs else: self.crs = None # get the masked plume data self.gas_mask = self.ds.dropna(dim='y', how='all').dropna(dim='x', how='all')[self.gas] if gas == 'ch4': if ipcc_sector == 'Solid Waste (6A)': # area source self.alpha = ime_calibration_info[gas]['area-source'][sensor] self.beta = csf_calibration_info[gas]['area-source'][sensor] self.alpha_replace = ime_calibration_info[gas]['point-source'][sensor] self.beta_replace = csf_calibration_info[gas]['point-source'][sensor] # set residual as zero as we will use the point-source calibration self.alpha_replace['resid'] = 0 self.beta_replace['resid'] = 0 else: # point source self.alpha = ime_calibration_info[gas]['point-source'][sensor] self.beta = csf_calibration_info[gas]['point-source'][sensor] # use residual as uncertainty self.alpha_replace = self.alpha self.beta_replace = self.beta elif gas == 'co2': self.alpha = ime_calibration_info[gas]['point-source'][sensor] self.beta = csf_calibration_info[gas]['point-source'][sensor] self.alpha_replace = self.alpha self.beta_replace = self.beta else: raise ValueError( f'{gas} is not supported by IME/CSF yet. Please update config.yaml and paras here to include it.')
[docs] def calc_emiss(self): """Calculate emission rate (kg/h) using all available methods: :meth:`ime`, :meth:`ime_fetch`, :meth:`ime_cm`, and :meth:`csf`, """ surface_pressure, wind_speed, wdir, wind_speed_all, wdir_all, wind_source_all, l_ime, l_eff, u_eff, IME, Q, Q_err, \ err_random, err_wind, err_calib = self.ime() Q_fetch, Q_fetch_err, err_ime_fetch, err_wind_fetch = self.ime_fetch() ds_csf, n_csf, l_csf, u_eff_csf, Q_csf, Q_csf_err, err_random_csf, err_wind_csf, err_calib_csf = self.csf() IME_cm, l_cm, Q_cm = self.ime_cm() return surface_pressure, wind_speed, wdir, wind_speed_all, wdir_all, wind_source_all, l_ime, l_eff, u_eff, IME, Q, Q_err, \ err_random, err_wind, err_calib, Q_fetch, Q_fetch_err, err_ime_fetch, err_wind_fetch, \ IME_cm, l_cm, Q_cm, ds_csf, n_csf, l_csf, u_eff_csf, Q_csf, Q_csf_err, err_random_csf, err_wind_csf, err_calib_csf
def _create_circular_mask(self, h, w, center=None, radius=None): """Create a circular mask. Parameters ---------- h : int Height of data. w : int Width of data. center : tuple (w_coord, h_coord). Default: ``None`` for the center of data. radius : float The radius of the mask. Default: ``None`` for the smallest distance between the center and image walls. Returns ------- mask : Bool array """ if center is None: # use the middle of the image center = (int(w/2), int(h/2)) if radius is None: # use the smallest distance between the center and image walls radius = min(center[0], center[1], w-center[0], h-center[1]) Y, X = np.ogrid[:h, :w] dist_from_center = np.sqrt((X - center[0])**2 + (Y-center[1])**2) mask = dist_from_center <= radius return mask def _hull_length(self, mask, origin_y, origin_x): """Create ConvexHull and calculate the hull length (units: m) Parameters ---------- mask : DataArray Plume mask with y and x coords (units: m). origin_y : float Source location (units: m) in ydim. origin_x : float Source location (units: m) in xdim. """ points = np.array([mask.x, mask.y]).T hull = ConvexHull(points) hull_points = points[hull.vertices] distances = np.linalg.norm( hull_points - np.array([origin_x, origin_y]), axis=1) # max_loc = hull_points[np.argmax(distances)] L = np.max(distances) return L def _ime_sum(self, gas_mask): """Calculate the total gas mass (kg) in plume mask""" if self.unit == 'ppb': delta_omega = gas_mask.sum().values * 1.0e-9 * (mass[self.gas] / mass_dry_air) * self.sp / grav elif self.unit == 'ppm': delta_omega = gas_mask.sum().values * 1.0e-6 * (mass[self.gas] / mass_dry_air) * self.sp / grav elif self.unit == 'ppm m': delta_omega = gas_mask.sum().values * (1/1e6) * (1000) * (1/molar_volume) * mass[self.gas] else: raise ValueError(f"Unit '{self.unit}' is not supported yet. Please add it here.") return delta_omega * self.area def _csf_sum(self, gas_mask): """Cailculate the total gas mass (kg) along one CSF line""" if self.unit == 'ppb': delta_omega = gas_mask.sum().values * 1.0e-9 * (mass[self.gas] / mass_dry_air) * self.sp / grav elif self.unit == 'ppm': delta_omega = gas_mask.sum().values * 1.0e-6 * (mass[self.gas] / mass_dry_air) * self.sp / grav elif self.unit == 'ppm m': delta_omega = gas_mask.sum().values * (1/1e6) * (1000) * (1/molar_volume) * mass[self.gas] else: raise ValueError(f"Unit '{self.unit}' is not supported yet. Please add it here.") return delta_omega * self.pixel_res def _get_index_nearest(self, lons, lats, lon_target, lat_target): # define the areas for data and source point area_source = SwathDefinition(lons=lons, lats=lats) area_target = SwathDefinition(lons=np.array([lon_target]), lats=np.array([lat_target])) # Determine nearest (w.r.t. great circle distance) neighbour in the grid. _, _, index_array, distance_array = pyresample.kd_tree.get_neighbour_info( source_geo_def=area_source, target_geo_def=area_target, radius_of_influence=50, neighbours=1) # get_neighbour_info() returns indices in the flattened lat/lon grid. Compute the 2D grid indices: y_target, x_target = np.unravel_index(index_array, area_source.shape) return y_target[0], x_target[0] def _calc_wind_error(self, IME, l_eff): """Calculate wind error with random distribution""" # Generate U10 distribution # uncertainty = 50%, if wspd <= 3 m/s # uncertainty = 1.5 m/s, if wspd > 3 m/s if self.wspd <= 3: sigma = self.wspd * 0.5 else: sigma = 1.5 wspd_distribution = np.random.normal(self.wspd, sigma, size=1000) # Calculate Ueff distribution u_eff_distribution = self.alpha['alpha1'] * np.log(wspd_distribution) + \ self.alpha['alpha2'] + self.alpha['alpha3'] * wspd_distribution # Calculate Q distribution Q_distribution = u_eff_distribution * IME / l_eff # Calculate standard deviation of Q distribution wind_error = np.nanstd(Q_distribution) return wind_error def _calc_wind_error_fetch(self, ime_l_mean): """Calculate wind error with random distribution for IME-fetch""" if self.wspd <= 3: sigma = self.wspd * 0.5 else: sigma = 1.5 wspd_distribution = np.random.normal(self.wspd, sigma, size=1000) # Calculate Q distribution Q_distribution = ime_l_mean * wspd_distribution # Calculate standard deviation of Q distribution wind_error = np.std(Q_distribution) return wind_error def _calc_wind_error_csf(self, C_lines): """Calculate wind error with random distribution""" # Generate U10 distribution # uncertainty = 50%, if wspd <= 3 m/s # uncertainty = 1.5 m/s, if wspd > 3 m/s if self.wspd <= 3: sigma = self.wspd * 0.5 else: sigma = 1.5 wspd_distribution = np.random.normal(self.wspd, sigma, size=1000) # Calculate Ueff distribution u_eff_distribution = self.beta['beta1'] * wspd_distribution + self.beta['beta2'] # Calculate Q distribution Q_distribution = np.nanmean(u_eff_distribution[:, None] * C_lines, axis=1) # Calculate standard deviation of Q distribution wind_error = np.nanstd(Q_distribution) return wind_error def _calc_random_err(self, da_gas, gas_mask, max_attempts=1000, samples=100): """Calculate random error by moving plume around the whole scene""" # crop gas field to valid region gas_mask_crop = gas_mask.where(~gas_mask.isnull()).dropna(dim='y', how='all').dropna(dim='x', how='all') # get the shape of input data and mask bkgd_rows, bkgd_cols = gas_mask.shape mask_rows, mask_cols = gas_mask_crop.shape # Precompute the number of valid pixels n_valid_pixel = gas_mask.notnull().sum() # Insert plume mask data at a random position IME_noplume = [] self.ch4_bkgds = [] LOG.info('Moving plume around the scene for valid background masks') attempts = 0 while len(IME_noplume) < samples: attempts += 1 # Generate random row and column index to place b inside a y_move_pixel = bkgd_rows - mask_rows x_move_pixel = bkgd_cols - mask_cols y_shift = np.random.randint(-y_move_pixel, y_move_pixel) x_shift = np.random.randint(-x_move_pixel, x_move_pixel) ch4_shift = gas_mask.shift(y=y_shift, x=x_shift) ch4_shift_notnull = ch4_shift.notnull().values # check if 1) the plume shape is not outside of scene 2) plume pixels are not included no_outside = ch4_shift.notnull().sum() == n_valid_pixel no_plume = gas_mask.where(ch4_shift_notnull).isnull().all() if no_outside and no_plume: # 3) all valid value # sometimes the plume is long and it is difficult to get a valid background mask of same length # we can relax the constraints # but it is important to set fill values for nan pixels and set them to nan again for CSF if da_gas.notnull().where(ch4_shift_notnull, False).sum() > n_valid_pixel*0.9: ch4_bkgd = da_gas.where(ch4_shift_notnull) ch4_bkgd_fillna = da_gas.fillna(-999).where(ch4_shift_notnull) IME_noplume.append(self._ime_sum(ch4_bkgd)) self.ch4_bkgds.append(ch4_bkgd_fillna) del ch4_bkgd # Stop if we've reached `max_samples` if attempts >= max_attempts: break LOG.info(f'Finished moving plume with {len(IME_noplume)} samples after {attempts} attempts.)') std_value = trimmed_std(np.array(IME_noplume), (1e-3, 1e-3)) del IME_noplume gc.collect() return std_value def _calc_random_err_csf(self, u_eff, match_lists): """Calculate random error by moving plume around the whole scene""" C_noplume = [] for ch4_bkgd in self.ch4_bkgds: # reset coords to save memory for stacking ch4_bkgd_valid = ch4_bkgd.reset_coords(drop=True).stack(z=("x", "y")).dropna(dim='z') ch4_bkgd_valid = ch4_bkgd_valid.where(ch4_bkgd_valid != -999) C_lines = [] for match_list in match_lists: gas_match = ch4_bkgd_valid.isel(z=match_list) C = self._csf_sum(gas_match) C_lines.append(C) C_noplume.append(np.array(C_lines).mean()) std_value = trimmed_std(np.array(C_noplume), (1e-3, 1e-3)) del C_noplume gc.collect() return std_value def _calc_calibration_error(self, IME, u_eff, l_eff): """Calculate wind calibration error by replacing alphas""" # Calculate Ueff u_eff_replace = self.alpha_replace['alpha1'] * \ np.log(self.wspd) + self.alpha_replace['alpha2'] + \ self.alpha_replace['alpha3'] * self.wspd + self.alpha_replace['resid'] # Calculate uncertainty error = abs(u_eff_replace - u_eff) * IME / l_eff return error def _calc_calibration_error_csf(self, C_lines, u_eff): """Calculate wind calibration error by replacing betas""" # Calculate Ueff u_eff_replace = self.beta_replace['beta1'] * self.wspd + self.beta_replace['beta2'] + self.beta_replace['resid'] if u_eff_replace == u_eff: # In case betas are same for area and point sources u_eff_replace = self.beta_replace['beta1'] * self.wspd + self.beta_replace['beta2'] + self.beta['resid'] # Calculate uncertainty error = abs(u_eff_replace - u_eff) * np.nanmean(C_lines) return error def _def_csf_lines(self, npixel_interval): """ Define CSF lines for quantification using polynomial-based centerline. Parameters ---------- npixel_interval: float Interval of CSF lines (unit: n*pixel_size). Returns ------- center_curve : ndarray Centerline coordinates in original space csf_lines : list of LineString CSF perpendicular lines in original space """ ds = self.ds # Only moving around over land pixels if self.land_only: ds = ds.where(self.landmask) # Get 1D array of plume data self.gas_valid = ds[self.gas].stack(z=('x', 'y')).dropna(dim='z') # Get plume source location in UTM projection transformer = Transformer.from_crs("EPSG:4326", ds.rio.crs, always_xy=True) x_source, y_source = transformer.transform( ds.attrs['plume_longitude'], ds.attrs['plume_latitude'] ) # Prepare data for polynomial fitting x_data = self.gas_valid.x.values y_data = self.gas_valid.y.values # Use gas values as weights to emphasize high-concentration areas weights = self.gas_valid.values.copy() weights[weights < 0] = 0 # Remove negative values if any weights = weights / np.nanmax(weights) # Normalize # Fit polynomial centerline poly_order = 3 alpha = 0.1 w, rotation_angle, x_offset, y_offset = fit_polynomial_centerline( x_data, y_data, weights, x_source, y_source, order=poly_order, alpha=alpha ) # Rotate data to polynomial coordinate system for analysis cos_theta = np.cos(-rotation_angle) sin_theta = np.sin(-rotation_angle) x_translated = x_data - x_source y_translated = y_data - y_source x_rotated = x_translated * cos_theta - y_translated * sin_theta y_rotated = x_translated * sin_theta + y_translated * cos_theta # Determine plume direction in rotated space x_min_rot, x_max_rot = x_rotated.min(), x_rotated.max() # Check which direction the plume extends from source (origin in rotated space) x_left = np.sum((x_rotated < 0) & (weights > 0.1)) x_right = np.sum((x_rotated > 0) & (weights > 0.1)) if x_right >= x_left: # Plume extends to the right x_start = 0 x_end = x_max_rot else: # Plume extends to the left x_start = x_min_rot x_end = 0 # Calculate perpendicular extent perp_extent = calculate_perpendicular_extent( x_data, y_data, weights, w, rotation_angle, x_offset, y_offset, x_start, x_end ) # Line width: 1.5 * perpendicular extent on each side line_width = 2 * 1.5 * perp_extent # Calculate number of lines based on arc length cumulative_arc, x_samples = compute_curve_arc_length_fine(w, x_start, x_end, n_samples=10000) total_arc_length = cumulative_arc[-1] interval = self.pixel_res * npixel_interval n_points = max(2, int(np.round(total_arc_length / interval)) + 1) # Generate positions at equal arc-length intervals for verification if n_points == 1: arc_positions = np.array([total_arc_length / 2.0]) else: arc_positions = np.linspace(0, total_arc_length, n_points) # Get x coordinates in rotated space for each arc position x_rot_centerline = np.array([get_x_at_arc_length(cumulative_arc, x_samples, arc_len) for arc_len in arc_positions]) # Transform to original space x_orig, y_orig = evaluate_polynomial_curve(w, rotation_angle, x_offset, y_offset, x_rot_centerline) center_curve = np.column_stack([x_orig, y_orig]) if center_curve.shape[0] > 1: # Create CSF perpendicular lines in original space csf_lines = create_perpendicular_lines_equal_arc( w, rotation_angle, x_offset, y_offset, x_start, x_end, n_lines=n_points, line_width=line_width ) else: print('The plume is too short to create CSF lines.') center_curve = None csf_lines = None # Cleanup ds.close() del ds gc.collect() return center_curve, csf_lines def _emiss_csf_lines(self, csf_lines): """Calculate gas emission rate at each CSF line""" # create gdf of center points of plume pixel gdf = gpd.GeoDataFrame({'x': self.gas_valid.coords['x'], 'y': self.gas_valid.coords['y']}) gdf['center'] = gdf.apply(lambda x: Point(x['x'], x['y']), axis=1) # create square around the pixel center gdf['polygon'] = gdf['center'].buffer(self.pixel_res/2, cap_style=3) # Query the spatial index for potential intersections # https://stackoverflow.com/a/43105613/7347925 spatial_index = STRtree(gdf['polygon']) def check_intersections(spatial_index, polygon, line): potential_matches = spatial_index.query(line) # Check if there is any intersection with the potential matches match_list = potential_matches[[line.intersects(poly) for poly in polygon.iloc[potential_matches]]] return match_list # calculate emission rate using intersected pixels Q_lines = [] C_lines = [] match_lists = [] u_eff = self.beta['beta1'] * self.wspd + self.beta['beta2'] for line in csf_lines: match_list = check_intersections(spatial_index, gdf['polygon'], line) if len(match_list) > 0: gas_match = self.gas_valid.isel(z=match_list) C = self._csf_sum(gas_match) C_lines.append(C) Q_lines.append(C * u_eff) match_lists.append(match_list) else: Q_lines.append(np.nan) return np.array(C_lines), u_eff, np.array(Q_lines), match_lists def _csf_dataset(self, center_curve, csf_lines, Q_lines): """Combine CSF info into one xarray Dataset""" x_center = center_curve[:, 0] y_center = center_curve[:, 1] x_start = [line.xy[0][0] for line in csf_lines] x_end = [line.xy[0][1] for line in csf_lines] y_start = [line.xy[1][0] for line in csf_lines] y_end = [line.xy[1][1] for line in csf_lines] ds_csf = xr.Dataset( data_vars=dict(emission_rate=(['csf_line'], Q_lines*3600), x_start=(['csf_line'], x_start), x_end=(['csf_line'], x_end), y_start=(['csf_line'], y_start), y_end=(['csf_line'], y_end), x_center=('ceter_line', x_center), y_center=('ceter_line', y_center), ), ) ds_csf['emission_rate'].attrs['description'] = 'emission rate at each csf line' ds_csf['emission_rate'].attrs['units'] = 'kg h-1' ds_csf['x_start'].attrs['description'] = 'start coord (x) of csf line' ds_csf['x_start'].attrs['units'] = 'm' ds_csf['x_end'].attrs['description'] = 'end coord (x) of csf line' ds_csf['x_end'].attrs['units'] = 'm' ds_csf['y_start'].attrs['description'] = 'start coord (y) of csf line' ds_csf['y_start'].attrs['units'] = 'm' ds_csf['y_end'].attrs['description'] = 'end coord (y) of csf line' ds_csf['y_end'].attrs['units'] = 'm' ds_csf['x_center'].attrs['description'] = 'x coord of center line' ds_csf['x_center'].attrs['units'] = 'm' ds_csf['y_center'].attrs['description'] = 'y coord of center line' ds_csf['y_center'].attrs['units'] = 'm' if self.crs is not None: ds_csf.rio.write_crs(self.crs, inplace=True) return ds_csf
[docs] def csf(self, npixel_interval=2.5): """Calculate the emission rate (kg/h) using the `Cross-Sectional Flux (CSF) method <https://doi.org/10.5194/amt-11-5673-2018>`_. Parameters ---------- npixel_interval : float interval of CSF lines (unit: n*pixel_res). Returns ------- ds_csf : :class:`~xarray.Dataset` The xarray dataset saves the CSF line coords and emission rates for each line. n_csf : int The number of CSF lines. l_csf : float The length (unit: meter) of the curved CSF line. u_eff : float The effective wind speed (m/s). Q : float The emission rate (kg/h). Q_err : float The total estimate error (kg/h). err_random : float The uncertainty (kg/h) caused by the retrieval random error. err_wind : float The uncertainty (kg/h) caused by the wind speed error. err_calib : float The uncertainty (kg/h) caused by the wind calibration error. """ LOG.info('Calculating CSF') # set the plume centerline and CSF lines center_curve, csf_lines = self._def_csf_lines(npixel_interval) if csf_lines is not None: l_csf = LineString(center_curve).length # calculate the emission rate at each CSF line C_lines, u_eff, Q_lines, match_lists = self._emiss_csf_lines(csf_lines) ds_csf = self._csf_dataset(center_curve, csf_lines, Q_lines) n_csf = len(C_lines) # number of csf lines # calculate the mean emission rate Q = np.nanmean(Q_lines) # ---- uncertainty ---- # 1. random LOG.info('Calculating random error') C_std = self._calc_random_err_csf(u_eff, match_lists) err_random = u_eff * C_std # 2. wind error LOG.info('Calculating wind error') if self.wspd_manual is None: err_wind = self._calc_wind_error_csf(C_lines) else: err_wind = 0 # 3. calibration error LOG.info('Calculating calibration error') err_calib = self._calc_calibration_error_csf(C_lines, u_eff) # sum error Q_err = np.sqrt(err_random**2 + err_wind**2 + err_calib**2) return ds_csf, n_csf, l_csf, u_eff, Q*3600, Q_err*3600, \ err_random*3600, err_wind*3600, err_calib*3600 # kg/h else: return None, None, None, None, None, None, None, None, None
[docs] def ime(self): """Calculate the emission rate (kg/h) using the `integrated mass enhancement (IME) method <https://doi.org/10.5194/amt-11-5673-2018>`_. Returns ------- sp : float The mean surface pressure (Pa) in the plume mask. wspd : float The mean wind speed (m/s) in the plume mask. wdir : float The mean wind direction (deg) in the plume mask. wspd_all : list The list of available wind speed (m/s) data. wdir_all : list The list of available wind direction (deg) data. l_ime : float The geometric plume length (m). l_eff : float The effctive plume length (m). u_eff : float The effective wind speed (m/s). IME : float The total gas mass enhancement (kg). Q : float The emission rate (kg/h). Q_err : float The total estimate error (kg/h). err_random : float The uncertainty (kg/h) caused by the retrieval random error. err_wind : float The uncertainty (kg/h) caused by the wind speed error. err_calib : float The uncertainty (kg/h) caused by the wind calibration error. """ # read file and pick valid data file_original = self.plume_nc_filename.replace('L3', 'L2').replace(f'_{self.plume_name}.nc', '.nc') ds_original = xr.open_dataset(file_original) self.ds_original = ds_original ds = self.ds # area of pixel in m2 self.area = self.pixel_res*self.pixel_res # calculate Leff using the root method in meter plume_pixel_num = (~self.gas_mask.isnull()).sum() l_eff = np.sqrt(plume_pixel_num * self.area).item() # create stacked plume mask array with y/x dim in "npixel" units y_target, x_target = self._get_index_nearest( ds['longitude'], ds['latitude'], self.longitude_source, self.latitude_source) mask_stacked_ime = ds[self.gas].assign_coords({'y': np.arange(ds.sizes['y']), 'x': np.arange(ds.sizes['x']), } ).stack(z=['y', 'x']).dropna(dim='z') # calculate the convexhull plume length (m) l_ime = self._hull_length(mask_stacked_ime, y_target, x_target) * self.pixel_res # calculate IME (kg) LOG.info('Calculating IME') if self.sp_manual is None: self.sp = ds['sp'].mean().item() # use the mean surface pressure (Pa) else: # overwrite the surface pressure self.sp = self.sp_manual IME = self._ime_sum(self.gas_mask) # get wind info LOG.info('Calculating wind info') if self.wspd_manual is None: u10 = ds['u10'].sel(source=self.wind_source).item() v10 = ds['v10'].sel(source=self.wind_source).item() else: # read the first wspd data by default if wspd is set manually, this is only used to calculate wdir u10 = ds['u10'].isel(source=0).item() v10 = ds['v10'].isel(source=0).item() if self.wspd_manual is None: self.wspd = np.sqrt(u10**2 + v10**2) else: # overwrite the windspeed self.wspd = self.wspd_manual wdir = (270-np.rad2deg(np.arctan2(v10, u10))) % 360 # check all wind products wind_source_all = list(ds['source'].to_numpy()) wspd_all = np.sqrt(ds['u10']**2+ds['v10']**2) wdir_all = (270-np.rad2deg(np.arctan2(ds['v10'], ds['u10']))) % 360 wspd_all = list(wspd_all.to_numpy()) wdir_all = list(wdir_all.to_numpy()) # effective wind speed u_eff = self.alpha['alpha1'] * np.log(self.wspd) + self.alpha['alpha2'] + self.alpha['alpha3'] * self.wspd # calculate the emission rate (kg/s) Q = (u_eff / l_eff * IME) # ---- uncertainty ---- # 1. random LOG.info('Calculating random error') if self.land_only: # get lon and lat lon = ds_original['longitude'] lat = ds_original['latitude'] self.landmask = Land_mask(lon.data, lat.data, source=self.land_mask_source) IME_std = self._calc_random_err(ds_original[self.gas].where(self.landmask), ds[self.gas].where(self.landmask)) else: IME_std = self._calc_random_err(ds_original[self.gas], ds[self.gas]) err_random = u_eff / l_eff * IME_std # 2. wind error LOG.info('Calculating wind error') if self.wspd_manual is None: err_wind = self._calc_wind_error(IME, l_eff) else: err_wind = 0 # 3. calibration error LOG.info('Calculating calibration error') err_calib = self._calc_calibration_error(IME, u_eff, l_eff) # sum error Q_err = np.sqrt(err_random**2 + err_wind**2 + err_calib**2) ds.close() ds_original.close() del ds, ds_original gc.collect() return self.sp, self.wspd, wdir, wspd_all, wdir_all, wind_source_all, l_ime, l_eff, u_eff, IME, Q*3600, Q_err*3600, \ err_random*3600, err_wind*3600, err_calib*3600 # kg/h
[docs] def ime_cm(self): """Calculate the emission rate (kg/h) using `Carbon Mapper's method <https://assets.carbonmapper.org/documents/L3_L4%20Algorithm%20Theoretical%20Basis%20Document_formatted_10-24-24.pdf>`_. Returns ------- IME : float The total gas mass enhancement (kg). L : float The length (m) of plume hull. Q : float The emission rate (kg/h). """ # create mask centered on source point y_target, x_target = self._get_index_nearest( self.ds['longitude'], self.ds['latitude'], self.longitude_source, self.latitude_source) # create convex hull of masks mask_stacked = self.ds[f'{self.gas}_cm'].stack(z=['y', 'x']).dropna(dim='z') if mask_stacked.isnull().sizes['z'] == 0: # no plume available return np.nan, np.nan, np.nan else: # calculate the length of hull L = self._hull_length(mask_stacked, self.ds_original.coords['y'].isel(y=y_target), self.ds_original.coords['x'].isel(x=x_target), ) IME = self._ime_sum(self.ds[f'{self.gas}_cm']) Q = (IME * self.wspd / L).item() return IME, L, Q*3600
[docs] def ime_fetch(self): """Calculate the emission rate (kg/h) using the `IME-fetch method <https://www.nature.com/articles/s41586-019-1720-3>`_. Returns ------- Q : float The emission rate (kg/h). Q_err : float The total estimate error (kg/h). err_ime : float The uncertainty (kg/h) caused by the IME error. err_wind : float The uncertainty (kg/h) caused by the wind speed error. """ # create mask centered on source point LOG.info('Calculating the index of source loc') y_target, x_target = self._get_index_nearest( self.gas_mask['longitude'], self.gas_mask['latitude'], self.longitude_source, self.latitude_source) mask = np.zeros(self.gas_mask.shape) mask[y_target, x_target] = 1 # calculate plume height, width, and diagonal h = mask.shape[0] w = mask.shape[1] # r_max = np.sqrt(h**2+w**2) r_max = int(2.5e3/self.pixel_res) # limit to 2.5 km # calculate IME by increasing mask radius LOG.info(f'Calculating IME at different radii with r_max={r_max} pixels') ime = [] r = 0 r_step = 1/4 # increase by 1/4 pixel size each step while r < r_max: r += r_step LOG.debug(f'IME {r} loop') mask = self._create_circular_mask(h, w, center=(x_target, y_target), radius=r) # Calculate IME (kg) inside the radius mask gas_mask = self.gas_mask.where(mask) IME = self._ime_sum(gas_mask) ime.append(IME) # no new plume pixels anymore if mask.all(): LOG.info(f'Masking iteration stops at r = {r} pixel length') break # calculate emission rate L = np.arange(0+r_step, r_max, r_step).mean()*self.pixel_res ime_l_mean = np.mean(ime)/np.mean(L) ime_l_std = np.std(ime/L) Q = (ime_l_mean * self.wspd).item() # ---- uncertainty ---- # 1. ime LOG.info('Calculating IME error') err_ime = Q * ime_l_std / ime_l_mean # 2. wind error LOG.info('Calculating wind error') if self.wspd_manual is None: err_wind = self._calc_wind_error_fetch(ime_l_mean) else: err_wind = 0 # sum error Q_err = np.sqrt(err_ime**2 + err_wind**2) return Q*3600, Q_err*3600, err_ime*3600, err_wind*3600 # kg/h