Source code for avni.tools.io

#!/usr/bin/env python

#####################  IMPORT STANDARD MODULES   #########################

# python 3 compatibility
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
if (sys.version_info[:2] < (3, 0)):
    from builtins import tuple

import numpy as np
import gc
import warnings
from scipy import sparse
import h5py

#######################################################################################

def close_h5py():
    """Close every h5py file handle that the garbage collector tracks.

    All objects returned by ``gc.get_objects()`` are scanned and ``close()``
    is called on each ``h5py.File`` instance. A failure to close one handle
    is reported through ``warnings.warn`` and does not interrupt the scan.

    :Authors:
        Raj Moulik (moulik@caa.columbia.edu)
    :Last Modified:
        2023.02.16 5.00
    """
    for obj in gc.get_objects():  # browse through ALL tracked objects
        if not isinstance(obj, h5py.File):  # only HDF5 files are of interest
            continue
        try:
            obj.close()
        except Exception:
            # Best effort: keep closing the remaining handles. ``Exception``
            # (not a bare ``except``) lets KeyboardInterrupt/SystemExit
            # propagate instead of being silently swallowed.
            warnings.warn('Warning: HDF5 files already closed')
def store_sparse_hdf(h5f, varname: str, mat, compression: str = "gzip"):
    """Store a `csr` matrix in HDF5, appending rows if `varname` exists.

    The matrix is written as four datasets below ``varname``: ``data``,
    ``indices``, ``indptr`` and ``shape``. If ``load_sparse_hdf`` finds an
    existing matrix under ``varname``, the new matrix is stacked below it
    and the node is rewritten.

    Parameters
    ----------
    h5f
        HDF5 file handle
    varname : str
        node prefix in HDF5 hierarchy
    mat : scipy.sparse.csr_matrix
        sparse matrix to be stored
    compression : str, optional
        Compression type in HDF5, by default "gzip"

    Raises
    ------
    AssertionError
        If `mat` is not a ``scipy.sparse.csr_matrix``.

    :Authors:
        Raj Moulik (moulik@caa.columbia.edu)
    :Last Modified:
        2023.02.16 5.00
    """
    # Use the public class and isinstance(); ``sparse.csr.csr_matrix`` goes
    # through a deprecated private namespace.
    if not isinstance(mat, sparse.csr_matrix):
        raise AssertionError("This code only works for csr matrices")

    try:
        # Only the lookup is guarded: a KeyError here means nothing has been
        # stored under ``varname`` yet.
        mat_original = load_sparse_hdf(h5f, varname)
    except KeyError:
        mat_write = mat
    else:
        # Force CSR output so the attributes written below are CSR ones.
        mat_write = sparse.vstack([mat_original, mat], format='csr')
        del h5f[varname]

    # Write each CSR component as its own dataset
    for par in ('data', 'indices', 'indptr', 'shape'):
        arr = np.array(getattr(mat_write, par))
        h5f.create_dataset(varname + '/' + par, data=arr,
                           compression=compression)
def load_sparse_hdf(h5f, varname: str):
    """Load a `csr` matrix from HDF5 file

    Reads the ``data``, ``indices``, ``indptr`` and ``shape`` entries stored
    below ``varname`` and rebuilds the matrix from them.

    Parameters
    ----------
    h5f
        HDF5 file handle
    varname : str
        node prefix in HDF5 hierarchy

    Returns
    -------
    scipy.sparse.csr_matrix
        A sparse `csr` matrix

    Raises
    ------
    KeyError
        Propagated from the lookup when ``varname`` or one of the four
        components is missing.

    :Authors:
        Raj Moulik (moulik@caa.columbia.edu)
    :Last Modified:
        2023.02.16 5.00
    """
    group = h5f[varname]
    # ``dataset[()]`` reads the full contents; the older ``.value`` attribute
    # no longer exists in h5py >= 3.0.
    data, indices, indptr, shape = (
        group[par][()] for par in ('data', 'indices', 'indptr', 'shape')
    )
    # The shape comes back as an integer array; pass plain Python ints.
    return sparse.csr_matrix((data, indices, indptr),
                             shape=tuple(int(n) for n in shape))
def store_numpy_hdf(h5f, varname: str, array: np.ndarray,
                    compression: str = "gzip", compression_opts: int = 9):
    """Store a named numpy array in HDF5 file

    The field names are written to ``varname/fields`` as byte strings and
    every field is written to its own dataset ``varname/columns/<field>``.
    If ``load_numpy_hdf`` finds an existing array under ``varname``, the new
    array is appended to it, the node is rewritten and a warning is issued.

    Parameters
    ----------
    h5f
        HDF5 file handle
    varname : str
        node prefix in HDF5 hierarchy
    array : np.ndarray
        Named numpy array
    compression : str, optional
        Compression type in HDF5, by default "gzip"
    compression_opts : int, optional
        Compression level opts, by default 9

    Raises
    ------
    ValueError
        If `array` is not a numpy array or has no named fields.

    :Authors:
        Raj Moulik (moulik@caa.columbia.edu)
    :Last Modified:
        2023.02.16 5.00
    """
    # Check if it is a named numpy array
    if not isinstance(array, np.ndarray):
        raise ValueError('Only numpy arrays can be stored with store_numpy_hdf')
    names = array.dtype.names
    if names is None:
        raise ValueError('Only named numpy arrays are allowed')

    # Field names are stored as fixed-width byte strings. Keep the historical
    # 15-byte width but widen it when a name is longer, so that names are
    # never silently truncated. ('S' replaces the deprecated 'a' dtype code.)
    width = max([15] + [len(name) for name in names])
    fields = np.array(names, dtype='S' + str(width))

    try:
        # Only the lookup is guarded: a KeyError means nothing is stored
        # under ``varname`` yet. Other errors are real and must surface.
        arr_original = load_numpy_hdf(h5f, varname)
    except KeyError:
        arr_write = array
    else:
        arr_write = np.hstack([arr_original, array])
        del h5f[varname]
        warnings.warn('Warning: appending to existing field: ' + varname)

    # Write the file
    opts = dict(compression=compression, compression_opts=compression_opts)
    h5f.create_dataset(varname + '/fields', data=fields, **opts)
    # Iterate over the str names, not the byte-string ``fields`` array:
    # structured arrays and string concatenation both require str keys.
    for name in names:
        column = arr_write[name]
        if column.dtype.kind in ('S', 'U'):
            # Strings are written as fixed-width byte strings of the same
            # itemsize as the in-memory column.
            column = column.astype('S' + str(column.dtype.itemsize))
        h5f.create_dataset(varname + '/columns/' + name, data=column, **opts)
def load_numpy_hdf(h5f, varname: str) -> np.ndarray:
    """Read a named numpy array from HDF5 file

    The field names are read from ``varname/fields`` and each field is read
    from ``varname/columns/<field>``. The output has the shape of the first
    column; each field's format is built from the kind and itemsize of the
    stored column dtype.

    Parameters
    ----------
    h5f
        HDF5 file handle
    varname : str
        node prefix in HDF5 hierarchy

    Returns
    -------
    np.ndarray
        Named numpy array

    Raises
    ------
    KeyError
        Propagated from the lookup when ``varname`` or one of its entries
        is missing.

    :Authors:
        Raj Moulik (moulik@caa.columbia.edu)
    :Last Modified:
        2023.02.16 5.00
    """
    group = h5f[varname]
    columns = group['columns']

    # ``dataset[()]`` reads the full contents; the older ``.value`` attribute
    # no longer exists in h5py >= 3.0. Names are stored as byte strings and
    # numpy requires str field names, so always decode them.
    names = [
        name.decode('utf-8') if isinstance(name, bytes) else str(name)
        for name in group['fields'][()]
    ]
    formats = [
        columns[name].dtype.kind + str(columns[name].dtype.itemsize)
        for name in names
    ]

    # The shape attribute avoids reading the first column just to size the
    # output.
    output = np.zeros(columns[names[0]].shape,
                      dtype={'names': names, 'formats': formats})
    for name in names:
        output[name] = columns[name][()]
    return output