#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2023 HyperGas developers
#
# This file is part of hypergas.
#
# hypergas is a library to retrieve trace gases from hyperspectral satellite data
"""IME/CSF method of calculating gas emission rates"""
import os
import gc
import yaml
import logging
import geopandas as gpd
import numpy as np
import pyresample
import xarray as xr
from pyproj import Transformer
from pyresample.geometry import SwathDefinition
from scipy.spatial import ConvexHull
from scipy.stats.mstats import trimmed_std
from shapely.geometry import LineString, Point
from shapely.strtree import STRtree
from hypergas.plumeline import fit_polynomial_centerline, calculate_perpendicular_extent, compute_curve_arc_length_fine, get_x_at_arc_length, evaluate_polynomial_curve, create_perpendicular_lines_equal_arc
from hypergas.landmask import Land_mask
# set the logger level
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
datefmt='%Y/%m/%d %H:%M:%S')
LOG = logging.getLogger(__name__)
# constants
mass = {'ch4': 16.04e-3, 'co2': 44.01e-3, 'co': 28.01e-3} # molar mass [kg/mol]
mass_dry_air = 28.964e-3 # molas mass dry air [kg/mol]
grav = 9.8 # gravity (m s-2)
molar_volume = 22.4 # L/mol at STP
# load sensor and calibration settings
_dirname = os.path.dirname(__file__)
with open(os.path.join(_dirname, 'config.yaml')) as f:
settings = yaml.safe_load(f)
sensor_info = settings['sensor']
ime_calibration_info = settings['ime_calibration']
csf_calibration_info = settings['csf_calibration']
[docs]
class IME_CSF():
def __init__(self, sensor,
longitude_source, latitude_source,
plume_nc_filename, plume_name, ipcc_sector,
gas='ch4', wind_source='ERA5',
wspd_manual=None, sp_manual=None,
land_only=True, land_mask_source='OSM'):
"""Initialize IME_CSF.
Parameters
----------
sensor : str
satellite sensor name.
longitude_source : float
longitude of plume source.
latitude_source : float
latitude of plume source.
plume_nc_filename : str
plume NetCDF file generated by :meth:`hypergas.emiss.Emiss.export_plume_nc`.
plume_name : str
the plume name (e.g., "plume0").
ipcc_sector : str
The IPCC sector name:
"Electricity Generation (1A1)", "Coal Mining (1B1a)",
"Oil & Gas (1B2)", "Livestock (4B)", "Solid Waste (6A)", "Other".
gas : str
the trace gas name.
wind_source : str
wind source name ("ERA5" or "GEOS-FP").
wspd_manual : float
Default: ``None`` (using the wspd from wind_source data), units: m/s.
sp_manual : float
Default: ``None`` (using the surface pressure from wind_source data), units: Pa.
land_only : bool
whether only considering land pixels.
land_mask_source : str
the data source of land mask ("OSM", "GSHHS" or "Natural Earth"), Default: "OSM".
"""
self.gas = gas
self.longitude_source = longitude_source
self.latitude_source = latitude_source
self.plume_nc_filename = plume_nc_filename
self.plume_name = plume_name
self.wspd_manual = wspd_manual
self.sp_manual = sp_manual
self.land_only = land_only
self.land_mask_source = land_mask_source
self.sensor_info = sensor_info[sensor]
self.ds = xr.open_dataset(self.plume_nc_filename, decode_coords='all')
self.unit = self.ds[self.gas].attrs['units']
# get the UTM pixel resolution
self.pixel_res = abs(self.ds.y.diff('y').mean().item())
if self.wspd_manual:
# set the wind source to 'OBS' if the wspd_manual is not None
self.wind_source = 'OBS'
else:
self.wind_source = wind_source
# get the crs
if self.ds.rio.crs:
self.crs = self.ds.rio.crs
else:
self.crs = None
# get the masked plume data
self.gas_mask = self.ds.dropna(dim='y', how='all').dropna(dim='x', how='all')[self.gas]
if gas == 'ch4':
if ipcc_sector == 'Solid Waste (6A)':
# area source
self.alpha = ime_calibration_info[gas]['area-source'][sensor]
self.beta = csf_calibration_info[gas]['area-source'][sensor]
self.alpha_replace = ime_calibration_info[gas]['point-source'][sensor]
self.beta_replace = csf_calibration_info[gas]['point-source'][sensor]
# set residual as zero as we will use the point-source calibration
self.alpha_replace['resid'] = 0
self.beta_replace['resid'] = 0
else:
# point source
self.alpha = ime_calibration_info[gas]['point-source'][sensor]
self.beta = csf_calibration_info[gas]['point-source'][sensor]
# use residual as uncertainty
self.alpha_replace = self.alpha
self.beta_replace = self.beta
elif gas == 'co2':
self.alpha = ime_calibration_info[gas]['point-source'][sensor]
self.beta = csf_calibration_info[gas]['point-source'][sensor]
self.alpha_replace = self.alpha
self.beta_replace = self.beta
else:
raise ValueError(
f'{gas} is not supported by IME/CSF yet. Please update config.yaml and paras here to include it.')
[docs]
def calc_emiss(self):
"""Calculate emission rate (kg/h) using all available methods:
:meth:`ime`,
:meth:`ime_fetch`,
:meth:`ime_cm`,
and :meth:`csf`,
"""
surface_pressure, wind_speed, wdir, wind_speed_all, wdir_all, wind_source_all, l_ime, l_eff, u_eff, IME, Q, Q_err, \
err_random, err_wind, err_calib = self.ime()
Q_fetch, Q_fetch_err, err_ime_fetch, err_wind_fetch = self.ime_fetch()
ds_csf, n_csf, l_csf, u_eff_csf, Q_csf, Q_csf_err, err_random_csf, err_wind_csf, err_calib_csf = self.csf()
IME_cm, l_cm, Q_cm = self.ime_cm()
return surface_pressure, wind_speed, wdir, wind_speed_all, wdir_all, wind_source_all, l_ime, l_eff, u_eff, IME, Q, Q_err, \
err_random, err_wind, err_calib, Q_fetch, Q_fetch_err, err_ime_fetch, err_wind_fetch, \
IME_cm, l_cm, Q_cm, ds_csf, n_csf, l_csf, u_eff_csf, Q_csf, Q_csf_err, err_random_csf, err_wind_csf, err_calib_csf
def _create_circular_mask(self, h, w, center=None, radius=None):
"""Create a circular mask.
Parameters
----------
h : int
Height of data.
w : int
Width of data.
center : tuple
(w_coord, h_coord). Default: ``None`` for the center of data.
radius : float
The radius of the mask. Default: ``None`` for the smallest distance between the center and image walls.
Returns
-------
mask : Bool array
"""
if center is None:
# use the middle of the image
center = (int(w/2), int(h/2))
if radius is None:
# use the smallest distance between the center and image walls
radius = min(center[0], center[1], w-center[0], h-center[1])
Y, X = np.ogrid[:h, :w]
dist_from_center = np.sqrt((X - center[0])**2 + (Y-center[1])**2)
mask = dist_from_center <= radius
return mask
def _hull_length(self, mask, origin_y, origin_x):
"""Create ConvexHull and calculate the hull length (units: m)
Parameters
----------
mask : DataArray
Plume mask with y and x coords (units: m).
origin_y : float
Source location (units: m) in ydim.
origin_x : float
Source location (units: m) in xdim.
"""
points = np.array([mask.x, mask.y]).T
hull = ConvexHull(points)
hull_points = points[hull.vertices]
distances = np.linalg.norm(
hull_points - np.array([origin_x, origin_y]), axis=1)
# max_loc = hull_points[np.argmax(distances)]
L = np.max(distances)
return L
def _ime_sum(self, gas_mask):
"""Calculate the total gas mass (kg) in plume mask"""
if self.unit == 'ppb':
delta_omega = gas_mask.sum().values * 1.0e-9 * (mass[self.gas] / mass_dry_air) * self.sp / grav
elif self.unit == 'ppm':
delta_omega = gas_mask.sum().values * 1.0e-6 * (mass[self.gas] / mass_dry_air) * self.sp / grav
elif self.unit == 'ppm m':
delta_omega = gas_mask.sum().values * (1/1e6) * (1000) * (1/molar_volume) * mass[self.gas]
else:
raise ValueError(f"Unit '{self.unit}' is not supported yet. Please add it here.")
return delta_omega * self.area
def _csf_sum(self, gas_mask):
"""Cailculate the total gas mass (kg) along one CSF line"""
if self.unit == 'ppb':
delta_omega = gas_mask.sum().values * 1.0e-9 * (mass[self.gas] / mass_dry_air) * self.sp / grav
elif self.unit == 'ppm':
delta_omega = gas_mask.sum().values * 1.0e-6 * (mass[self.gas] / mass_dry_air) * self.sp / grav
elif self.unit == 'ppm m':
delta_omega = gas_mask.sum().values * (1/1e6) * (1000) * (1/molar_volume) * mass[self.gas]
else:
raise ValueError(f"Unit '{self.unit}' is not supported yet. Please add it here.")
return delta_omega * self.pixel_res
def _get_index_nearest(self, lons, lats, lon_target, lat_target):
# define the areas for data and source point
area_source = SwathDefinition(lons=lons, lats=lats)
area_target = SwathDefinition(lons=np.array([lon_target]), lats=np.array([lat_target]))
# Determine nearest (w.r.t. great circle distance) neighbour in the grid.
_, _, index_array, distance_array = pyresample.kd_tree.get_neighbour_info(
source_geo_def=area_source, target_geo_def=area_target, radius_of_influence=50,
neighbours=1)
# get_neighbour_info() returns indices in the flattened lat/lon grid. Compute the 2D grid indices:
y_target, x_target = np.unravel_index(index_array, area_source.shape)
return y_target[0], x_target[0]
def _calc_wind_error(self, IME, l_eff):
"""Calculate wind error with random distribution"""
# Generate U10 distribution
# uncertainty = 50%, if wspd <= 3 m/s
# uncertainty = 1.5 m/s, if wspd > 3 m/s
if self.wspd <= 3:
sigma = self.wspd * 0.5
else:
sigma = 1.5
wspd_distribution = np.random.normal(self.wspd, sigma, size=1000)
# Calculate Ueff distribution
u_eff_distribution = self.alpha['alpha1'] * np.log(wspd_distribution) + \
self.alpha['alpha2'] + self.alpha['alpha3'] * wspd_distribution
# Calculate Q distribution
Q_distribution = u_eff_distribution * IME / l_eff
# Calculate standard deviation of Q distribution
wind_error = np.nanstd(Q_distribution)
return wind_error
def _calc_wind_error_fetch(self, ime_l_mean):
"""Calculate wind error with random distribution for IME-fetch"""
if self.wspd <= 3:
sigma = self.wspd * 0.5
else:
sigma = 1.5
wspd_distribution = np.random.normal(self.wspd, sigma, size=1000)
# Calculate Q distribution
Q_distribution = ime_l_mean * wspd_distribution
# Calculate standard deviation of Q distribution
wind_error = np.std(Q_distribution)
return wind_error
def _calc_wind_error_csf(self, C_lines):
"""Calculate wind error with random distribution"""
# Generate U10 distribution
# uncertainty = 50%, if wspd <= 3 m/s
# uncertainty = 1.5 m/s, if wspd > 3 m/s
if self.wspd <= 3:
sigma = self.wspd * 0.5
else:
sigma = 1.5
wspd_distribution = np.random.normal(self.wspd, sigma, size=1000)
# Calculate Ueff distribution
u_eff_distribution = self.beta['beta1'] * wspd_distribution + self.beta['beta2']
# Calculate Q distribution
Q_distribution = np.nanmean(u_eff_distribution[:, None] * C_lines, axis=1)
# Calculate standard deviation of Q distribution
wind_error = np.nanstd(Q_distribution)
return wind_error
def _calc_random_err(self, da_gas, gas_mask, max_attempts=1000, samples=100):
"""Calculate random error by moving plume around the whole scene"""
# crop gas field to valid region
gas_mask_crop = gas_mask.where(~gas_mask.isnull()).dropna(dim='y', how='all').dropna(dim='x', how='all')
# get the shape of input data and mask
bkgd_rows, bkgd_cols = gas_mask.shape
mask_rows, mask_cols = gas_mask_crop.shape
# Precompute the number of valid pixels
n_valid_pixel = gas_mask.notnull().sum()
# Insert plume mask data at a random position
IME_noplume = []
self.ch4_bkgds = []
LOG.info('Moving plume around the scene for valid background masks')
attempts = 0
while len(IME_noplume) < samples:
attempts += 1
# Generate random row and column index to place b inside a
y_move_pixel = bkgd_rows - mask_rows
x_move_pixel = bkgd_cols - mask_cols
y_shift = np.random.randint(-y_move_pixel, y_move_pixel)
x_shift = np.random.randint(-x_move_pixel, x_move_pixel)
ch4_shift = gas_mask.shift(y=y_shift, x=x_shift)
ch4_shift_notnull = ch4_shift.notnull().values
# check if 1) the plume shape is not outside of scene 2) plume pixels are not included
no_outside = ch4_shift.notnull().sum() == n_valid_pixel
no_plume = gas_mask.where(ch4_shift_notnull).isnull().all()
if no_outside and no_plume:
# 3) all valid value
# sometimes the plume is long and it is difficult to get a valid background mask of same length
# we can relax the constraints
# but it is important to set fill values for nan pixels and set them to nan again for CSF
if da_gas.notnull().where(ch4_shift_notnull, False).sum() > n_valid_pixel*0.9:
ch4_bkgd = da_gas.where(ch4_shift_notnull)
ch4_bkgd_fillna = da_gas.fillna(-999).where(ch4_shift_notnull)
IME_noplume.append(self._ime_sum(ch4_bkgd))
self.ch4_bkgds.append(ch4_bkgd_fillna)
del ch4_bkgd
# Stop if we've reached `max_samples`
if attempts >= max_attempts:
break
LOG.info(f'Finished moving plume with {len(IME_noplume)} samples after {attempts} attempts.)')
std_value = trimmed_std(np.array(IME_noplume), (1e-3, 1e-3))
del IME_noplume
gc.collect()
return std_value
def _calc_random_err_csf(self, u_eff, match_lists):
"""Calculate random error by moving plume around the whole scene"""
C_noplume = []
for ch4_bkgd in self.ch4_bkgds:
# reset coords to save memory for stacking
ch4_bkgd_valid = ch4_bkgd.reset_coords(drop=True).stack(z=("x", "y")).dropna(dim='z')
ch4_bkgd_valid = ch4_bkgd_valid.where(ch4_bkgd_valid != -999)
C_lines = []
for match_list in match_lists:
gas_match = ch4_bkgd_valid.isel(z=match_list)
C = self._csf_sum(gas_match)
C_lines.append(C)
C_noplume.append(np.array(C_lines).mean())
std_value = trimmed_std(np.array(C_noplume), (1e-3, 1e-3))
del C_noplume
gc.collect()
return std_value
def _calc_calibration_error(self, IME, u_eff, l_eff):
"""Calculate wind calibration error by replacing alphas"""
# Calculate Ueff
u_eff_replace = self.alpha_replace['alpha1'] * \
np.log(self.wspd) + self.alpha_replace['alpha2'] + \
self.alpha_replace['alpha3'] * self.wspd + self.alpha_replace['resid']
# Calculate uncertainty
error = abs(u_eff_replace - u_eff) * IME / l_eff
return error
def _calc_calibration_error_csf(self, C_lines, u_eff):
"""Calculate wind calibration error by replacing betas"""
# Calculate Ueff
u_eff_replace = self.beta_replace['beta1'] * self.wspd + self.beta_replace['beta2'] + self.beta_replace['resid']
if u_eff_replace == u_eff:
# In case betas are same for area and point sources
u_eff_replace = self.beta_replace['beta1'] * self.wspd + self.beta_replace['beta2'] + self.beta['resid']
# Calculate uncertainty
error = abs(u_eff_replace - u_eff) * np.nanmean(C_lines)
return error
def _def_csf_lines(self, npixel_interval):
"""
Define CSF lines for quantification using polynomial-based centerline.
Parameters
----------
npixel_interval: float
Interval of CSF lines (unit: n*pixel_size).
Returns
-------
center_curve : ndarray
Centerline coordinates in original space
csf_lines : list of LineString
CSF perpendicular lines in original space
"""
ds = self.ds
# Only moving around over land pixels
if self.land_only:
ds = ds.where(self.landmask)
# Get 1D array of plume data
self.gas_valid = ds[self.gas].stack(z=('x', 'y')).dropna(dim='z')
# Get plume source location in UTM projection
transformer = Transformer.from_crs("EPSG:4326", ds.rio.crs, always_xy=True)
x_source, y_source = transformer.transform(
ds.attrs['plume_longitude'],
ds.attrs['plume_latitude']
)
# Prepare data for polynomial fitting
x_data = self.gas_valid.x.values
y_data = self.gas_valid.y.values
# Use gas values as weights to emphasize high-concentration areas
weights = self.gas_valid.values.copy()
weights[weights < 0] = 0 # Remove negative values if any
weights = weights / np.nanmax(weights) # Normalize
# Fit polynomial centerline
poly_order = 3
alpha = 0.1
w, rotation_angle, x_offset, y_offset = fit_polynomial_centerline(
x_data, y_data, weights, x_source, y_source,
order=poly_order, alpha=alpha
)
# Rotate data to polynomial coordinate system for analysis
cos_theta = np.cos(-rotation_angle)
sin_theta = np.sin(-rotation_angle)
x_translated = x_data - x_source
y_translated = y_data - y_source
x_rotated = x_translated * cos_theta - y_translated * sin_theta
y_rotated = x_translated * sin_theta + y_translated * cos_theta
# Determine plume direction in rotated space
x_min_rot, x_max_rot = x_rotated.min(), x_rotated.max()
# Check which direction the plume extends from source (origin in rotated space)
x_left = np.sum((x_rotated < 0) & (weights > 0.1))
x_right = np.sum((x_rotated > 0) & (weights > 0.1))
if x_right >= x_left:
# Plume extends to the right
x_start = 0
x_end = x_max_rot
else:
# Plume extends to the left
x_start = x_min_rot
x_end = 0
# Calculate perpendicular extent
perp_extent = calculate_perpendicular_extent(
x_data, y_data, weights, w, rotation_angle, x_offset, y_offset,
x_start, x_end
)
# Line width: 1.5 * perpendicular extent on each side
line_width = 2 * 1.5 * perp_extent
# Calculate number of lines based on arc length
cumulative_arc, x_samples = compute_curve_arc_length_fine(w, x_start, x_end, n_samples=10000)
total_arc_length = cumulative_arc[-1]
interval = self.pixel_res * npixel_interval
n_points = max(2, int(np.round(total_arc_length / interval)) + 1)
# Generate positions at equal arc-length intervals for verification
if n_points == 1:
arc_positions = np.array([total_arc_length / 2.0])
else:
arc_positions = np.linspace(0, total_arc_length, n_points)
# Get x coordinates in rotated space for each arc position
x_rot_centerline = np.array([get_x_at_arc_length(cumulative_arc, x_samples, arc_len)
for arc_len in arc_positions])
# Transform to original space
x_orig, y_orig = evaluate_polynomial_curve(w, rotation_angle, x_offset,
y_offset, x_rot_centerline)
center_curve = np.column_stack([x_orig, y_orig])
if center_curve.shape[0] > 1:
# Create CSF perpendicular lines in original space
csf_lines = create_perpendicular_lines_equal_arc(
w, rotation_angle, x_offset, y_offset,
x_start, x_end,
n_lines=n_points,
line_width=line_width
)
else:
print('The plume is too short to create CSF lines.')
center_curve = None
csf_lines = None
# Cleanup
ds.close()
del ds
gc.collect()
return center_curve, csf_lines
def _emiss_csf_lines(self, csf_lines):
"""Calculate gas emission rate at each CSF line"""
# create gdf of center points of plume pixel
gdf = gpd.GeoDataFrame({'x': self.gas_valid.coords['x'], 'y': self.gas_valid.coords['y']})
gdf['center'] = gdf.apply(lambda x: Point(x['x'], x['y']), axis=1)
# create square around the pixel center
gdf['polygon'] = gdf['center'].buffer(self.pixel_res/2, cap_style=3)
# Query the spatial index for potential intersections
# https://stackoverflow.com/a/43105613/7347925
spatial_index = STRtree(gdf['polygon'])
def check_intersections(spatial_index, polygon, line):
potential_matches = spatial_index.query(line)
# Check if there is any intersection with the potential matches
match_list = potential_matches[[line.intersects(poly) for poly in polygon.iloc[potential_matches]]]
return match_list
# calculate emission rate using intersected pixels
Q_lines = []
C_lines = []
match_lists = []
u_eff = self.beta['beta1'] * self.wspd + self.beta['beta2']
for line in csf_lines:
match_list = check_intersections(spatial_index, gdf['polygon'], line)
if len(match_list) > 0:
gas_match = self.gas_valid.isel(z=match_list)
C = self._csf_sum(gas_match)
C_lines.append(C)
Q_lines.append(C * u_eff)
match_lists.append(match_list)
else:
Q_lines.append(np.nan)
return np.array(C_lines), u_eff, np.array(Q_lines), match_lists
def _csf_dataset(self, center_curve, csf_lines, Q_lines):
"""Combine CSF info into one xarray Dataset"""
x_center = center_curve[:, 0]
y_center = center_curve[:, 1]
x_start = [line.xy[0][0] for line in csf_lines]
x_end = [line.xy[0][1] for line in csf_lines]
y_start = [line.xy[1][0] for line in csf_lines]
y_end = [line.xy[1][1] for line in csf_lines]
ds_csf = xr.Dataset(
data_vars=dict(emission_rate=(['csf_line'], Q_lines*3600),
x_start=(['csf_line'], x_start),
x_end=(['csf_line'], x_end),
y_start=(['csf_line'], y_start),
y_end=(['csf_line'], y_end),
x_center=('ceter_line', x_center),
y_center=('ceter_line', y_center),
),
)
ds_csf['emission_rate'].attrs['description'] = 'emission rate at each csf line'
ds_csf['emission_rate'].attrs['units'] = 'kg h-1'
ds_csf['x_start'].attrs['description'] = 'start coord (x) of csf line'
ds_csf['x_start'].attrs['units'] = 'm'
ds_csf['x_end'].attrs['description'] = 'end coord (x) of csf line'
ds_csf['x_end'].attrs['units'] = 'm'
ds_csf['y_start'].attrs['description'] = 'start coord (y) of csf line'
ds_csf['y_start'].attrs['units'] = 'm'
ds_csf['y_end'].attrs['description'] = 'end coord (y) of csf line'
ds_csf['y_end'].attrs['units'] = 'm'
ds_csf['x_center'].attrs['description'] = 'x coord of center line'
ds_csf['x_center'].attrs['units'] = 'm'
ds_csf['y_center'].attrs['description'] = 'y coord of center line'
ds_csf['y_center'].attrs['units'] = 'm'
if self.crs is not None:
ds_csf.rio.write_crs(self.crs, inplace=True)
return ds_csf
[docs]
def csf(self, npixel_interval=2.5):
"""Calculate the emission rate (kg/h) using the `Cross-Sectional Flux (CSF) method <https://doi.org/10.5194/amt-11-5673-2018>`_.
Parameters
----------
npixel_interval : float
interval of CSF lines (unit: n*pixel_res).
Returns
-------
ds_csf : :class:`~xarray.Dataset`
The xarray dataset saves the CSF line coords and emission rates for each line.
n_csf : int
The number of CSF lines.
l_csf : float
The length (unit: meter) of the curved CSF line.
u_eff : float
The effective wind speed (m/s).
Q : float
The emission rate (kg/h).
Q_err : float
The total estimate error (kg/h).
err_random : float
The uncertainty (kg/h) caused by the retrieval random error.
err_wind : float
The uncertainty (kg/h) caused by the wind speed error.
err_calib : float
The uncertainty (kg/h) caused by the wind calibration error.
"""
LOG.info('Calculating CSF')
# set the plume centerline and CSF lines
center_curve, csf_lines = self._def_csf_lines(npixel_interval)
if csf_lines is not None:
l_csf = LineString(center_curve).length
# calculate the emission rate at each CSF line
C_lines, u_eff, Q_lines, match_lists = self._emiss_csf_lines(csf_lines)
ds_csf = self._csf_dataset(center_curve, csf_lines, Q_lines)
n_csf = len(C_lines) # number of csf lines
# calculate the mean emission rate
Q = np.nanmean(Q_lines)
# ---- uncertainty ----
# 1. random
LOG.info('Calculating random error')
C_std = self._calc_random_err_csf(u_eff, match_lists)
err_random = u_eff * C_std
# 2. wind error
LOG.info('Calculating wind error')
if self.wspd_manual is None:
err_wind = self._calc_wind_error_csf(C_lines)
else:
err_wind = 0
# 3. calibration error
LOG.info('Calculating calibration error')
err_calib = self._calc_calibration_error_csf(C_lines, u_eff)
# sum error
Q_err = np.sqrt(err_random**2 + err_wind**2 + err_calib**2)
return ds_csf, n_csf, l_csf, u_eff, Q*3600, Q_err*3600, \
err_random*3600, err_wind*3600, err_calib*3600 # kg/h
else:
return None, None, None, None, None, None, None, None, None
[docs]
def ime(self):
"""Calculate the emission rate (kg/h) using the `integrated mass enhancement (IME) method <https://doi.org/10.5194/amt-11-5673-2018>`_.
Returns
-------
sp : float
The mean surface pressure (Pa) in the plume mask.
wspd : float
The mean wind speed (m/s) in the plume mask.
wdir : float
The mean wind direction (deg) in the plume mask.
wspd_all : list
The list of available wind speed (m/s) data.
wdir_all : list
The list of available wind direction (deg) data.
l_ime : float
The geometric plume length (m).
l_eff : float
The effctive plume length (m).
u_eff : float
The effective wind speed (m/s).
IME : float
The total gas mass enhancement (kg).
Q : float
The emission rate (kg/h).
Q_err : float
The total estimate error (kg/h).
err_random : float
The uncertainty (kg/h) caused by the retrieval random error.
err_wind : float
The uncertainty (kg/h) caused by the wind speed error.
err_calib : float
The uncertainty (kg/h) caused by the wind calibration error.
"""
# read file and pick valid data
file_original = self.plume_nc_filename.replace('L3', 'L2').replace(f'_{self.plume_name}.nc', '.nc')
ds_original = xr.open_dataset(file_original)
self.ds_original = ds_original
ds = self.ds
# area of pixel in m2
self.area = self.pixel_res*self.pixel_res
# calculate Leff using the root method in meter
plume_pixel_num = (~self.gas_mask.isnull()).sum()
l_eff = np.sqrt(plume_pixel_num * self.area).item()
# create stacked plume mask array with y/x dim in "npixel" units
y_target, x_target = self._get_index_nearest(
ds['longitude'], ds['latitude'], self.longitude_source, self.latitude_source)
mask_stacked_ime = ds[self.gas].assign_coords({'y': np.arange(ds.sizes['y']),
'x': np.arange(ds.sizes['x']),
}
).stack(z=['y', 'x']).dropna(dim='z')
# calculate the convexhull plume length (m)
l_ime = self._hull_length(mask_stacked_ime, y_target, x_target) * self.pixel_res
# calculate IME (kg)
LOG.info('Calculating IME')
if self.sp_manual is None:
self.sp = ds['sp'].mean().item() # use the mean surface pressure (Pa)
else:
# overwrite the surface pressure
self.sp = self.sp_manual
IME = self._ime_sum(self.gas_mask)
# get wind info
LOG.info('Calculating wind info')
if self.wspd_manual is None:
u10 = ds['u10'].sel(source=self.wind_source).item()
v10 = ds['v10'].sel(source=self.wind_source).item()
else:
# read the first wspd data by default if wspd is set manually, this is only used to calculate wdir
u10 = ds['u10'].isel(source=0).item()
v10 = ds['v10'].isel(source=0).item()
if self.wspd_manual is None:
self.wspd = np.sqrt(u10**2 + v10**2)
else:
# overwrite the windspeed
self.wspd = self.wspd_manual
wdir = (270-np.rad2deg(np.arctan2(v10, u10))) % 360
# check all wind products
wind_source_all = list(ds['source'].to_numpy())
wspd_all = np.sqrt(ds['u10']**2+ds['v10']**2)
wdir_all = (270-np.rad2deg(np.arctan2(ds['v10'], ds['u10']))) % 360
wspd_all = list(wspd_all.to_numpy())
wdir_all = list(wdir_all.to_numpy())
# effective wind speed
u_eff = self.alpha['alpha1'] * np.log(self.wspd) + self.alpha['alpha2'] + self.alpha['alpha3'] * self.wspd
# calculate the emission rate (kg/s)
Q = (u_eff / l_eff * IME)
# ---- uncertainty ----
# 1. random
LOG.info('Calculating random error')
if self.land_only:
# get lon and lat
lon = ds_original['longitude']
lat = ds_original['latitude']
self.landmask = Land_mask(lon.data, lat.data, source=self.land_mask_source)
IME_std = self._calc_random_err(ds_original[self.gas].where(self.landmask),
ds[self.gas].where(self.landmask))
else:
IME_std = self._calc_random_err(ds_original[self.gas], ds[self.gas])
err_random = u_eff / l_eff * IME_std
# 2. wind error
LOG.info('Calculating wind error')
if self.wspd_manual is None:
err_wind = self._calc_wind_error(IME, l_eff)
else:
err_wind = 0
# 3. calibration error
LOG.info('Calculating calibration error')
err_calib = self._calc_calibration_error(IME, u_eff, l_eff)
# sum error
Q_err = np.sqrt(err_random**2 + err_wind**2 + err_calib**2)
ds.close()
ds_original.close()
del ds, ds_original
gc.collect()
return self.sp, self.wspd, wdir, wspd_all, wdir_all, wind_source_all, l_ime, l_eff, u_eff, IME, Q*3600, Q_err*3600, \
err_random*3600, err_wind*3600, err_calib*3600 # kg/h
[docs]
def ime_cm(self):
"""Calculate the emission rate (kg/h) using `Carbon Mapper's method <https://assets.carbonmapper.org/documents/L3_L4%20Algorithm%20Theoretical%20Basis%20Document_formatted_10-24-24.pdf>`_.
Returns
-------
IME : float
The total gas mass enhancement (kg).
L : float
The length (m) of plume hull.
Q : float
The emission rate (kg/h).
"""
# create mask centered on source point
y_target, x_target = self._get_index_nearest(
self.ds['longitude'], self.ds['latitude'], self.longitude_source, self.latitude_source)
# create convex hull of masks
mask_stacked = self.ds[f'{self.gas}_cm'].stack(z=['y', 'x']).dropna(dim='z')
if mask_stacked.isnull().sizes['z'] == 0:
# no plume available
return np.nan, np.nan, np.nan
else:
# calculate the length of hull
L = self._hull_length(mask_stacked,
self.ds_original.coords['y'].isel(y=y_target),
self.ds_original.coords['x'].isel(x=x_target),
)
IME = self._ime_sum(self.ds[f'{self.gas}_cm'])
Q = (IME * self.wspd / L).item()
return IME, L, Q*3600
[docs]
def ime_fetch(self):
"""Calculate the emission rate (kg/h) using the `IME-fetch method <https://www.nature.com/articles/s41586-019-1720-3>`_.
Returns
-------
Q : float
The emission rate (kg/h).
Q_err : float
The total estimate error (kg/h).
err_ime : float
The uncertainty (kg/h) caused by the IME error.
err_wind : float
The uncertainty (kg/h) caused by the wind speed error.
"""
# create mask centered on source point
LOG.info('Calculating the index of source loc')
y_target, x_target = self._get_index_nearest(
self.gas_mask['longitude'], self.gas_mask['latitude'], self.longitude_source, self.latitude_source)
mask = np.zeros(self.gas_mask.shape)
mask[y_target, x_target] = 1
# calculate plume height, width, and diagonal
h = mask.shape[0]
w = mask.shape[1]
# r_max = np.sqrt(h**2+w**2)
r_max = int(2.5e3/self.pixel_res) # limit to 2.5 km
# calculate IME by increasing mask radius
LOG.info(f'Calculating IME at different radii with r_max={r_max} pixels')
ime = []
r = 0
r_step = 1/4 # increase by 1/4 pixel size each step
while r < r_max:
r += r_step
LOG.debug(f'IME {r} loop')
mask = self._create_circular_mask(h, w,
center=(x_target, y_target),
radius=r)
# Calculate IME (kg) inside the radius mask
gas_mask = self.gas_mask.where(mask)
IME = self._ime_sum(gas_mask)
ime.append(IME)
# no new plume pixels anymore
if mask.all():
LOG.info(f'Masking iteration stops at r = {r} pixel length')
break
# calculate emission rate
L = np.arange(0+r_step, r_max, r_step).mean()*self.pixel_res
ime_l_mean = np.mean(ime)/np.mean(L)
ime_l_std = np.std(ime/L)
Q = (ime_l_mean * self.wspd).item()
# ---- uncertainty ----
# 1. ime
LOG.info('Calculating IME error')
err_ime = Q * ime_l_std / ime_l_mean
# 2. wind error
LOG.info('Calculating wind error')
if self.wspd_manual is None:
err_wind = self._calc_wind_error_fetch(ime_l_mean)
else:
err_wind = 0
# sum error
Q_err = np.sqrt(err_ime**2 + err_wind**2)
return Q*3600, Q_err*3600, err_ime*3600, err_wind*3600 # kg/h