Source code for avni.tools.bases

#!/usr/bin/env python

#####################  IMPORT STANDARD MODULES   #########################

# python 3 compatibility
from __future__ import absolute_import, division, print_function
from builtins import *

import numpy as np
from collections import Counter
from scipy import sparse
from timeit import default_timer as timer
from numba import jit,int64
import typing as tp

####################### IMPORT AVNI LIBRARIES  ###########################

from avni.f2py import vbspl,dbsplrem,ylm,shold
from .trigd import sind,cosd,acosd
from .common import convert2nparray,makegrid

##########################################################################

[docs]def eval_vbspl(radius: tp.Union[list,tuple,np.ndarray], knots: tp.Union[str,list,tuple,float,np.int64,np.ndarray,bool]): """Evaluate the cubic spline knot with second derivative as 0 at end points. In contrast to :py:func:`eval_splrem`, this function distributes the spline knots unevenly at the specified depths or radii. Parameters ---------- radius : tp.Union[list,tuple,np.ndarray] A radius value or array of radii (or alternatively depths) queried in km knots : tp.Union[str,list,tuple,float,np.int64,np.ndarray,bool] A numpy array or list of radii (or depths) of spline knots in km Returns ------- vercof, dvercof: float or np.ndarray value of the polynomial coefficients at each depth and derivative. Both arrays have size (Nradius, Nsplines). :Authors: Raj Moulik (moulik@caa.columbia.edu) :Last Modified: 2023.02.16 5.00 """ if isinstance(knots, (list,tuple,np.ndarray)): knots = np.asarray(knots) knots = np.sort(knots) else: raise TypeError('knots must be list or tuple, not %s' % type(knots)) # convert to numpy arrays radius = convert2nparray(radius) # find repeated values repeats = [item for item, count in Counter(knots).items() if count > 1] repeats_gt_2= [item for item, count in Counter(knots).items() if count > 2] if len(repeats_gt_2) != 0: raise ValueError('Cannot have more than 2 repetitions in knots') # if there are repeated knots, splits it if len(repeats) > 0: split_at = [] for index,value in enumerate(repeats): split_at.append(np.where(knots==value)[0][1]) knots_list = np.split(knots, split_at) for knots in knots_list: if len(knots) < 4: raise ValueError('Atleast 4 knots need to be defined at or between '+str(min(knots))+' and '+str(max(knots))+' km') jj = 0 for query in radius: jj = jj + 1 for index,value in enumerate(knots_list): # create the arrays as Fortran-contiguous splpts = np.array(value.tolist(), order = 'F') #Undefined if query does not lie within the query extents of knot points if query < min(value) or query > max(value): temp1 = temp2 = np.zeros_like(splpts) else: (temp1, temp2) = vbspl(query,len(splpts),splpts) if index == 0: vercof_temp = temp1; dvercof_temp = temp2 else: vercof_temp = np.concatenate((vercof_temp,temp1)) dvercof_temp = np.concatenate((dvercof_temp,temp1)) if jj == 1: vercof = vercof_temp; dvercof = dvercof_temp else: vercof = np.vstack([vercof,vercof_temp]) dvercof = np.vstack([dvercof,dvercof_temp]) # when there are no repeated knots else: if len(knots) < 4: raise ValueError('Atleast 4 knots need to be defined at or between '+str(min(knots))+' and '+str(max(knots))) # create the arrays as Fortran-contiguous splpts = np.array(knots.tolist(), order = 'F') # initialize vercof = np.ones((len(radius),len(knots))) dvercof = np.zeros((len(radius),len(knots))) for idep,query in enumerate(radius): #Undefined if query does not lie within the query extents of knot points if query < min(knots) or query > max(knots): vercof_temp = dvercof_temp = np.zeros_like(splpts) else: (vercof_temp, dvercof_temp) = vbspl(query,len(splpts),splpts) vercof[idep]=vercof_temp dvercof[idep]=dvercof_temp return vercof, dvercof
[docs]def eval_splrem(radius: tp.Union[list,tuple,np.ndarray], radius_range: tp.Union[list,tuple,np.ndarray], nsplines: int): """Evaluate the cubic spline knot with second derivative as 0 at end points. In contrast to :py:func:`eval_vbspl`, this function distributes the spline knots evenly within the radius range. Parameters ---------- radius : tp.Union[list,tuple,np.ndarray] A radius value or array of radii (or alternatively depths) queried in km radius_range : tp.Union[list,tuple,np.ndarray] Limits of the radius (or depths) limits of the region nsplines : int number of splines within the range Returns ------- vercof, dvercof: float or np.ndarray value of the polynomial coefficients at each depth and derivative. Both arrays have size (Nradius, Nsplines). :Authors: Raj Moulik (moulik@caa.columbia.edu) :Last Modified: 2023.02.16 5.00 """ # convert to numpy arrays radius = convert2nparray(radius) if len(radius_range) != 2 or not isinstance(radius_range, (list,tuple,np.ndarray)): raise TypeError('radius_range must be list , not %s' % type(radius_range)) for irad,radval in enumerate(radius): #Undefined if depth does not lie within the depth extents of knot points if radval < min(radius_range) or radval > max(radius_range): temp1 = temp2 = np.zeros(nsplines) else: (temp1, temp2) = dbsplrem(radval,radius_range[0], radius_range[1],nsplines) if irad == 0: vercof = temp1; dvercof = temp2 else: vercof = np.vstack([vercof,temp1]) dvercof = np.vstack([dvercof,temp2]) return vercof, dvercof
[docs]def eval_polynomial(radius: tp.Union[list,tuple,np.ndarray], radius_range: tp.Union[list,tuple,np.ndarray], rnorm: float, types: list = ['CONSTANT','LINEAR']): """Evaluate a set of polynomial functions within a range of radii. Parameters ---------- radius : tp.Union[list,tuple,np.ndarray] A radius value or array of radii queried in km radius_range : tp.Union[list,tuple,np.ndarray] Limits of the radius limits of the region rnorm : float normalization for radius, usually the radius of the planet types : list, optional polynomial coefficients to be used for calculation. Options are : TOP,BOTTOM, CONSTANT, LINEAR, QUADRATIC, CUBIC, by default ['CONSTANT','LINEAR'] Returns ------- vercof, dvercof: float or np.ndarray value of the polynomial coefficients and derivative at each depth size (Nradius) :Authors: Raj Moulik (moulik@caa.columbia.edu) :Last Modified: 2023.02.16 5.00 """ # convert to numpy arrays radiusin = convert2nparray(radius) radius_temp = convert2nparray(radius_range) if radius_temp.ndim == 1: radius_range = convert2nparray([radius_temp.tolist()]) if radius_range.shape[1] != 2: raise TypeError('Only two values allowed within radius_range') elif radius_temp.ndim == 2: radius_range = radius_temp if radius_range.shape[1] != 2: raise TypeError('Only two values allowed within radius_range') if not isinstance(radius_range, (list,tuple,np.ndarray)): raise TypeError('radius_range must be list , not %s' % type(radius_range)) # keys in coefficients should be acceptable choices = ['TOP', 'BOTTOM', 'CONSTANT', 'LINEAR', 'QUADRATIC', 'CUBIC'] if not np.all([key in choices for key in types]): raise AssertionError('Only polynomial bases can be used') npoly = len(radius_range)*len(types) # first find whether CONSTANT/LINEAR or TOP/BOTTOM for radii in radius_range: if not np.all(np.sort(radii)==radii): raise AssertionError('radius_range needs to be sorted') # see if either TOP/BOT or CONSTANT/LINEAR exists findtopbot = np.any([key in ['BOTTOM','TOP'] for key in types]) findconstantlinear = np.any([key in ['CONSTANT','LINEAR'] for key in types]) if findtopbot and findconstantlinear: raise ValueError('Cannot have both BOTTOM/TOP and CONSTANT/LINEAR as types in eval_polynomial') # loop through each depth and get the polynomial values and derivatives for irad,_ in enumerate(radiusin): temp = np.zeros(npoly) dtemp = np.zeros(npoly) for irange,_ in enumerate(radius_range): rbot = min(radius_range[irange])/rnorm rtop = max(radius_range[irange])/rnorm #Undefined if depth does not lie within the depth extents of knot points if radiusin[irad]/rnorm < rbot or radiusin[irad]/rnorm > rtop: # <= so that the boundary depth belongs to only one radial kernel for itype in range(len(types)): ii = irange*len(types)+itype temp[ii]=0. dtemp[ii]=0. else: rn=radiusin[irad]/rnorm for itype,_ in enumerate(types): ii = irange*len(types)+itype if findconstantlinear: if types[itype]=='CONSTANT': temp[ii]=1. dtemp[ii]=0. elif types[itype]=='LINEAR': temp[ii]=rn dtemp[ii]=1. elif types[itype]=='QUADRATIC': temp[ii]=rn**2 dtemp[ii]=2.*rn elif types[itype]=='CUBIC': temp[ii]=rn**3 dtemp[ii]=3.*rn**2 elif findtopbot: if types[itype]=='TOP': temp[ii] = 1.-(rn-rtop)/(rbot-rtop) dtemp[ii]= -1./(rbot-rtop) elif types[itype]=='BOTTOM': temp[ii] = (rn-rtop)/(rbot-rtop) dtemp[ii]= 1./(rbot-rtop) elif types[itype]=='QUADRATIC': temp[ii] = rn**2-rtop**2-(rn-rtop)*(rbot+rtop) dtemp[ii]=2.*rn - 1./(rbot+rtop) elif types[itype]=='CUBIC': temp[ii]=rn**3-rtop**3-(rn-rtop)*(rbot**3-rtop**3)/(rbot-rtop) dtemp[ii]=3.*rn**2 - 1.*(rbot**3-rtop**3)/(rbot-rtop) if irad == 0: vercof = temp; dvercof = dtemp else: vercof = np.vstack([vercof,temp]) dvercof = np.vstack([dvercof,dtemp]) return vercof,dvercof
[docs]def eval_splcon(latitude: tp.Union[list,tuple,np.ndarray], longitude: tp.Union[list,tuple,np.ndarray], xlaspl: np.ndarray, xlospl: np.ndarray, xraspl: np.ndarray): """Evaluate spherical splines at a set of locations. Parameters ---------- latitude : tp.Union[list,tuple,np.ndarray] Latitudes of locations queried longitude : tp.Union[list,tuple,np.ndarray] Longitudes of locations queried xlaspl : np.ndarray Latitude locations of splines xlospl : np.ndarray Longitude locations of splines xraspl : np.ndarray Radius of splines Returns ------- horcof : np.ndarray Value of the horizontal coefficents at each location. Size of numpy array is [len(latitude) X ncoefhor] """ latitude = convert2nparray(latitude) longitude = convert2nparray(longitude) if not len(latitude) == len(longitude): raise AssertionError('latitude and longitude should be of same length') if not len(xlaspl) == len(xlospl) == len(xraspl): raise AssertionError('xlaspl,xlospl and xraspl should be of same length') ncoefhor = len(xlaspl) values = sparse.csr_matrix((len(latitude),ncoefhor)) # empty matrix for iloc in range(len(latitude)): lat = latitude[iloc] lon = longitude[iloc] #--- make lon go from 0-360 if lon<0.: lon=lon+360. xlospl[np.where(xlospl<0.)]=xlospl[np.where(xlospl<0.)]+360. ncon,colind,con = splcon(lat,lon,ncoefhor,xlaspl,xlospl,xraspl) rowind = iloc*np.ones(ncon) # update values values = values + sparse.csr_matrix((con, (rowind, colind)), shape=(len(latitude),ncoefhor)) return values
[docs]@jit(nopython=True) def splcon(lat:float,lon:float,ncoefhor:int,xlaspl:np.ndarray,xlospl:np.ndarray,xraspl:np.ndarray): """Evaluate spherical splines at a given location. Modified from the Fortran code splcon.f that is based on Wang and Dahlen (1995) :cite:`Wang:1995fn`. Parameters ---------- lat : float latitude query lon : float longitude query ncoefhor : int number of splines xlaspl : np.ndarray spline latitudes xlospl : np.ndarray spline longitudes xraspl : np.ndarray spline radii Returns ------- ncon,icon,con number of splines, spline index, and value at each location :Authors: Raj Moulik (moulik@caa.columbia.edu) :Last Modified: 2023.02.16 5.00 """ ncon=0 con=np.zeros(ncoefhor) icon=np.zeros(ncoefhor,dtype=int64) for iver in range(ncoefhor): if lat>xlaspl[iver]-2.*xraspl[iver]: if lat<xlaspl[iver]+2.*xraspl[iver]: dd=sind(xlaspl[iver])*sind(lat) dd=dd+cosd(xlaspl[iver])*cosd(lat)*cosd(lon-xlospl[iver]) dd=acosd(dd) if dd <= xraspl[iver]*2.: icon[ncon] = iver rn=dd/xraspl[iver] dr=rn-1. if rn <= 1.: con[ncon] = (0.75*rn-1.5)*(rn**2)+1. elif rn > 1.: con[ncon] = ((-0.25*dr+0.75)*dr-0.75)*dr+0.25 else: con[ncon] = 0. ncon=ncon+1 return ncon,icon[:ncon],con[:ncon]
[docs]def eval_ylm(latitude: tp.Union[list,tuple,np.ndarray], longitude: tp.Union[list,tuple,np.ndarray], lmaxhor: int, weights: tp.Union[None,list,tuple,np.ndarray] = None, grid: bool = False, norm: str = 'ylm'): """Evaluate spherical harmonics with a specific normalization. Parameters ---------- latitude : tp.Union[list,tuple,np.ndarray] Latitudes of locations queried longitude : tp.Union[list,tuple,np.ndarray] Longitudes of locations queried lmaxhor : int Maximum spherical harmonic degree weights : tp.Union[None,list,tuple,np.ndarray], optional Weights to multiply the bases with i.e. coefficients, by default None grid : bool, optional Create a grid based on a combination of latitudes and longitudes, by default False norm : str, optional Spherical harmonic normalization, by default 'ylm' Returns ------- horcof Value of the horizontal coefficents at each location. Size of numpy array is [len(latitude) X ((lmaxhor+1)^2)] :Authors: Raj Moulik (moulik@caa.columbia.edu) :Last Modified: 2023.02.16 5.00 """ # convert to numpy arrays latitude = convert2nparray(latitude) longitude = convert2nparray(longitude) nlat = len(latitude) nlon = len(longitude) #checks if grid: nrows,latitude,longitude = makegrid(latitude,longitude) else: if not (latitude.ndim == longitude.ndim == 1): raise ValueError("latitude, longitude or depth_in_km should be one-dimensional arrays") nrows = nlat ncoefhor = np.power(lmaxhor+1,2) # numpye of coefficients upto Lmax if np.any(weights == None): horcof = sparse.csr_matrix((nrows,ncoefhor)) # empty matrix else: weights = convert2nparray(weights) assert(len(weights)==ncoefhor),' length of weights and lat/lon need to be same' values = np.zeros_like(latitude) if not (len(latitude) == len(longitude)): raise ValueError("latitude, longitude or depth_in_km should be of same length if not making grid = False") for iloc in range(len(latitude)): lat = latitude[iloc] lon = longitude[iloc] #--- make lon go from 0-360 if lon<0.: lon=lon+360. # wk1,wk2,wk3 are legendre polynomials of size Lmax+1 # ylmcof is the value of Ylm if norm == 'ylm': ylmcof, _ , _ , _ = ylm(lat,lon,lmaxhor,ncoefhor,lmaxhor+1) elif norm == 'shold': ylmcof = shold(lat,lon,lmaxhor,ncoefhor) if np.any(weights == None): rowind = iloc*np.ones(ncoefhor) colind = np.arange(ncoefhor) # update values horcof = horcof + sparse.csr_matrix((ylmcof, (rowind, colind)), shape=(nrows,ncoefhor)) else: values[iloc] = np.sum(ylmcof*weights) return horcof if np.any(weights == None) else values
[docs]def eval_pixel(latitude: tp.Union[list,tuple,np.ndarray], longitude: tp.Union[list,tuple,np.ndarray], xlapix: tp.Union[list,tuple,np.ndarray], xlopix: tp.Union[list,tuple,np.ndarray], xsipix: tp.Union[list,tuple,np.ndarray]): """Evaluate pixel values at a set of locations. Parameters ---------- latitude : tp.Union[list,tuple,np.ndarray] Latitudes of locations queried longitude : tp.Union[list,tuple,np.ndarray] Longitudes of locations queried xlapix : tp.Union[list,tuple,np.ndarray] Pixel center latitudes xlopix : tp.Union[list,tuple,np.ndarray] Pixel center longitudes xsipix : tp.Union[list,tuple,np.ndarray] Pixel sizes in degrees Returns ------- horcof Value of the horizontal coefficents at each location. Size of numpy array is [len(latitude) X len(xsipix)] :Authors: Raj Moulik (moulik@caa.columbia.edu) :Last Modified: 2023.02.16 5.00 """ latitude = convert2nparray(latitude) longitude = convert2nparray(longitude) if not len(latitude) == len(longitude): raise AssertionError('latitude and longitude should be of same length') if not len(xlapix) == len(xlopix) == len(xsipix): raise AssertionError('xlapix,xlopix,xsipix should be of same length') if len(np.unique(xsipix)) > 1: strout='' for ii in range(len(np.unique(xsipix))): strout = strout+', '+str(np.unique(xsipix)[ii]) print('Warning: multiple pixel sizes in the PIXEL basis fir evaluation in bases.eval_pixel. Sizes are ') labo = xlapix-xsipix/2.; lato = xlapix+xsipix/2. labo = np.clip(labo,-90.,90.); lato = np.clip(lato,-90.,90.) # go from (-180,180) to (0,360) xlopix[np.where(xlopix<0.)]=xlopix[np.where(xlopix<0.)]+360. lole = xlopix-xsipix/2.; lori = xlopix+xsipix/2. lori = np.clip(lori,0.,360.); lole = np.clip(lole,0.,360.) horcof = sparse.csr_matrix((len(latitude),len(xsipix))) # empty matrix for iloc,lat in enumerate(latitude): lon = longitude[iloc] #--- make lon go from 0-360 if lon<0.: lon=lon+360. # check if the location lies within pixel if lat != 90.: latfind = np.intersect1d(np.where(lat<lato), np.where(lat>=labo)) else: latfind = np.intersect1d(np.where(lat<=lato), np.where(lat>=labo)) if lon != 360.: lonfind = np.intersect1d(np.where(lon<lori), np.where(lon>=lole)) else: lonfind = np.intersect1d(np.where(lon<=lori), np.where(lon>=lole)) findindex = np.intersect1d(latfind,lonfind) if len(findindex) != 1: raise ValueError('found '+str(len(findindex))+' pixels for the location ('+str(lat)+','+str(lon)+')') rowind = iloc*np.ones_like(findindex) values = np.ones_like(findindex,dtype=float) colind = np.array(findindex) # update values horcof = horcof + sparse.csr_matrix((values, (rowind, colind)), shape=(len(latitude),len(xsipix))) return horcof