Source code for pydda.retrieval.wind_retrieve

"""
Created on Mon Aug  7 09:17:40 2017

@author: rjackson
"""

import pyart
import numpy as np
import time
import math
import xarray as xr

from scipy.interpolate import interp1d
from scipy.optimize import fmin_l_bfgs_b
from scipy.signal import savgol_filter
from .auglag import auglag
from ..io import read_from_pyart_grid

try:
    import tensorflow_probability as tfp
    import tensorflow as tf

    TENSORFLOW_AVAILABLE = True
except (ImportError, AttributeError):
    TENSORFLOW_AVAILABLE = False

try:
    import jax.numpy as jnp
    import jax
    import jaxopt

    JAX_AVAILABLE = True
except ImportError:
    JAX_AVAILABLE = False

# Cost functions are imported via the local (relative) package path.
from ..cost_functions import (
    J_function,
    grad_J,
    calculate_fall_speed,
    grad_jax,
    J_function_jax,
)
from copy import deepcopy
from .angles import add_azimuth_as_field, add_elevation_as_field

_wprevmax = np.empty(0)
_wcurrmax = np.empty(0)
iterations = 0


class DDParameters(object):
    """
    This is a helper class for inserting more arguments into the
    :func:`pydda.cost_functions.J_function` and
    :func:`pydda.cost_functions.grad_J` functions. Since these cost functions
    take numerous parameters, this class stores the needed parameters as one
    positional argument for easier readability of the code.

    In addition, class members can be added here so that those contributing
    more constraints to the variational framework can add any parameters they
    may need.

    Attributes
    ----------
    vrs: list of float arrays
        List of radial velocities from each radar
    azs: list of float arrays
        List of azimuths from each radar
    els: list of float arrays
        List of elevations from each radar
    wts: list of float arrays
        Float arrays containing the fall speed from each radar.
    u_back: 1D float array (number of vertical levels)
        Background u wind
    v_back: 1D float array (number of vertical levels)
        Background v wind
    u_model: list of 3D float arrays
        U from each model integrated into the retrieval
    v_model: list of 3D float arrays
        V from each model integrated into the retrieval
    w_model: list of 3D float arrays
        W from each model integrated into the retrieval
    Co: float
        Weighting coefficient for data constraint.
    Cm: float
        Weighting coefficient for mass continuity constraint.
    Cx: float
        Smoothing coefficient for x-direction
    Cy: float
        Smoothing coefficient for y-direction
    Cz: float
        Smoothing coefficient for z-direction
    Cb: float
        Coefficient for sounding constraint
    Cv: float
        Weight for cost function related to vertical vorticity equation.
    Cmod: float
        Coefficient for model constraint
    Cpoint: float
        Coefficient for point constraint
    Ut: float
        Prescribed storm motion. This is only needed if Cv is not zero.
    Vt: float
        Prescribed storm motion. This is only needed if Cv is not zero.
    grid_shape:
        Shape of wind grid
    dx:
        Spacing of grid in x direction
    dy:
        Spacing of grid in y direction
    dz:
        Spacing of grid in z direction
    x:
        E-W grid levels in m
    y:
        N-S grid levels in m
    z:
        Grid vertical levels in m
    rmsVr: float
        The sum of squares of velocity/num_points. Used for normalization
        of the data weighting coefficient.
    weights: n_radars by z_bins by y_bins by x_bins float array
        Data weights for each pair of radars
    bg_weights: z_bins by y_bins by x_bins float array
        Data weights for the sounding constraint
    model_weights: n_models by z_bins by y_bins by x_bins float array
        Data weights for each model.
    point_list: list of dicts or None
        List of point constraints. Each member is a dict with keys "u" and
        "v" corresponding to each component of the wind field, and "x", "y",
        "z" corresponding to the location of the point observation in the
        Grid's Cartesian coordinates.
    roi: float
        The radius of influence of each point observation in m.
    upper_bc: bool
        True to enforce w=0 at the top of the domain (impermeability
        condition), False to not enforce impermeability at the top of
        the domain.
    """

    def __init__(self):
        self.Ut = np.nan
        self.Vt = np.nan
        self.rmsVr = np.nan
        self.grid_shape = None
        self.Cmod = np.nan
        self.Cpoint = np.nan
        self.u_back = None
        self.v_back = None
        self.wts = []
        self.vrs = []
        self.azs = []
        self.els = []
        self.weights = []
        self.bg_weights = []
        self.model_weights = []
        self.u_model = []
        self.v_model = []
        self.w_model = []
        self.x = None
        self.y = None
        self.z = None
        self.dx = np.nan
        self.dy = np.nan
        self.dz = np.nan
        self.Co = 1.0
        self.Cm = 1500.0
        self.Cx = 0.0
        self.Cy = 0.0
        self.Cz = 0.0
        self.Cb = 0.0
        self.Cv = 0.0
        self.Cmod = 0.0
        self.Cpoint = 0.0
        self.Ut = 0.0
        self.Vt = 0.0
        self.upper_bc = True
        self.lower_bc = True
        self.roi = 1000.0
        self.frz = 4500.0
        self.Nfeval = 0.0
        self.engine = "scipy"
        self.point_list = []
        self.cvtol = 1e-2
        self.gtol = 1e-2
        self.Jveltol = 100.0
        self.const_boundary_cond = False
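
# Illustrative sketch (not part of the pydda API): DDParameters is a plain
# attribute container, so the retrieval simply fills an instance and hands it
# to the cost functions as a single positional argument. The values below are
# example settings only.
def _example_dd_parameters():
    params = DDParameters()
    params.Co = 1.0      # data (radial velocity) constraint weight
    params.Cm = 1500.0   # mass continuity constraint weight
    params.frz = 4500.0  # freezing level in m used for fall-speed estimates
    params.roi = 1000.0  # radius of influence of point observations in m
    return params
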
def _get_dd_wind_field_scipy( Grids, u_init, v_init, w_init, engine, points=None, vel_name=None, refl_field=None, u_back=None, v_back=None, z_back=None, frz=4500.0, Co=1.0, Cm=1500.0, Cx=0.0, Cy=0.0, Cz=0.0, Cb=0.0, Cv=0.0, Cmod=0.0, Cpoint=0.0, cvtol=1e-2, gtol=1e-2, Jveltol=100.0, Ut=None, Vt=None, low_pass_filter=True, mask_outside_opt=False, weights_obs=None, weights_model=None, weights_bg=None, max_iterations=1000, mask_w_outside_opt=True, filter_window=5, filter_order=3, min_bca=30.0, max_bca=150.0, upper_bc=True, model_fields=None, output_cost_functions=True, roi=1000.0, wind_tol=0.1, tolerance=1e-8, const_boundary_cond=False, max_wind_mag=100.0, ): global _wcurrmax global _wprevmax global iterations # We have to have a prescribed storm motion for vorticity constraint if Ut is None or Vt is None: if Cv != 0.0: raise ValueError( ( "Ut and Vt cannot be None if vertical " + "vorticity constraint is enabled!" ) ) if not isinstance(Grids, list): raise ValueError("Grids has to be a list!") parameters = DDParameters() parameters.Ut = Ut parameters.Vt = Vt parameters.engine = engine parameters.const_boundary_cond = const_boundary_cond print(parameters.const_boundary_cond) # Ensure that all Grids are on the same coordinate system prev_grid = Grids[0] for g in Grids: if not np.allclose(g["x"].values, prev_grid["x"].values, atol=10): raise ValueError("Grids do not have equal x coordinates!") if not np.allclose(g["y"].values, prev_grid["y"].values, atol=10): raise ValueError("Grids do not have equal y coordinates!") if not np.allclose(g["z"].values, prev_grid["z"].values, atol=10): raise ValueError("Grids do not have equal z coordinates!") if not np.allclose( g["origin_latitude"].values, prev_grid["origin_latitude"].values ): raise ValueError(("Grids have unequal origin lat/lons!")) prev_grid = g if engine.lower() == "auglag" and not TENSORFLOW_AVAILABLE: raise ModuleNotFoundError( "Tensorflow 2.6+ needs to be installed for the Augmented Lagrangian solver." 
) # Disable background constraint if none provided if u_back is None or v_back is None: parameters.u_back = np.zeros(u_init.shape[0]) parameters.v_back = np.zeros(v_init.shape[0]) else: # Interpolate sounding to radar grid print("Interpolating sounding to radar grid") if isinstance(u_back, np.ma.MaskedArray): u_back = u_back.filled(-9999.0) if isinstance(v_back, np.ma.MaskedArray): v_back = v_back.filled(-9999.0) if isinstance(z_back, np.ma.MaskedArray): z_back = z_back.filled(-9999.0) valid_inds = np.logical_and.reduce( (u_back > -9998, v_back > -9998, z_back > -9998) ) u_interp = interp1d(z_back[valid_inds], u_back[valid_inds], bounds_error=False) v_interp = interp1d(z_back[valid_inds], v_back[valid_inds], bounds_error=False) if isinstance(Grids[0]["z"].values, np.ma.MaskedArray): parameters.u_back = u_interp(Grids[0]["z"].values.filled(np.nan)) parameters.v_back = v_interp(Grids[0]["z"].values.filled(np.nan)) else: parameters.u_back = u_interp(Grids[0]["z"].values) parameters.v_back = v_interp(Grids[0]["z"].values) print("Grid levels:") print(Grids[0]["z"].values) # Parse names of velocity field if refl_field is None: refl_field = pyart.config.get_field_name("reflectivity") # Parse names of velocity field if vel_name is None: vel_name = pyart.config.get_field_name("corrected_velocity") winds = np.stack([u_init, v_init, w_init]) # Set up wind fields and weights from each radar parameters.weights = np.zeros( (len(Grids), u_init.shape[0], u_init.shape[1], u_init.shape[2]) ) parameters.bg_weights = np.zeros(v_init.shape) if model_fields is not None: parameters.model_weights = np.ones( (len(model_fields), u_init.shape[0], u_init.shape[1], u_init.shape[2]) ) else: parameters.model_weights = np.zeros( (1, u_init.shape[0], u_init.shape[1], u_init.shape[2]) ) if model_fields is None: if Cmod != 0.0: raise ValueError("Cmod must be zero if model fields are not specified!") bca = np.zeros((len(Grids), len(Grids), u_init.shape[1], u_init.shape[2])) sum_Vr = np.zeros(len(Grids)) for i in range(len(Grids)): parameters.wts.append( np.ma.masked_invalid( calculate_fall_speed(Grids[i], refl_field=refl_field, frz=frz).squeeze() ) ) parameters.vrs.append(np.ma.masked_invalid(Grids[i][vel_name].values.squeeze())) parameters.azs.append( np.ma.masked_invalid(Grids[i]["AZ"].values.squeeze() * np.pi / 180) ) parameters.els.append( np.ma.masked_invalid(Grids[i]["EL"].values.squeeze() * np.pi / 180) ) if len(Grids) > 1: for i in range(len(Grids)): for j in range(len(Grids)): if i == j: continue print(("Calculating weights for radars " + str(i) + " and " + str(j))) bca[i, j] = get_bca(Grids[i], Grids[j]) for k in range(parameters.vrs[i].shape[0]): if weights_obs is None: valid = np.logical_and.reduce( ( ~parameters.vrs[i][k].mask, ~parameters.wts[i][k].mask, ~parameters.azs[i][k].mask, ~parameters.els[i][k].mask, ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[i][k]), np.isfinite(parameters.wts[i][k]), np.isfinite(parameters.azs[i][k]), np.isfinite(parameters.els[i][k]), ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[j][k]), np.isfinite(parameters.wts[j][k]), np.isfinite(parameters.azs[j][k]), np.isfinite(parameters.els[j][k]), ) ) valid = np.logical_and.reduce( ( valid, ~parameters.vrs[j][k].mask, ~parameters.wts[j][k].mask, ~parameters.azs[j][k].mask, ~parameters.els[j][k].mask, ) ) cur_array = parameters.weights[i, k] cur_array[ np.logical_and( valid, np.logical_and( bca[i, j] >= math.radians(min_bca), bca[i, j] <= math.radians(max_bca), ), ) ] = 1 cur_array[~valid] 
= 0 parameters.weights[i, k] += cur_array else: parameters.weights[i, k] = weights_obs[i][k, :, :] if weights_bg is None: valid = np.logical_and.reduce( ( ~parameters.vrs[j][k].mask, ~parameters.wts[j][k].mask, ~parameters.azs[j][k].mask, ~parameters.els[j][k].mask, ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[j][k]), np.isfinite(parameters.wts[j][k]), np.isfinite(parameters.azs[j][k]), np.isfinite(parameters.els[j][k]), ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[j][k]), np.isfinite(parameters.wts[j][k]), np.isfinite(parameters.azs[j][k]), np.isfinite(parameters.els[j][k]), ) ) valid = np.logical_and.reduce( ( valid, ~parameters.vrs[j][k].mask, ~parameters.wts[j][k].mask, ~parameters.azs[j][k].mask, ~parameters.els[j][k].mask, ) ) cur_array = parameters.bg_weights[k] cur_array[ np.logical_or.reduce( ( ~valid, bca[i, j] < math.radians(min_bca), bca[i, j] > math.radians(max_bca), ) ) ] = 1 cur_array[~valid] = 1 parameters.bg_weights[i] += cur_array else: parameters.bg_weights[i] = weights_bg[i] print("Calculating weights for models...") coverage_grade = parameters.weights.sum(axis=0) coverage_grade = coverage_grade / coverage_grade.max() # Weigh in model input more when we have no coverage # Model only weighs 1/(# of grids + 1) when there is full # Coverage if model_fields is not None: if weights_model is None: for i in range(len(model_fields)): parameters.model_weights[i] = 1 - ( coverage_grade / (len(Grids) + 1) ) else: for i in range(len(model_fields)): parameters.model_weights[i] = weights_model[i] else: if weights_obs is None: parameters.weights[0] = np.where(~parameters.vrs[0].mask, 1, 0) else: parameters.weights[0] = weights_obs[0] if weights_bg is None: parameters.bg_weights = np.where(~parameters.vrs[0].mask, 0, 1) else: parameters.bg_weights = weights_bg parameters.vrs = [x.filled(-9999.0) for x in parameters.vrs] parameters.azs = [x.filled(-9999.0) for x in parameters.azs] parameters.els = [x.filled(-9999.0) for x in parameters.els] parameters.wts = [x.filled(-9999.0) for x in parameters.wts] parameters.weights[~np.isfinite(parameters.weights)] = 0 parameters.bg_weights[~np.isfinite(parameters.bg_weights)] = 0 parameters.weights[parameters.weights > 0] = 1 parameters.bg_weights[parameters.bg_weights > 0] = 1 sum_Vr = np.nansum(np.square(parameters.vrs * parameters.weights)) parameters.rmsVr = np.sqrt(np.nansum(sum_Vr) / np.nansum(parameters.weights)) del bca parameters.grid_shape = u_init.shape # Parse names of velocity field winds = winds.flatten() print("Starting solver ") parameters.dx = np.diff(Grids[0]["x"].values, axis=0)[0] parameters.dy = np.diff(Grids[0]["y"].values, axis=0)[0] parameters.dz = np.diff(Grids[0]["z"].values, axis=0)[0] print("rmsVR = " + str(parameters.rmsVr)) print("Total points: %d" % parameters.weights.sum()) parameters.z = Grids[0]["point_z"].values parameters.x = Grids[0]["point_x"].values parameters.y = Grids[0]["point_y"].values bt = time.time() # First pass - no filter wcurrmax = w_init.max() print("The max of w_init is", wcurrmax) iterations = 0 bounds = [(-x, x) for x in max_wind_mag * np.ones(winds.shape)] if model_fields is not None: for i, the_field in enumerate(model_fields): u_field = "U_" + the_field v_field = "V_" + the_field w_field = "W_" + the_field parameters.u_model.append(np.nan_to_num(Grids[0][u_field].values.squeeze())) parameters.v_model.append(np.nan_to_num(Grids[0][v_field].values.squeeze())) parameters.w_model.append(np.nan_to_num(Grids[0][w_field].values.squeeze())) # Don't 
weigh in where model data unavailable where_finite_u = np.isfinite(Grids[0][u_field].values.squeeze()) where_finite_v = np.isfinite(Grids[0][v_field].values.squeeze()) where_finite_w = np.isfinite(Grids[0][w_field].values.squeeze()) parameters.model_weights[i, :, :, :] = np.where( np.logical_and.reduce((where_finite_u, where_finite_v, where_finite_w)), 1, 0, ) print("Total number of model points: %d" % np.sum(parameters.model_weights)) parameters.Co = Co parameters.Cm = Cm parameters.Cx = Cx parameters.Cy = Cy parameters.Cz = Cz parameters.Cb = Cb parameters.Cv = Cv parameters.Cmod = Cmod parameters.Cpoint = Cpoint parameters.roi = roi parameters.upper_bc = upper_bc parameters.points = points parameters.point_list = points _wprevmax = np.zeros(parameters.grid_shape) _wcurrmax = np.zeros(parameters.grid_shape) iterations = 0 if engine.lower() == "scipy" or engine.lower() == "jax": def _vert_velocity_callback(x): global _wprevmax global _wcurrmax global iterations if iterations % 10 > 0: iterations = iterations + 1 return False wind = np.reshape( x, ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) _wcurrmax = wind[2] if iterations == 0: _wprevmax = _wcurrmax iterations = iterations + 1 return False diff = np.abs(_wprevmax - _wcurrmax) diff = np.where(parameters.bg_weights == 0, diff, np.nan) delta = np.nanmax(diff) if delta < wind_tol: return True _wprevmax = _wcurrmax iterations = iterations + 1 print("Max change in w: %4.3f" % delta) return False parameters.print_out = False if engine.lower() == "scipy": winds = fmin_l_bfgs_b( J_function, winds, args=(parameters,), maxiter=max_iterations, pgtol=tolerance, bounds=bounds, fprime=grad_J, disp=0, iprint=-1, callback=_vert_velocity_callback, ) else: def loss_and_gradient(x): x_loss = J_function_jax(x["winds"], parameters) x_grad = {} x_grad["winds"] = grad_jax(x["winds"], parameters) return x_loss, x_grad bounds = ( {"winds": -max_wind_mag * jnp.ones(winds.shape)}, {"winds": max_wind_mag * jnp.ones(winds.shape)}, ) winds = jnp.array(winds) solver = jaxopt.LBFGSB( loss_and_gradient, True, has_aux=False, maxiter=max_iterations, tol=tolerance, jit=True, implicit_diff=False, ) winds = {"winds": winds} winds, state = solver.run(winds, bounds=bounds) winds = [np.asanyarray(winds["winds"])] winds = np.reshape( winds[0], ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) parameters.print_out = True elif engine.lower() == "auglag": if not TENSORFLOW_AVAILABLE: raise ImportError( "Tensorflow must be available to use the Augmented Lagrangian engine!" 
) parameters.vrs = [tf.constant(x, dtype=tf.float32) for x in parameters.vrs] parameters.azs = [tf.constant(x, dtype=tf.float32) for x in parameters.azs] parameters.els = [tf.constant(x, dtype=tf.float32) for x in parameters.els] parameters.wts = [tf.constant(x, dtype=tf.float32) for x in parameters.wts] parameters.model_weights = tf.constant( parameters.model_weights, dtype=tf.float32 ) parameters.weights[~np.isfinite(parameters.weights)] = 0 parameters.weights[parameters.weights > 0] = 1 parameters.weights = tf.constant(parameters.weights, dtype=tf.float32) parameters.bg_weights[parameters.bg_weights > 0] = 1 parameters.bg_weights = tf.constant(parameters.bg_weights, dtype=tf.float32) parameters.z = tf.constant(Grids[0]["point_z"].values, dtype=tf.float32) parameters.x = tf.constant(Grids[0]["point_x"].values, dtype=tf.float32) parameters.y = tf.constant(Grids[0]["point_y"].values, dtype=tf.float32) bounds = [(-x, x) for x in max_wind_mag * np.ones(winds.shape, dtype="float32")] winds = winds.astype("float32") winds, mult, AL_Filter, funcalls = auglag(winds, parameters, bounds) # """ winds = np.stack([winds[0], winds[1], winds[2]]) winds = winds.flatten() if low_pass_filter is True: print("Applying low pass filter to wind field...") winds = np.reshape( winds, ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) winds[0] = savgol_filter(winds[0], filter_window, filter_order, axis=0) winds[0] = savgol_filter(winds[0], filter_window, filter_order, axis=1) winds[0] = savgol_filter(winds[0], filter_window, filter_order, axis=2) winds[1] = savgol_filter(winds[1], filter_window, filter_order, axis=0) winds[1] = savgol_filter(winds[1], filter_window, filter_order, axis=1) winds[1] = savgol_filter(winds[1], filter_window, filter_order, axis=2) winds[2] = savgol_filter(winds[2], filter_window, filter_order, axis=0) winds[2] = savgol_filter(winds[2], filter_window, filter_order, axis=1) winds[2] = savgol_filter(winds[2], filter_window, filter_order, axis=2) winds = np.stack([winds[0], winds[1], winds[2]]) winds = winds.flatten() print("Done! 
Time = " + "{:2.1f}".format(time.time() - bt)) # First pass - no filter the_winds = np.reshape( winds, ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) u = the_winds[0] v = the_winds[1] w = the_winds[2] where_mask = np.sum(parameters.weights, axis=0) + np.sum( parameters.model_weights, axis=0 ) u = np.ma.array(u) w = np.ma.array(w) v = np.ma.array(v) if mask_outside_opt is True: u = np.ma.masked_where(where_mask < 1, u) v = np.ma.masked_where(where_mask < 1, v) w = np.ma.masked_where(where_mask < 1, w) if mask_w_outside_opt is True: w = np.ma.masked_where(where_mask < 1, w) u_field = {} u_field["standard_name"] = "u_wind" u_field["long_name"] = "meridional component of wind velocity" u_field["min_bca"] = min_bca u_field["max_bca"] = max_bca v_field = {} v_field["standard_name"] = "v_wind" v_field["long_name"] = "zonal component of wind velocity" v_field["min_bca"] = min_bca v_field["max_bca"] = max_bca w_field = {} w_field["standard_name"] = "w_wind" w_field["long_name"] = "vertical component of wind velocity" w_field["min_bca"] = min_bca w_field["max_bca"] = max_bca new_grid_list = [] for grid in Grids: grid["u"] = xr.DataArray( np.expand_dims(u, 0), dims=("time", "z", "y", "x"), attrs=u_field ) grid["v"] = xr.DataArray( np.expand_dims(v, 0), dims=("time", "z", "y", "x"), attrs=v_field ) grid["w"] = xr.DataArray( np.expand_dims(w, 0), dims=("time", "z", "y", "x"), attrs=w_field ) new_grid_list.append(grid) return new_grid_list, parameters def _get_dd_wind_field_tensorflow( Grids, u_init, v_init, w_init, points=None, vel_name=None, refl_field=None, u_back=None, v_back=None, z_back=None, frz=4500.0, Co=1.0, Cm=1500.0, Cx=0.0, Cy=0.0, Cz=0.0, Cb=0.0, Cv=0.0, Cmod=0.0, Cpoint=0.0, Ut=None, Vt=None, low_pass_filter=True, mask_outside_opt=False, weights_obs=None, weights_model=None, weights_bg=None, max_iterations=200, mask_w_outside_opt=True, filter_window=5, filter_order=3, min_bca=30.0, max_bca=150.0, upper_bc=True, model_fields=None, output_cost_functions=True, roi=1000.0, lower_bc=True, parallel_iterations=1, wind_tol=0.1, tolerance=1e-8, const_boundary_cond=False, max_wind_mag=100.0, ): if not TENSORFLOW_AVAILABLE: raise ImportError( "Tensorflow >=2.5 and tensorflow-probability " + "need to be installed in order to use the tensorflow engine." ) # We have to have a prescribed storm motion for vorticity constraint if Ut is None or Vt is None: if Cv != 0.0: raise ValueError( ( "Ut and Vt cannot be None if vertical " + "vorticity constraint is enabled!" 
) ) if not isinstance(Grids, list): raise ValueError("Grids has to be a list!") parameters = DDParameters() parameters.Ut = Ut parameters.Vt = Vt parameters.upper_bc = upper_bc parameters.lower_bc = lower_bc parameters.engine = "tensorflow" parameters.const_boundary_cond = const_boundary_cond # Ensure that all Grids are on the same coordinate system prev_grid = Grids[0] for g in Grids: if not np.allclose(g["x"].values, prev_grid["x"].values, atol=10): raise ValueError("Grids do not have equal x coordinates!") if not np.allclose(g["y"].values, prev_grid["y"].values, atol=10): raise ValueError("Grids do not have equal y coordinates!") if not np.allclose(g["z"].values, prev_grid["z"].values, atol=10): raise ValueError("Grids do not have equal z coordinates!") if not np.allclose( g["origin_latitude"].values, prev_grid["origin_latitude"].values ): raise ValueError(("Grids have unequal origin lat/lons!")) prev_grid = g # Disable background constraint if none provided if u_back is None or v_back is None: parameters.u_back = tf.zeros(u_init.shape[0]) parameters.v_back = tf.zeros(v_init.shape[0]) else: # Interpolate sounding to radar grid print("Interpolating sounding to radar grid") if isinstance(u_back, np.ma.MaskedArray): u_back = u_back.filled(-9999.0) if isinstance(v_back, np.ma.MaskedArray): v_back = v_back.filled(-9999.0) if isinstance(z_back, np.ma.MaskedArray): z_back = z_back.filled(-9999.0) valid_inds = np.logical_and.reduce( (u_back > -9998, v_back > -9998, z_back > -9998) ) u_interp = interp1d(z_back[valid_inds], u_back[valid_inds], bounds_error=False) v_interp = interp1d(z_back[valid_inds], v_back[valid_inds], bounds_error=False) if isinstance(Grids[0]["z"].values, np.ma.MaskedArray): parameters.u_back = tf.constant( u_interp(Grids[0]["z"].values.filled(np.nan)), dtype=tf.float32 ) parameters.v_back = tf.constant( v_interp(Grids[0]["z"].values.filled(np.nan)), dtype=tf.float32 ) else: parameters.u_back = tf.constant( u_interp(Grids[0]["z"].values), dtype=tf.float32 ) parameters.v_back = tf.constant( v_interp(Grids[0]["z"].values), dtype=tf.float32 ) print("Interpolated U field:") print(parameters.u_back) print("Interpolated V field:") print(parameters.v_back) print("Grid levels:") print(Grids[0]["z"].values) # Parse names of velocity field if refl_field is None: refl_field = pyart.config.get_field_name("reflectivity") # Parse names of velocity field if vel_name is None: vel_name = pyart.config.get_field_name("corrected_velocity") winds = np.stack([u_init, v_init, w_init]) winds = winds.astype(np.float32) # Set up wind fields and weights from each radar parameters.weights = np.zeros( (len(Grids), u_init.shape[0], u_init.shape[1], u_init.shape[2]), dtype=np.float32, ) parameters.bg_weights = np.zeros(v_init.shape) if model_fields is not None: parameters.model_weights = np.ones( (len(model_fields), u_init.shape[0], u_init.shape[1], u_init.shape[2]), dtype=np.float32, ) else: parameters.model_weights = np.zeros( (1, u_init.shape[0], u_init.shape[1], u_init.shape[2]), dtype=np.float32 ) if model_fields is None: if Cmod != 0.0: raise ValueError("Cmod must be zero if model fields are not specified!") bca = np.zeros( (len(Grids), len(Grids), u_init.shape[1], u_init.shape[2]), dtype=np.float32 ) for i in range(len(Grids)): parameters.wts.append( np.ma.masked_invalid( calculate_fall_speed(Grids[i], refl_field=refl_field, frz=frz).squeeze() ) ) parameters.vrs.append(np.ma.masked_invalid(Grids[i][vel_name].values.squeeze())) parameters.azs.append( 
np.ma.masked_invalid(Grids[i]["AZ"].values.squeeze() * np.pi / 180) ) parameters.els.append( np.ma.masked_invalid(Grids[i]["EL"].values.squeeze() * np.pi / 180) ) if len(Grids) > 1: for i in range(len(Grids)): for j in range(len(Grids)): if i == j: continue print(("Calculating weights for radars " + str(i) + " and " + str(j))) bca[i, j] = get_bca(Grids[i], Grids[j]) for k in range(parameters.vrs[i].shape[0]): if weights_obs is None: cur_array = parameters.weights[i, k] valid = np.logical_and.reduce( ( ~parameters.vrs[i][k].mask, ~parameters.wts[i][k].mask, ~parameters.azs[i][k].mask, ~parameters.els[i][k].mask, ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[i][k]), np.isfinite(parameters.wts[i][k]), np.isfinite(parameters.azs[i][k]), np.isfinite(parameters.els[i][k]), ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[j][k]), np.isfinite(parameters.wts[j][k]), np.isfinite(parameters.azs[j][k]), np.isfinite(parameters.els[j][k]), ) ) valid = np.logical_and.reduce( ( valid, ~parameters.vrs[j][k].mask, ~parameters.wts[j][k].mask, ~parameters.azs[j][k].mask, ~parameters.els[j][k].mask, ) ) cur_array[ np.logical_and( valid, np.logical_and( bca[i, j] >= math.radians(min_bca), bca[i, j] <= math.radians(max_bca), ), ) ] = 1 cur_array[~valid] = 0 parameters.weights[i, k] += cur_array else: parameters.weights[i, k] = weights_obs[i][k, :, :] if weights_bg is None: cur_array = parameters.bg_weights[k] valid = np.logical_and.reduce( ( ~parameters.vrs[i][k].mask, ~parameters.wts[i][k].mask, ~parameters.azs[i][k].mask, ~parameters.els[i][k].mask, ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[i][k]), np.isfinite(parameters.wts[i][k]), np.isfinite(parameters.azs[i][k]), np.isfinite(parameters.els[i][k]), ) ) valid = np.logical_and.reduce( ( valid, np.isfinite(parameters.vrs[j][k]), np.isfinite(parameters.wts[j][k]), np.isfinite(parameters.azs[j][k]), np.isfinite(parameters.els[j][k]), ) ) valid = np.logical_and.reduce( ( valid, ~parameters.vrs[j][k].mask, ~parameters.wts[j][k].mask, ~parameters.azs[j][k].mask, ~parameters.els[j][k].mask, ) ) cur_array[ np.logical_or.reduce( ( ~valid, bca[i, j] < math.radians(min_bca), bca[i, j] > math.radians(max_bca), ) ) ] = 1 cur_array[~valid] = 1 parameters.bg_weights[i] += cur_array else: parameters.bg_weights[i] = weights_bg[i] print("Calculating weights for models...") coverage_grade = parameters.weights.sum(axis=0) coverage_grade = coverage_grade / coverage_grade.max() # Weigh in model input more when we have no coverage # Model only weighs 1/(# of grids + 1) when there is full # Coverage if model_fields is not None: if weights_model is None: for i in range(len(model_fields)): parameters.model_weights[i] = 1 - ( coverage_grade / (len(Grids) + 1) ) else: for i in range(len(model_fields)): parameters.model_weights[i] = weights_model[i] else: if weights_obs is None: parameters.weights[0] = np.where(np.isfinite(parameters.vrs[0]), 1, 0) else: parameters.weights[0] = weights_obs[0] if weights_bg is None: parameters.bg_weights = np.where(np.isfinite(parameters.vrs[0]), 0, 1) else: parameters.bg_weights = weights_bg parameters.vrs = [ tf.constant(x.filled(-9999), dtype=tf.float32) for x in parameters.vrs ] parameters.azs = [ tf.constant(x.filled(-9999), dtype=tf.float32) for x in parameters.azs ] parameters.els = [ tf.constant(x.filled(-9999), dtype=tf.float32) for x in parameters.els ] parameters.wts = [ tf.constant(x.filled(-9999), dtype=tf.float32) for x in parameters.wts ] 
parameters.weights[~np.isfinite(parameters.weights)] = 0 parameters.weights[parameters.weights > 0] = 1 for i in range(len(Grids)): print("Points from Radar %d: %d" % (i, parameters.weights[i].sum())) parameters.weights = tf.constant(parameters.weights, dtype=tf.float32) parameters.bg_weights[parameters.bg_weights > 0] = 1 parameters.bg_weights = tf.constant(parameters.bg_weights, dtype=tf.float32) sum_Vr = tf.experimental.numpy.nansum( tf.square(parameters.vrs * parameters.weights) ) parameters.rmsVr = np.sqrt( np.nansum(sum_Vr) / tf.experimental.numpy.nansum(parameters.weights) ) del bca parameters.grid_shape = u_init.shape # Parse names of velocity field winds = winds.flatten() winds = tf.Variable(winds, name="winds") print("Starting solver ") parameters.dx = np.diff(Grids[0]["x"].values, axis=0)[0] parameters.dy = np.diff(Grids[0]["y"].values, axis=0)[0] parameters.dz = np.diff(Grids[0]["z"].values, axis=0)[0] print("rmsVR = " + str(parameters.rmsVr)) print("Total points: %d" % tf.reduce_sum(parameters.weights)) parameters.z = tf.constant(Grids[0]["point_z"].values, dtype=tf.float32) parameters.x = tf.constant(Grids[0]["point_x"].values, dtype=tf.float32) parameters.y = tf.constant(Grids[0]["point_y"].values, dtype=tf.float32) bt = time.time() # First pass - no filter wcurrmax = w_init.max() print("The max of w_init is", wcurrmax) [(-x, x) for x in 100.0 * np.ones(winds.shape)] if model_fields is not None: for i, the_field in enumerate(model_fields): u_field = "U_" + the_field v_field = "V_" + the_field w_field = "W_" + the_field parameters.u_model.append( tf.constant(np.nan_to_num(Grids[0][u_field].values.squeeze())) ) parameters.v_model.append( tf.constant(np.nan_to_num(Grids[0][v_field].values.squeeze())) ) parameters.w_model.append( tf.constant(np.nan_to_num(Grids[0][w_field].values.squeeze())) ) # Don't weigh in where model data unavailable where_finite_u = np.isfinite(Grids[0][u_field].values.squeeze()) where_finite_v = np.isfinite(Grids[0][v_field].values.squeeze()) where_finite_w = np.isfinite(Grids[0][w_field].values.squeeze()) parameters.model_weights[i, :, :, :] = np.where( np.logical_and.reduce((where_finite_u, where_finite_v, where_finite_w)), 1, 0, ) parameters.model_weights = tf.constant(parameters.model_weights, dtype=tf.float32) parameters.Co = Co parameters.Cm = Cm parameters.Cx = Cx parameters.Cy = Cy parameters.Cz = Cz parameters.Cb = Cb parameters.Cv = Cv parameters.Cmod = Cmod parameters.Cpoint = Cpoint parameters.roi = roi parameters.upper_bc = upper_bc parameters.points = points parameters.point_list = points loss_and_gradient = lambda x: (J_function(x, parameters), grad_J(x, parameters)) winds = tfp.optimizer.lbfgs_minimize( loss_and_gradient, initial_position=winds, tolerance=tolerance, x_tolerance=wind_tol, max_iterations=max_iterations, parallel_iterations=parallel_iterations, max_line_search_iterations=20, ) winds = np.reshape( winds.position.numpy(), ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) wcurrmax = winds[2].max() winds = np.stack([winds[0], winds[1], winds[2]]) winds = winds.flatten() # """ if low_pass_filter: print("Applying low pass filter to wind field...") winds = np.asarray(winds) winds = np.reshape( winds, ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) winds[0] = savgol_filter(winds[0], filter_window, filter_order, axis=0) winds[0] = savgol_filter(winds[0], filter_window, filter_order, axis=1) winds[0] = savgol_filter(winds[0], filter_window, filter_order, 
axis=2) winds[1] = savgol_filter(winds[1], filter_window, filter_order, axis=0) winds[1] = savgol_filter(winds[1], filter_window, filter_order, axis=1) winds[1] = savgol_filter(winds[1], filter_window, filter_order, axis=2) winds[2] = savgol_filter(winds[2], filter_window, filter_order, axis=0) winds[2] = savgol_filter(winds[2], filter_window, filter_order, axis=1) winds[2] = savgol_filter(winds[2], filter_window, filter_order, axis=2) winds = np.stack([winds[0], winds[1], winds[2]]) winds = winds.flatten() print("Done! Time = " + "{:2.1f}".format(time.time() - bt)) the_winds = np.reshape( winds, ( 3, parameters.grid_shape[0], parameters.grid_shape[1], parameters.grid_shape[2], ), ) u = the_winds[0] v = the_winds[1] w = the_winds[2] where_mask = np.sum(parameters.weights, axis=0) + np.sum( parameters.model_weights, axis=0 ) u = np.ma.array(u) w = np.ma.array(w) v = np.ma.array(v) if mask_outside_opt is True: u = np.ma.masked_where(where_mask < 1, u) v = np.ma.masked_where(where_mask < 1, v) w = np.ma.masked_where(where_mask < 1, w) if mask_w_outside_opt is True: w = np.ma.masked_where(where_mask < 1, w) u_field = {} u_field["standard_name"] = "u_wind" u_field["long_name"] = "meridional component of wind velocity" u_field["min_bca"] = min_bca u_field["max_bca"] = max_bca v_field = {} v_field["standard_name"] = "v_wind" v_field["long_name"] = "zonal component of wind velocity" v_field["min_bca"] = min_bca v_field["max_bca"] = max_bca w_field = {} w_field["standard_name"] = "w_wind" w_field["long_name"] = "vertical component of wind velocity" w_field["min_bca"] = min_bca w_field["max_bca"] = max_bca new_grid_list = [] for grid in Grids: grid["u"] = xr.DataArray( np.expand_dims(u, 0), dims=("time", "z", "y", "x"), attrs=u_field ) grid["v"] = xr.DataArray( np.expand_dims(v, 0), dims=("time", "z", "y", "x"), attrs=v_field ) grid["w"] = xr.DataArray( np.expand_dims(w, 0), dims=("time", "z", "y", "x"), attrs=w_field ) new_grid_list.append(grid) return new_grid_list, parameters
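
# Illustrative sketch (not part of the pydda API): the TensorFlow engine above
# passes a closure returning (loss, gradient) to tfp.optimizer.lbfgs_minimize.
# This toy quadratic shows that calling pattern in isolation; the target vector
# and iteration count are arbitrary example values.
def _example_tfp_lbfgs_pattern():
    if not TENSORFLOW_AVAILABLE:
        return None
    target = tf.constant([1.0, -2.0, 3.0])

    def loss_and_gradient(x):
        # The optimizer supplies x as a plain tensor, so it must be watched.
        with tf.GradientTape() as tape:
            tape.watch(x)
            loss = tf.reduce_sum(tf.square(x - target))
        return loss, tape.gradient(loss, x)

    result = tfp.optimizer.lbfgs_minimize(
        loss_and_gradient, initial_position=tf.zeros(3), max_iterations=50
    )
    return result.position  # approaches `target` at convergence
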
def get_dd_wind_field(
    Grids, u_init=None, v_init=None, w_init=None, engine="scipy", **kwargs
):
    """
    This function takes in a list of Py-ART Grid objects and derives a
    wind field. Every Py-ART Grid in Grids must have the same grid
    specification.

    In order for the model data constraint to be used, the model data must
    be added as a field to at least one of the grids in Grids. This
    involves interpolating the model data to the Grids' coordinates. There
    are helper functions for this for WRF and HRRR data in
    :py:func:`pydda.constraints`:

    :py:func:`make_constraint_from_wrf`

    :py:func:`add_hrrr_constraint_to_grid`

    Parameters
    ==========
    Grids: list of Py-ART/DDA Grids
        The list of Py-ART or PyDDA grids to take in corresponding to each
        radar. All grids must have the same shape, x coordinates, y
        coordinates, and z coordinates.
    u_init: 3D ndarray
        The initial guess for the zonal wind field, input as a 3D array
        with the same shape as the fields in Grids. If this is None, PyDDA
        will use the u field in the first Grid as the initialization.
    v_init: 3D ndarray
        The initial guess for the meridional wind field, input as a 3D
        array with the same shape as the fields in Grids. If this is None,
        PyDDA will use the v field in the first Grid as the initialization.
    w_init: 3D ndarray
        The initial guess for the vertical wind field, input as a 3D array
        with the same shape as the fields in Grids. If this is None, PyDDA
        will use the w field in the first Grid as the initialization.
    engine: str (one of "scipy", "tensorflow", "jax")
        Setting this flag will use the solver based off of SciPy,
        TensorFlow, or Jax. Using TensorFlow or Jax expands PyDDA's
        capability to take advantage of GPU-based systems. In addition,
        these two implementations use automatic differentiation to
        calculate the gradient of the cost function in order to optimize
        the gradient calculation. TensorFlow 2.6 and tensorflow-probability
        are required for the TensorFlow-based engine. The latest version of
        Jax is required for the Jax-based engine.
    points: None or list of dicts
        Point observations as returned by
        :func:`pydda.constraints.get_iem_obs`. Set to None to disable.
    vel_name: string
        Name of radial velocity field. Setting to None will have PyDDA
        attempt to automatically detect the velocity field name.
    refl_field: string
        Name of reflectivity field. Setting to None will have PyDDA attempt
        to automatically detect the reflectivity field name.
    u_back: 1D array
        Background zonal wind field from a sounding as a function of
        height. This should be given in the sounding's vertical
        coordinates.
    v_back: 1D array
        Background meridional wind field from a sounding as a function of
        height. This should be given in the sounding's vertical
        coordinates.
    z_back: 1D array
        Heights corresponding to background wind field levels in meters.
        This is given in the sounding's original coordinates.
    frz: float
        Freezing level used for fall speed calculation in meters.
    Co: float
        Weight for cost function related to observed radial velocities.
    Cm: float
        Weight for cost function related to the mass continuity equation.
    Cx: float
        Weight for cost function related to smoothness in x direction
    Cy: float
        Weight for cost function related to smoothness in y direction
    Cz: float
        Weight for cost function related to smoothness in z direction
    Cv: float
        Weight for cost function related to vertical vorticity equation.
    Cmod: float
        Weight for cost function related to custom constraints.
    Cpoint: float
        Weight for cost function related to point observations.
    weights_obs: list of floating point arrays or None
        List of weights for each point in grid from each radar in Grids.
        Set to None to let PyDDA determine this automatically.
    weights_model: list of floating point arrays or None
        List of weights for each point in grid from each custom field in
        model_fields. Set to None to let PyDDA determine this
        automatically.
    weights_bg: list of floating point arrays or None
        List of weights for each point in grid from the sounding. Set to
        None to let PyDDA determine this automatically.
    Ut: float
        Prescribed storm motion in zonal direction. This is only needed if
        Cv is not zero.
    Vt: float
        Prescribed storm motion in meridional direction. This is only
        needed if Cv is not zero.
    low_pass_filter: bool
        If this is True, PyDDA will run a low pass filter on the retrieved
        wind field. Set to False to disable the low pass filter.
    mask_outside_opt: bool
        If set to True, wind values outside the multiple-Doppler lobes will
        be masked, i.e. if fewer than two radars provide coverage for a
        given point.
    max_iterations: int
        The maximum number of iterations to run the optimization loop for.
    mask_w_outside_opt: bool
        If set to True, vertical winds outside the multiple-Doppler lobes
        will be masked, i.e. if fewer than two radars provide coverage for
        a given point.
    filter_window: int
        Window size to use for the low pass filter. A larger window will
        increase the number of points factored into the polynomial fit for
        the filter, and hence will increase the smoothness.
    filter_order: int
        The order of the polynomial to use for the low pass filter.
        Higher-order polynomials allow for the retention of smaller-scale
        features but may also not remove enough noise.
    min_bca: float
        Minimum beam crossing angle in degrees between two radars. 30.0 is
        the typical value used in many publications.
    max_bca: float
        Maximum beam crossing angle in degrees between two radars. 150.0 is
        the typical value used in many publications.
    upper_bc: bool
        Set this to True to enforce w = 0 at the top of the atmosphere.
        This is commonly called the impermeability condition.
    model_fields: list of strings
        The list of fields in the first grid in Grids that contain the
        custom data interpolated to the Grid's grid specification. Helper
        functions to create such gridded fields for HRRR and NetCDF WRF
        data exist in :py:func:`pydda.constraints`. PyDDA will look for
        fields named *U_(model field name)*, *V_(model field name)*, and
        *W_(model field name)*. For example, if you have *U_hrrr*,
        *V_hrrr*, and *W_hrrr*, then specify *["hrrr"]* into model_fields.
    output_cost_functions: bool
        Set to True to output the value of each cost function every 10
        iterations.
    roi: float
        Radius of influence for the point observations. The point
        observation will not hold any weight outside this radius.
    parallel_iterations: int
        The number of iterations to run in parallel in the optimization
        loop. This is only used by the TensorFlow-based engine.
    wind_tol: float
        Stop iterating once the maximum change in winds is less than this
        value.
    tolerance: float
        Tolerance for the :math:`L_{2}` norm of the gradient before
        stopping.
    max_wind_mag: float
        Constrain the optimization so that :math:`|u|`, :math:`|v|`, and
        :math:`|w|` are each less than this value in m/s.

    Returns
    =======
    new_grid_list: list
        A list of PyDDA grids containing the derived wind fields. These
        fields are displayable by the visualization module.
    parameters: struct
        The parameters used in the generation of the multi-Doppler wind
        field.
    """
    if isinstance(Grids, list):
        if isinstance(Grids[0], pyart.core.Grid):
            new_grids = [read_from_pyart_grid(x) for x in Grids]
        else:
            new_grids = Grids
    elif isinstance(Grids, pyart.core.Grid):
        new_grids = [read_from_pyart_grid(Grids)]
    elif isinstance(Grids, xr.Dataset):
        new_grids = [Grids]
    else:
        raise TypeError(
            "Input grids must be an xarray Dataset, Py-ART Grid, or a list of those."
        )
    if u_init is None:
        u_init = new_grids[0]["u"].values.squeeze()
    if v_init is None:
        v_init = new_grids[0]["v"].values.squeeze()
    if w_init is None:
        w_init = new_grids[0]["w"].values.squeeze()

    if (
        engine.lower() == "scipy"
        or engine.lower() == "jax"
        or engine.lower() == "auglag"
    ):
        return _get_dd_wind_field_scipy(
            new_grids, u_init, v_init, w_init, engine, **kwargs
        )
    elif engine.lower() == "tensorflow":
        return _get_dd_wind_field_tensorflow(
            new_grids, u_init, v_init, w_init, **kwargs
        )
    else:
        raise NotImplementedError("Engine %s is not supported." % engine)
def get_bca(Grid1, Grid2):
    """
    This function gets the beam crossing angle between two radars from
    their lat/lon locations.

    Parameters
    ==========
    Grid1: xarray (PyDDA) Dataset
        The PyDDA Dataset storing the first radar's Grid.
    Grid2: xarray (PyDDA) Dataset
        The PyDDA Dataset storing the second radar's Grid.

    Returns
    =======
    bca: nD float array
        The beam crossing angle between the two radars in radians.
    """
    rad1_lon = Grid1["radar_longitude"].values
    rad1_lat = Grid1["radar_latitude"].values
    rad2_lon = Grid2["radar_longitude"].values
    rad2_lat = Grid2["radar_latitude"].values
    x = Grid1["point_x"].values
    y = Grid1["point_y"].values
    projparams = Grid1["projection"].attrs
    if projparams["_include_lon_0_lat_0"] == "true":
        projparams["lat_0"] = Grid1["origin_latitude"].values
        projparams["lon_0"] = Grid1["origin_longitude"].values

    rad1 = pyart.core.geographic_to_cartesian(rad1_lon, rad1_lat, projparams)
    rad2 = pyart.core.geographic_to_cartesian(rad2_lon, rad2_lat, projparams)

    # Create grid with Radar 1 in center
    x = x - rad1[0]
    y = y - rad1[1]
    rad2 = np.array(rad2) - np.array(rad1)
    a = np.sqrt(np.multiply(x, x) + np.multiply(y, y))
    b = np.sqrt(pow(x - rad2[0], 2) + pow(y - rad2[1], 2))
    c = np.sqrt(rad2[0] * rad2[0] + rad2[1] * rad2[1])
    inp_array1 = x / a
    inp_array1 = np.where(inp_array1 < -1, -1, inp_array1)
    inp_array1 = np.where(inp_array1 > 1, 1, inp_array1)
    inp_array2 = (x - rad2[1]) / b
    inp_array2 = np.where(inp_array2 < -1, -1, inp_array2)
    inp_array2 = np.where(inp_array2 > 1, 1, inp_array2)
    # Law of cosines: the beam crossing angle is the angle at the grid point
    # of the triangle formed by the grid point and the two radar locations.
    inp_array3 = (a * a + b * b - c * c) / (2 * a * b)
    inp_array3 = np.where(inp_array3 < -1, -1, inp_array3)
    inp_array3 = np.where(inp_array3 > 1, 1, inp_array3)
    return np.ma.masked_invalid(np.arccos(inp_array3))[0, :, :]
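
# Illustrative sketch (not part of the pydda API): using get_bca to flag the
# dual-Doppler lobe between two grids already converted to PyDDA Datasets
# (e.g. by pydda.io.read_from_pyart_grid). The 30/150 degree limits mirror the
# default min_bca/max_bca used above.
def _example_dual_doppler_lobe_mask(grid_a, grid_b, min_bca=30.0, max_bca=150.0):
    bca = get_bca(grid_a, grid_b)  # radians, shape (ny, nx)
    in_lobe = np.logical_and(
        bca >= np.radians(min_bca), bca <= np.radians(max_bca)
    )
    return in_lobe
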