#!/usr/bin/env python
##################### IMPORT STANDARD MODULES #########################
# python 3 compatibility
from __future__ import absolute_import, division, print_function
import sys
if (sys.version_info[:2] < (3, 0)):
from builtins import float,int,list,tuple
import pkgutil
import os
import codecs,json #printing output
import numpy as np
import re
from six import string_types # to check if variable is string using isinstance
import ntpath
import pint # For SI units
import decimal
from numba import jit
import typing as tp
import pandas as pd
import warnings
####################### IMPORT AVNI LIBRARIES ###########################
from .. import constants
[docs]def stage(file: str, overwrite: bool = False):
"""Stages a file in the AVNI file directories for testing
file : str
file name to be staged
overwrite : bool, optional
overwite any existing symlink or file, by default False
File not found or a symlink already exists
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
filedir = get_filedir() #AVNI file directory
stagedfile = get_fullpath(file)
if not os.path.isfile(stagedfile): raise IOError(stagedfile+' not found')
outlink = filedir+'/'+ntpath.basename(file)
os.symlink(stagedfile, outlink)
except OSError:
if overwrite and os.path.islink(outlink):
os.symlink(stagedfile, outlink)
warnings.warn('WARNING: overwriting an existing staged link named '+ntpath.basename(file))
raise IOError('A link to an actual file '+ntpath.basename(file)+' (not symlink) exists within AVNI. Delete the file '+outlink+' first before proceeding.')
[docs]def parse_line(line: str,rx_dict: dict):
"""Function used to parse line with key word from rx_dict
line : str
Line to search
rx_dict : dict
Do a regex search against all defined regexes
Result of the first matching regex, None if not found
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
for key, rx in rx_dict.items():
match = rx.search(line)
if match:
return key, match
# if there are no matches
return None, None
def ifwithindepth(start_depths: np.ndarray,end_depths: np.ndarray,depth_in_km: np.ndarray):
"""Check if a set of depths are within a range of depths
start_depths : np.ndarray
Starting depth for the range
end_depths : np.ndarray
End depth for the range
depth_in_km : np.ndarray
Depths to check for
index of depth range that each `depth_in_km` belongs to
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
if depth_in_km.ndim != 1: raise ValueError('only 1-D array depth_in_km allowed')
output = np.ones_like(depth_in_km,dtype=np.int64)
output[:] = -1
for ii,depth in enumerate(depth_in_km):
for jj,start in enumerate(start_depths):
end = end_depths[jj]
if depth >= start and depth <= end:
return output
[docs]def makegrid(latitude: tp.Union[list,tuple,np.ndarray],
longitude: tp.Union[list,tuple,np.ndarray],
depth_in_km: tp.Union[None,list,tuple,np.ndarray] = None):
"""Make a 2D or 3D grid out of input locations and depths.
latitude : tp.Union[list,tuple,np.ndarray]
Latitudes of locations queried
longitude : tp.Union[list,tuple,np.ndarray]
Longitudes of locations queried
depth_in_km : tp.Union[None,list,tuple,np.ndarray], optional
Depths of locations queried, by default None
nrows,latitude,longitude,[optional: depth_in_km]
A 2D or 3D grid with `nrows` rows found by unraveling (depth_in_km,latitude,longitude)
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
# convert to numpy arrays
latitude = convert2nparray(latitude)
longitude = convert2nparray(longitude)
nlat = len(latitude)
nlon = len(longitude)
if depth_in_km==None:
if not (latitude.ndim == longitude.ndim == 1): raise ValueError("latitude, longitude or depth_in_km should be one-dimensional arrays")
nrows = nlat*nlon
longitude,latitude = np.meshgrid(longitude,latitude)
longitude = longitude.ravel()
latitude = latitude.ravel()
if not (len(latitude) == len(longitude)): raise ValueError("latitude, longitude or depth_in_km should be of same length if not making grid = False")
return nrows,latitude,longitude
if not (latitude.ndim == longitude.ndim == depth_in_km.ndim == 1): raise ValueError("latitude, longitude or depth_in_km should be one-dimensional arrays")
depth_in_km = convert2nparray(depth_in_km)
ndep = len(depth_in_km)
nrows = ndep*nlat*nlon
depth_tmp = np.zeros(nrows)
for indx,depth in enumerate(depth_in_km):
depth_tmp[indx*nlat*nlon:(indx+1)*nlat*nlon] = depth * np.ones(nlat*nlon)
latitude = np.tile(latitude,len(depth_in_km))
longitude = np.tile(longitude,len(depth_in_km))
depth_in_km = depth_tmp
if not (len(latitude) == len(longitude) == len(depth_in_km)): raise ValueError("latitude, longitude or depth_in_km should be of same length if not making grid = False")
return nrows,latitude,longitude,depth_in_km
[docs]def convert2nparray(value: tp.Union[str,list,tuple,float,np.int64,np.ndarray,bool],
int2float: bool = True,
allowstrings: bool = True) -> np.ndarray:
"""Converts input value to a float numpy array. Boolean are returned as Boolean arrays.
value : tp.Union[str,list,tuple,float,np.int64,np.ndarray,bool]
A single value or a set of values.
int2float : bool, optional
Convert integer to floats, by default True
allowstrings : bool, optional
Check if value has strings, by default True
Output values as a numpy array.
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
# checks
if isinstance(value, (list,tuple,np.ndarray)):
outvalue = np.asarray(value)
elif isinstance(value, bool):
outvalue = np.asarray([value])
elif isinstance(value, float):
outvalue = np.asarray([value])
elif isinstance(value, (int,np.int64)):
if int2float:
outvalue = np.asarray([float(value)])
outvalue = np.asarray([value])
elif isinstance(value,string_types):
if not allowstrings: raise TypeError('input cannot be a string')
outvalue = np.asarray([value])
raise TypeError('input must be list or tuple, not %s' % type(value))
return outvalue
[docs]def precision_and_scale(x: float, max_digits: int = 14) -> tp.Tuple[int, int]:
"""Returns precision and scale of a float
x : float
A floating point number
max_digits : int, optional
Maximum digits or magnitude, by default 14
tp.Tuple[int, int]
Returns precision and scale of a float
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
int_part = np.int(abs(x))
magnitude = 1 if int_part == 0 else np.int(np.log10(int_part)) + 1
if magnitude >= max_digits:
return (magnitude, 0)
frac_part = abs(x) - int_part
multiplier = 10 ** (max_digits - magnitude)
frac_digits = multiplier + np.int(multiplier * frac_part + 0.5)
while frac_digits % 10 == 0:
frac_digits /= 10
scale = np.int(np.log10(frac_digits))
return (magnitude + scale, scale)
[docs]def alphanum_key(s: str) -> list:
"""Helper tool to sort lists in ascending numerical order (natural sorting),
rather than lexicographic sorting
s : str
string to sort
Sorted list
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
return [int(c) if c.isdigit() else c for c in re.split('([0-9]+)', s)]
[docs]def diffdict(first_dict: dict,second_dict: dict) -> dict:
"""Helper tool to get difference in two dictionaries
first_dict : dict
First dictionary
second_dict : dict
Second dictionary
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
return { k : second_dict[k] for k in set(second_dict) - set(first_dict) }
#def equaldict(first_dict,second_dict):
# '''helper tool to check if two dictionaries are equal
# '''
#for k in set(realization.metadata):
# checks.extend(convert2nparray(first_dict==second_dict))
#return np.all(checks)
[docs]def df2nparray(dataframe: pd.DataFrame) -> np.ndarray:
"""Helper tool to return the named numpy array of the pandas dataframe
dataframe : pd.DataFrame
Input Pandas Dataframe
Equivalent named numpy array
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
columns = {}
for ii in range(dataframe.shape[1]): columns[ii] = str(ii)
dataframe.rename(columns = columns,inplace=True)
ra = dataframe.to_records(index=False)
return np.asarray(ra)
[docs]def krunge(n:int,x: float,
h: tp.Union[int,float],
y: tp.Union[list,tuple,np.ndarray],
f: tp.Union[list,tuple,np.ndarray],
m: int = 0,
phi: np.ndarray = np.zeros(6),
savey: np.ndarray = np.zeros(6)):
"""Some sort of integration or interpolation?
x is incremented on second and fourth calls. Resets itself after 5'th call.
n : int
number of points
x : float
indepent variable for points
h : tp.Union[int,float]
step size
y : tp.Union[list,tuple,np.ndarray]
function evaluated at each point
f : tp.Union[list,tuple,np.ndarray]
function evaluated at each point
m : int, optional
call number, by default 0
phi : np.ndarray, optional
Intermediate variable, by default np.zeros(6)
savey : np.ndarray, optional
Intermediate variable, by default np.zeros(6)
raise NotImplementedError('This function has not been tested against Fortran codes')
if len(y) > 6 or len(f) > 6:
raise ValueError ("len(y) > 6 or len(f) > in krunge")
m = m + 1
if m == 1:
elif m == 2:
for j in np.arange(n):
savey[j] = y[j]
phi[j] = f[j]
y[j] = savey[j]+(0.5*h*f[j])
x = x + 0.5*h
krunge = 1
elif m == 3:
for j in np.arange(n):
phi[j] = phi[j] + (2.0*f[j])
y[j] = savey[j] + (0.5*h*f[j])
krunge = 1
elif m == 4:
for j in np.arange(n):
phi[j] = phi[j] + (2.0*f[j])
y[j] = savey[j] + (h*f[j])
x = x + 0.5*h
krunge = 1
elif m == 5:
for j in np.arange(n):
y[j] = savey[j] + (phi[j]+f[j])*h/6.0
m = 0
krunge = 0
return krunge,y,f,m,phi,savey
[docs]def firstnonspaceindex(string: str) -> tp.Tuple[int,int]:
"""Gets the first and last non-space index of a string
string : str
String to search
First and last non-space index
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
while string[ifst:ifst+1] == ' ' and ifst < ilst: ifst=ifst+1
if ilst-ifst <= 0: raise ValueError("error reading model")
return ifst,ilst
[docs]def get_fullpath(path: str) -> str:
"""Provides the full path by replacing . and ~ in path
path : str
Input file or folder path
Full path after replacing . and ~ in path
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
# Get the current directory
if path[0]=='.': path = os.path.abspath(path)
if len(path) > 2 :
if path[:2]=='..': path = os.path.abspath(path)
# If the path starts with tilde, replace with home directory
if path[0]=='~': path = os.path.expanduser(path)
# if the expanded path does not have / as first character it is from current directory
# if only file name provided append current directory
if ntpath.basename(path) == path or path[0] != '/': path = os.path.abspath('./'+path)
return path
[docs]def listfolders(path: str) -> list:
"""Return a list of directories in a path
path : str
Input file or folder path
List of directories
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
return dirs
[docs]def get_installdir(module: str = 'avni',
checkwrite: bool = True,
checkenv: bool =True) -> str:
"""Get the installation directory for any module in the current machine.
module : str, optional
Module to search for, by default 'avni'
checkwrite : bool, optional
Checks for write access to the files, by default True
checkenv : bool, optional
Checks if the directory is specified as an environment variable, by default True
Path to installation directory
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
installdir = None
if checkenv:
if os.environ.get(module+'_dir') is not None:
# print("Warning: Reading "+module+"_dir"+" from environment variables - "+installdir)
if installdir is None:
if loader is None:
installdir = os.getcwd()
print("Warning: installation directory not found for "+module+". Using current directory - "+installdir)
installdir = os.path.dirname(loader.get_filename(module))
if installdir is None:
raise ValueError("Error: Specify "+module+"_dir environment variable (export "+module+"_dir=/path) or install "+module+" module.")
if checkwrite:
if not os.access(installdir, os.W_OK):
raise PermissionError("Error: Cannot I/O to "+module+" directory "+installdir+ " due to permissions. \
Specify "+module+"_dir environment variable or chdir to a different directory with I/O access.")
return installdir
[docs]def get_filedir(module: str = 'avni',
subdirectory: tp.Union[None,str] = None,
checkwrite: bool = True,
makedir: bool = True) -> str:
"""Get the local files directory for any module in the current machine.
module : str, optional
Module to search for, by default 'avni'
subdirectory : tp.Union[None,str], optional
A directory inside the main directory, by default None
checkwrite : bool, optional
Checks for write access to the files, by default True
makedir : bool, optional
Make a new directory if it doesn't exist, by default True
Path to local files directory
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
installdir = get_installdir(module=module,checkwrite=checkwrite)
filedir = os.path.join(installdir,constants.localfilefolder)
if subdirectory is not None: filedir = os.path.join(filedir,subdirectory)
if checkwrite and makedir:
if not os.path.exists(filedir): os.makedirs(filedir)
return filedir
[docs]def get_cptdir(module: str = 'avni',
checkwrite: bool = True,
makedir: bool = True) -> str:
"""Get the directory with color palettes for any module in the current machine.
module : str, optional
Module to search for, by default 'avni'
checkwrite : bool, optional
Checks for write access to the files, by default True
makedir : bool, optional
Make a new directory if it doesn't exist, by default True
Path to local CPT color palette directory
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
return get_filedir(module=module,subdirectory=constants.cptfolder,
[docs]def get_configdir(module: str = 'avni',
checkwrite: bool = True,
makedir: bool = True) -> str:
"""Get the directory containing configuration files.
module : str, optional
Module to search for, by default 'avni'
checkwrite : bool, optional
Checks for write access to the files, by default True
makedir : bool, optional
Make a new directory if it doesn't exist, by default True
Path to local config directory as specified in :py:func:`constants.configfolder`
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
installdir = get_installdir(module=module,checkwrite=checkwrite)
configdir = installdir+'/'+constants.configfolder
if checkwrite and makedir:
if not os.path.exists(configdir):
return configdir
[docs]def get_projections(checkwrite: bool = True,
makedir: bool = True,
types: str = 'radial') -> tp.Tuple[str,bool]:
"""Get the file containing projection matrices.
checkwrite : bool, optional
Checks for write access to the files, by default True
makedir : bool, optional
Make a new directory if doesn't exist, by default True
types : str, optional
Type can be radial or lateral, by default 'radial'
First entry is location of projection file
Second end is whether this file already exists in the file system
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
if types != 'radial' and types != 'lateral':
raise ValueError('types is undefined in get_projections')
configdir = get_configdir(checkwrite=checkwrite,makedir=makedir)
projections = configdir+'/projections.'+types+'.npz'
exists = os.path.isfile(projections)
return projections,exists
[docs]def writejson(nparray: np.ndarray,
filename: str,
encoding: str = 'utf-8'):
"""Writes a json file from a numpy array
nparray : np.ndarray
Input numpy array to write to JSON
filename : str
Output JSON file name
encoding : str, optional
File encoding, by default 'utf-8'
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
listarray = nparray.tolist() # nested lists with same data, indices
json.dump(listarray, codecs.open(filename, 'w', encoding=encoding), separators=(',', ':'), sort_keys=True, indent=4) ### this saves the array in .json format
[docs]def readjson(filename: str,encoding: str = 'utf-8') -> np.ndarray:
"""Reading from a JSON file to a numpy array
filename : str
Input JSON file name
encoding : str, optional
File encoding, by default 'utf-8'
Output numpy array
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
obj_text = codecs.open(filename, 'r', encoding=encoding).read()
listarray = json.loads(obj_text)
nparray = np.array(listarray)
return nparray
[docs]def uniquenumpyrow(a: np.ndarray):
"""Gets the unique rows from a numpy array and the indices. e.g. to get unique lat-lon values
a : np.ndarray
A numpy aray with multiple rows that needs to searched
Unique rows and their indices
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
b = np.ascontiguousarray(a).view(np.dtype((np.void, a.dtype.itemsize * a.shape[1])))
_, idx = np.unique(b, return_index=True)
unique_a = a[idx]
return unique_a,idx
[docs]def appendunits(ureg=constants.ureg,
system: str = 'mks',
unitsfile: str = get_configdir()+'/'+constants.customunits):
"""Append the custom units from unitsfile to `ureg` registry
ureg : _type_, optional
Input unit registry, by default constants.ureg
system : str, optional
Default unit system; if not the same as ureg changes it, by default 'mks'
unitsfile : str, optional
additional definitions to add to `ureg`, by default get_configdir()+'/'+constants.customunits
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
if ureg is None:
ureg = pint.UnitRegistry(system=system)
if ureg.default_system != system: ureg.default_system = system
constants.ureg = ureg
[docs]def convert2units(valstring: str):
"""Returns the value with units from a string that has units.
valstring : str
A string to convert. Only space allowed is that between value and unit.
Value with units
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
vals = valstring.split()
if len(vals) == 1: #if no unit is provided
return eval(vals[0])*constants.ureg('dimensionless')
elif len(vals) == 2: # first is value, second unit
return eval(vals[0])*constants.ureg(vals[1])
raise ValueError('only space allowed is that between value and unit')
[docs]def decimals(value: tp.Union[list,tuple,np.ndarray]) -> np.ndarray:
"""Returns the number of decimals in a set of floating point numbers.
value : tp.Union[list,tuple,np.ndarray]
Set of floating point numbers
Number of decimals in each float
Raj Moulik (moulik@caa.columbia.edu)
:Last Modified:
2023.02.16 5.00
value = convert2nparray(value)
output = np.zeros(len(value), dtype=int)
for indx, val in enumerate(value):
d = decimal.Decimal(str(val))
output[indx] = abs(d.as_tuple().exponent)
return output if len(output) > 1 else output[0]