#!/usr/bin/python
#=======================================================================
# Library description
#=======================================================================
#
#
#=======================================================================
# Modules importation
#=======================================================================
import copy
try:
import numpy as np
import numpy.ma as ma
except:
print 'Package numpy is not available...'
exit()
try:
import ncTools as nct
except:
print 'Package pyClim is not correctly installed...'
exit()
try:
from netcdftime import utime
except:
print 'Package netcdftime is not available...'
exit()
try:
import matplotlib.pyplot as plt
from matplotlib import mlab
except:
print 'Package rpy2 is not available...'
exit()
try:
from scipy import stats
except:
print 'Package scipy is not available...'
exit()
try:
import spectrum as spec
except:
print 'Package pySpectrum is not available...'
exit()
try:
import rpy2.robjects as robjects
import rpy2.robjects.numpy2ri
# Since rpy2 version 2.3, it is necessary to activate the automatic
# conversion from numpy to R arrays
if hasattr(rpy2.robjects.numpy2ri, 'activate'):
rpy2.robjects.numpy2ri.activate()
except:
print ('Warning: Package rpy2 is not available, some functions '
'will not work without this package (i.e. loess_detrend)')
#=======================================================================
# Definition of constant
#=======================================================================
n_per_yr = {'1y':1, '3m':4, '1m':12, '7d':52, '5d':73, '1d':365}
d_per_t = {'1y':365, '3m':91.25, '1m':365/12., '7d':7, '5d':5, '1d':1}
u_per_d = {'days':1., '3m':4./365, 'months':1./30, 'years':1./365}
s_per_d = 24 * 60 * 60
class timeseries():
#=======================================================================
# Functions definition
#=======================================================================
def __init__(self, var, time_res='1m', time_counter=None,
units='seconds since 1958-01-01 00:00:00',
calendar='noleap'):
self.var = var
self.dim = np.shape(var)
self.t = time_counter
cdftime = utime(units, calendar=calendar)
date = cdftime.num2date(time_counter)
self.year = []
self.month = []
self.day = []
for d in date:
self.year.append(d.year)
self.month.append(d.month)
self.day.append(d.day)
self.n_t = np.size(time_counter)
self.n_fft = self.n_t / 2 * 2
self.l_yr = n_per_yr[time_res]
self.n_yr = self.n_t / self.l_yr
#=======================================================================
# G - Get attributes functions
#=======================================================================
def get_mean(self):
"""
Return the mean of the timeseries
"""
if not hasattr(self, 'mean'):
self.compute_mean()
return self.mean.filled()
def get_std(self):
"""
Return the standard deviation of the timeseries
"""
if not hasattr(self, 'std'):
self.compute_std()
return self.std.filled()
def get_skw(self):
"""
Return the skewness of the timeseries
"""
if not hasattr(self, 'skw'):
self.compute_skw()
return self.skw.filled()
def get_kur(self):
"""
Return the kurtosis of the timeseries
"""
if not hasattr(self, 'kur'):
self.compute_kur()
return self.kur.filled()
#=======================================================================
# S - Statisitical functions
#=======================================================================
def compute_mean(self):
"""
Compute the mean of the timeseries
"""
self.mean = np.mean(self.var, axis=0)
return self.mean
def compute_std(self):
"""
Compute an unbiased estimator of the standard deviation
"""
if not hasattr(self, 'mean'):
self.compute_mean()
vard2 = np.mean(self.var ** 2, axis=0)
# Compute the biased estimator
var_std = np.sqrt(vard2 - self.mean ** 2)
# Get the previous estimator unbiased
self.std = np.sqrt(self.n_t / (self.n_t - 1)) * var_std
def compute_skw(self):
"""
Compute an unbiased estimator of the skewness
"""
if not hasattr(self, 'std'):
self.compute_std()
vard2 = ma.mean(var ** 2, axis=0)
vard3 = ma.mean(var ** 3, axis=0)
vard3.set_fill_value(self.fill_value)
# Compute the biased estimator
var_skew = ((vard3 - 3 * var_mean * vard2 + 2 * var_mean ** 3) /
var_std ** 3)
# Get the previous estimator unbiased
self.skw = np.sqrt( n_t * (n_t - 1)) / (n_t - 2) * var_skew
def compute_kur(self):
"""
Compute an unbiased estimator of the kurtosis
"""
if not hasattr(self, 'std'):
self.compute_std()
vard2 = ma.mean(var ** 2, axis=0)
vard3 = ma.mean(var ** 3, axis=0)
vard4 = ma.mean(var ** 4, axis=0)
vard4.set_fill_value(self.fill_value)
var_kur = (vard4 - 4 * var_mean * vard3 +
6 * vard2 * var_mean ** 2 -
3 * var_mean ** 4) / var_std ** 4
#Compute kurtosis and write in NetCDF file
self.kur = (((n_t + 1) * (n_t - 1)) /
((n_t - 2) * (n_t - 3)) * var_kur -
3 * (n_t - 1) ** 2 /
((n_t - 2) * (n_t - 3)))
def clim_mean(self, fill_value=0.):
"""
Compute the mean seasonal cycle and its standard deviation
with a climatological method
"""
var_yr = self.yr_reshape()
seasonal_mean = ma.mean(var_yr, axis=1)
var_yr2_mean = ma.mean(var_yr**2, axis=1)
seasonal_std = ma.sqrt(var_yr2_mean - seasonal_mean ** 2)
seasonal_mean.fill_value = fill_value
seasonal_std.fill_value = fill_value
return seasonal_mean, seasonal_std
def deseasonalize(self, mode='mean', normalized=False,
fill_value=0.):
"""
Remove the seasonal component
"""
if mode=='mean':
seasonal_mean, seasonal_std = \
self.clim_mean(fill_value=fill_value)
elif mode=='harmonic':
#To write...
pass
seasonal_mean_long = ma.resize(seasonal_mean, self.dim)
seasonal_mean_long.fill_value = fill_value
var_deseasoned = self.var - seasonal_mean_long
if normalized:
seasonal_std_long = ma.resize(seasonal_std, self.dim)
seasonal_std_long.fill_value = fill_value
var_deseasoned = var_deseasoned / seasonal_std_long
return var_deseasoned, seasonal_mean, seasonal_std
def loess_detrend(self, span=0.99, degree=2):
"""
Non-linear detrending using LOESS function from R
"""
#Define the R objects
rloess = robjects.r['loess']
fmla = robjects.Formula('y ~ x')
env = fmla.environment
env['x'] = self.t
env['y'] = self.var
self.var_detrended = np.array(rloess(fmla, span=span,degree=degree)[2])
def seasonal_min_max(self):
"""
Determine the min and max value of each year
"""
var_yr = self.yr_reshape()
var_min = np.min(var_yr, axis=0)
var_max = np.max(var_yr, axis=0)
return var_min, var_max
def mean_deriv(self,n,m):
"""
Compute the mean slope using two windows with overlap
"""
s=np.zeros(self.dim)
inv=np.zeros(self.dim)
m=(m-1)/2
for k in range(n-m-1,n_t+m-n):
I1=np.mean(var[k+m-n+1:k+m+1,],axis=0)
I2=np.mean(var[k-m:k-m+n,],axis=0)
s[k]=I2-I1
inv[k]=(s[k]*s[k-1]<=0 and s[k]>s[k-1])
return s,inv
def yr_reshape(self):
"""
From a 1D-vector creates a 2D-matrix with years in columns
"""
t_end = self.n_yr*self.l_yr
var = self.var[:t_end,]
dim2 = self.dim[1:]
dim1 = (self.l_yr,self.n_yr,)
newdim = dim1 + dim2
newvar = ma.reshape(var, newdim, order='F')
return newvar
#================================
# Functions returning attributes
#================================
def get_detrended(self):
"""
Return, if computed, the detrended time series
"""
if hasattr(self,'var_detrended'):
return self.var_detrended
else:
"The trend has not been computed, please compute it first."
def get_trend(self):
"""
Return, if computed, the trend of the time series
"""
if hasattr(self,'var_detrended'):
return self.var-self.var_detrended
else:
"The trend has not been computed, please compute it first."
def compute_powerspectrum(self, NFFT=None, Fs=1,
detrend=mlab.detrend_none,
window=mlab.window_hanning,
noverlap=0, pad_to=None,
sides='default', scale_by_freq=None):
"""
Compute the Power Spectral Density of the time series using the
integrated function psd in Matplotlib .
Arguments
---------
NFFT: integer
The number of data points used in each block for the FFT.
Must be even; a power 2 is most efficient. The default
value is 256. This should NOT be used to get zero padding,
or the scaling of the result will be incorrect. Use pad_to
for this instead.
Fs: scalar
The sampling frequency (samples per time unit). It is used
to calculate the Fourier frequencies, freqs, in cycles per
time unit. The default value is 2.
detrend: callable
The function applied to each segment before fft-ing,
designed to remove the mean or linear trend. Unlike in
MATLAB, where the detrend parameter is a vector, in
matplotlib is it a function. The pylab module defines
detrend_none(), detrend_mean(), and detrend_linear(), but
you can use a custom function as well.
window: callable or ndarray
A function or a vector of length NFFT. To create window
vectors see window_hanning(), window_none(),
numpy.blackman(), numpy.hamming(), numpy.bartlett(),
scipy.signal(), scipy.signal.get_window(), etc. The
default is window_hanning(). If a function is passed as
the argument, it must take a data segment as an argument
and return the windowed version of the segment.
pad_to: integer
The number of points to which the data segment is padded
when performing the FFT. This can be different from NFFT,
which specifies the number of data points used. While not
increasing the actual resolution of the psd (the minimum
distance between resolvable peaks), this can give more
points in the plot, allowing for more detail. This
corresponds to the n parameter in the call to fft(). The
default is None, which sets pad_to equal to NFFT
scale_by_freq: boolean
Specifies whether the resulting density values should be
scaled by the scaling frequency, which gives density in
units of Hz^-1. This allows for integration over the
returned frequency values. The default is True for MATLAB
compatibility.
noverlap: integer
The number of points of overlap between blocks. The
default value is 0 (no overlap).
"""
if NFFT == None:
NFFT = self.n_fft
if not hasattr(self, 'std'):
self.compute_std()
if not hasattr(self, 'var_detrended'):
self.loess_detrend()
self.periodogram, self.freqs = \
mlab.psd(self.var_detrended / np.sqrt(self.std), NFFT=NFFT,
Fs=Fs, detrend=detrend, window=window,
noverlap=noverlap, pad_to=pad_to, sides=sides,
scale_by_freq=scale_by_freq)
print "Length of the timeseries"
print len(self.var_detrended)
print "Size of the periodogram"
print len(self.periodogram)
print 'The power spectrum has been correctly computed'
print "Time integral"
print np.std((self.var_detrended/np.sqrt(self.std)))
print "Spectrum integral"
print 1./2*np.mean(self.periodogram)
self.compute_estimated_rednoise_spectrum()
def compute_variance(self):
self.variance = np.var(self.var_detrended, axis=0)
print "Signal variance:"
print self.variance
def compute_estimated_rednoise_spectrum(self, clevel=0.95):
AR, P, k = spec.aryule(self.var_detrended/np.sqrt(self.std), 1)
self.mean_rednoise_spectrum = \
spec.arma2psd(AR, NFFT=self.n_fft, rho=P,sides='centerdc')
print "Size of the Rednoise"
print len(self.periodogram)
print "Rednoise integral"
print np.mean(self.mean_rednoise_spectrum)
self.mean_rednoise_spectrum=self.mean_rednoise_spectrum[len(self.mean_rednoise_spectrum)/2-1:]
#Compute 95% confidence intervals
chi22D = stats.chi2(2)
self.higher_bound_spectrum = (self.mean_rednoise_spectrum * chi22D.ppf(clevel) / 2.)
self.lower_bound_spectrum = (self.mean_rednoise_spectrum *
1. / (chi22D.ppf(clevel) / 2.))
#
#def compute_estimated_spectrum(self, clevel=0.95):
def plot_autocorrelation(self, maxlags=None, **kwargs):
"""
Compute and plot the autocorrelation of the time series
"""
self.fig_autocorrelation = plt.figure(figsize=(11.69,8.27))
ax = self.fig_autocorrelation.add_subplot(111)
if maxlags == None:
maxlags = self.n_t-1
lags, autocorrelation,_ ,_ = \
ax.acorr(self.var_detrended / np.sqrt(self.variance),
normed=True, detrend=mlab.detrend_none,
usevlines=True, maxlags=maxlags, **kwargs)
self.lags = lags[maxlags:]
self.autocorrelation = autocorrelation[maxlags:]
def plot_rawts(self,output_file=None,**kwargs):
"""
Plot and show the raw time series and optionally save the plot in an encapsuled postscript file.
"""
self.fig_rawts = plt.figure(figsize=(11.69,8.27))
ax = self.fig_rawts.add_subplot(111)
ax.plot(self.t,self.var_detrended/np.sqrt(self.variance),**kwargs)
if output_file!=None:
plt.savefig('output_file.eps')
plt.show()
#------------------------- Wavelet Analysis ---------------------------
def morlet_FT(f,f0=6):
"""Fourier transform of the approximate Morlet wavelet
"""
return (pi**-.25)*exp(-.5*((f-f0))**2.)
def cwt_a(signal, scales, sampling_scale = 1.0, wavelet=morlet_FT):
""" Continuous wavelet transform via fft. Scales version. """
signal_ft = fft(signal) # FFT of the signal
W = numpy.zeros((len(scales), len(signal)),'complex') # create the matrix beforehand
ftfreqs = 2*pi*fftfreq(len(signal), sampling_scale) # FFT frequencies
# Now fill in the matrix
for n,s in enumerate(scales):
psi_ft_bar = numpy.conjugate(wavelet(ftfreqs*s))
W[n,:] = (s*2*pi/sampling_scale)**(1./2.) * ifft(signal_ft * psi_ft_bar)
#W[n,:] = ifft(signal_ft * psi_ft_bar)
#W[n,:] = (sampling_scale/s)**(1./2.) * ifft(signal_ft * psi_ft_bar)
return W
#=======================================================================
# Wavelet analysis
#=======================================================================
class cross_cwt:
def __init__(self, tvar1, tvar2):
test = (tvar1._t0==tvar2._t0)
test*= (tvar1.freq==tvar2.freq)
test*= (len(tvar1)==len(tvar2))
test*= (tvar1.time_unit==tvar2.time_unit)
if not(test):
print 'not the same time span.'
return
else:
self.var1 = tvar1
self.var2 = tvar2
self.cwt1 = cwt_get(tvar1)
self.cwt2 = cwt_get(tvar2)
def compute(self):
self.cwt1.compute()
self.cwt2.compute()
self.cross = self.cwt1.cwt * self.cwt2.cwt.conjugate()
self.power = abs(self.cross)**2.
self.phase = _complex2phase(self.cross)
def plot_phase(self,dphi=30,phi=0,color='blue',show=True):
var = self.phase
#varp = numpy.ma.masked_where(abs(var)>d,abs(var))
#varm = numpy.ma.masked_where(abs(var-180)>d,abs(var))
#mcmap = pylab.cm.gist_ncar_r
var = abs(var)
tabtime,tabscales = pylab.meshgrid(self.cwt1.time,self.cwt1.scales)
#pylab.(tabtime,-log2(tabscales),var,[0,d],colors=['blue'])
#pylab.contourf(tabtime,-log2(tabscales),var,[180-d,180],colors=['red'])
pylab.contourf(tabtime,-log2(tabscales),var,[phi-dphi,phi+dphi],colors=[color])
s2f = lambda x: 2**(-x)
stix = pylab.yticks()[0][1:-1]
pylab.ylim(stix[0],stix[-1]+1)
pylab.yticks(stix,_array2ticks(stix,s2f))
pylab.grid('on')
pylab.ylabel('Period')
#- plot the cone of influence (avoid edge effect)
ts = lambda s: numpy.sqrt(2) * s # wavelet dependant
dtdeb = abs(tabtime-tabtime[0,0])
dtend = abs(tabtime-tabtime[0,-1])
coi = (dtdeb-ts(tabscales)>0)*(dtend-ts(tabscales)>0)
coicont = [-numpy.Inf,0]
colors = ['white']
pylab.contourf(tabtime,-log2(tabscales),coi,coicont, \
colors=colors)#,cmap=coicmap)
if show:
pylab.show()
[docs]def morlet_FT(f, f0=6):
"""
Fourier transform of the approximate Morlet wavelet
"""
out = (np.pi ** -.25) * np.exp(-.5 *((f - f0)) ** 2.)
return
[docs]def cwt_a(signal, scales, sampling_scale=1.0, wavelet=morlet_FT):
""" Continuous wavelet transform via fft. Scales version. """
signal_ft = fft(signal) # FFT of the signal
W = numpy.zeros((len(scales), len(signal)),'complex') # create the matrix beforehand
ftfreqs = 2*pi*fftfreq(len(signal), sampling_scale) # FFT frequencies
# Now fill in the matrix
for n,s in enumerate(scales):
psi_ft_bar = numpy.conjugate(wavelet(ftfreqs*s))
W[n,:] = (s*2*pi/sampling_scale)**(1./2.) * ifft(signal_ft * psi_ft_bar)
return W
class cwt_get:
def __init__(self, data=None, dj=0.125, dispunit=None):
self._dispunit = dispunit
if self._dispunit is None:
self.dt = data._ups
else:
self.dt = (float(d_per_t[data.freq]) /
float(d_per_t[self._dispunit]))
self.dispunit_per_timeunit = self.dt / data._ups
self.dj = dj # wavelet dependant
# wavelet dependant (the equiv fourier period is 2* dt)
self.s0 = 2. * self.dt
self.data = data
self.scales = self.set_scales()
self.waveletf = lambda x: morlet_FT(x)
self.time = data.time_axis
def set_scales(self):
jmax = numpy.int(log2(float(len(self.data)) * self.dt / self.s0)/(self.dj))
js = arange(jmax)
return self.s0 * 2 ** (js*self.dj)
def compute(self):
self.cwt = cwt_a(self.data, self.scales, sampling_scale = self.dt, wavelet=self.waveletf)
[docs]def relative_entropy(ts1, ts2):
"""
Compute the relative entropy between two time series, assuming they follow a normal distribution
"""
pass
[docs]def expfit(x, y, rcond=None, full=False, w=None, cov=False):
"""
Fit an exponential polynomial
"""
return np.polyfit(x, np.log(y), 1, rcond=rcond, full=ful, w=w,
cov=cov)