Source code for tsTools

#!/usr/bin/python
#=======================================================================
# Library description
#=======================================================================
#
# tsTools: time series analysis tools for climate data (basic
# statistics, seasonal cycle removal, detrending, power spectra and
# wavelet analysis).
#

#=======================================================================
# Module imports
#=======================================================================
import copy
try:
	import numpy as np
	import numpy.ma as ma
	from numpy.fft import fft, ifft, fftfreq
except ImportError:
	print 'Package numpy is not available...'
	exit()
try:
	import ncTools as nct
except ImportError:
	print 'Package pyClim is not correctly installed...'
	exit()
try:
	from netcdftime import utime
except ImportError:
	print 'Package netcdftime is not available...'
	exit()
try:
	import matplotlib.pyplot as plt
	from matplotlib import mlab
except ImportError:
	print 'Package matplotlib is not available...'
	exit()
try:
	from scipy import stats
except ImportError:
	print 'Package scipy is not available...'
	exit()
try:
	import spectrum as spec
except ImportError:
	print 'Package pySpectrum is not available...'
	exit()
try:
	import rpy2.robjects as robjects
	import rpy2.robjects.numpy2ri
	# Since rpy2 version 2.3, it is necessary to activate the automatic
	# conversion from numpy to R arrays
	if hasattr(rpy2.robjects.numpy2ri, 'activate'):
		rpy2.robjects.numpy2ri.activate()
except ImportError:
	print ('Warning: Package rpy2 is not available, some functions '
	       'will not work without this package (i.e. loess_detrend)')
	
#=======================================================================
# Definition of constants
#=======================================================================

n_per_yr = {'1y':1, '3m':4, '1m':12, '7d':52, '5d':73, '1d':365}
d_per_t = {'1y':365, '3m':91.25, '1m':365/12., '7d':7, '5d':5, '1d':1}
u_per_d = {'days':1., '3m':4./365, 'months':1./30, 'years':1./365}
s_per_d = 24 * 60 * 60
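
# Quick sanity check (illustrative): every resolution spans a full
# 365-day (noleap) year to within one day:
#   all(abs(n_per_yr[k] * d_per_t[k] - 365.) <= 1. for k in n_per_yr)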


class timeseries():
	
#=======================================================================
# Functions definition
#=======================================================================
	def __init__(self, var, time_res='1m', time_counter=None, 
	             units='seconds since 1958-01-01 00:00:00', 
	             calendar='noleap'):
		self.var = var
		self.dim = np.shape(var)
		self.t = time_counter
		cdftime = utime(units, calendar=calendar)
		date = cdftime.num2date(time_counter)
		self.year = []
		self.month = []
		self.day = []
		for d in date:
			self.year.append(d.year)
			self.month.append(d.month)
			self.day.append(d.day)	
		self.n_t = np.size(time_counter)
		# Largest even number of points (integer division), used for FFTs
		self.n_fft = self.n_t / 2 * 2
		self.l_yr = n_per_yr[time_res]
		# Number of complete years in the record (integer division)
		self.n_yr = self.n_t / self.l_yr
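
	# Example (a minimal sketch; assumes 10 years of monthly means in a
	# 1-D masked array `sst`, on the default 'seconds since 1958' axis):
	#   t = np.arange(120.) * d_per_t['1m'] * s_per_d
	#   ts = timeseries(ma.masked_array(sst), time_res='1m', time_counter=t)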

#=======================================================================
# G - Get attributes functions
#=======================================================================
	def get_mean(self):
		"""
		Return the mean of the timeseries
		"""
		if not hasattr(self, 'mean'):
			self.compute_mean()
		return self.mean.filled()	 

	def get_std(self):
		"""
		Return the standard deviation of the timeseries
		"""
		if not hasattr(self, 'std'):
			self.compute_std()
		return self.std.filled()

	def get_skw(self):
		"""
		Return the skewness of the timeseries
		"""
		if not hasattr(self, 'skw'):
			self.compute_skw()
		return self.skw.filled()

	def get_kur(self):
		"""
		Return the kurtosis of the timeseries
		"""
		if not hasattr(self, 'kur'):
			self.compute_kur()
		return self.kur.filled()				

#=======================================================================
# S - Statistical functions
#=======================================================================
	def compute_mean(self):
		"""
		Compute the mean of the timeseries
		"""
		self.mean = np.mean(self.var, axis=0)
		return self.mean
	
	def compute_std(self):
		"""
		Compute an unbiased estimator of the standard deviation
		"""
		if not hasattr(self, 'mean'):
			self.compute_mean()
		vard2 = np.mean(self.var ** 2, axis=0)
		# Compute the biased estimator
		var_std = np.sqrt(vard2 - self.mean ** 2)
		# Get the previous estimator unbiased (float division is required)
		self.std = np.sqrt(self.n_t / (self.n_t - 1.)) * var_std
	
	def compute_skw(self):
		"""
		Compute an unbiased estimator of the skewness
		"""
		if not hasattr(self, 'std'):
			self.compute_std()
		vard2 = ma.mean(self.var ** 2, axis=0)
		vard3 = ma.mean(self.var ** 3, axis=0)
		# Biased standard deviation (the unbiased correction is
		# applied to the skewness itself below)
		var_std = ma.sqrt(vard2 - self.mean ** 2)
		# Compute the biased estimator
		var_skw = ((vard3 - 3 * self.mean * vard2 + 2 * self.mean ** 3) /
		           var_std ** 3)
		# Get the previous estimator unbiased
		n_t = float(self.n_t)
		self.skw = np.sqrt(n_t * (n_t - 1)) / (n_t - 2) * var_skw

	def compute_kur(self):
		"""
		Compute an unbiased estimator of the kurtosis
		"""
		if not hasattr(self, 'std'):
			self.compute_std()
		vard2 = ma.mean(self.var ** 2, axis=0)
		vard3 = ma.mean(self.var ** 3, axis=0)
		vard4 = ma.mean(self.var ** 4, axis=0)
		# Biased standard deviation (the unbiased correction is
		# applied to the kurtosis itself below)
		var_std = ma.sqrt(vard2 - self.mean ** 2)
		# Compute the biased estimator
		var_kur = (vard4 - 4 * self.mean * vard3 +
		           6 * vard2 * self.mean ** 2 -
		           3 * self.mean ** 4) / var_std ** 4
		# Get the previous estimator unbiased (excess kurtosis)
		n_t = float(self.n_t)
		self.kur = (((n_t + 1) * (n_t - 1)) /
		            ((n_t - 2) * (n_t - 3)) * var_kur -
		            3 * (n_t - 1) ** 2 /
		            ((n_t - 2) * (n_t - 3)))
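
	# Cross-check (illustrative): for a 1-D series these estimators
	# should agree with the bias-corrected scipy versions:
	#   stats.skew(ts.var, bias=False)      # ~ ts.get_skw()
	#   stats.kurtosis(ts.var, bias=False)  # ~ ts.get_kur()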
		
	def clim_mean(self, fill_value=0.):
		"""
		Compute the mean seasonal cycle and its standard deviation 
		with a climatological method 
		"""
		var_yr = self.yr_reshape()
		seasonal_mean = ma.mean(var_yr, axis=1)
		var_yr2_mean = ma.mean(var_yr**2, axis=1)
		seasonal_std = ma.sqrt(var_yr2_mean - seasonal_mean ** 2)
		seasonal_mean.fill_value = fill_value
		seasonal_std.fill_value = fill_value		
		return seasonal_mean, seasonal_std
				
	def deseasonalize(self, mode='mean', normalized=False,
	                  fill_value=0.):
		"""
		Remove the seasonal component
		"""
		if mode == 'mean':
			seasonal_mean, seasonal_std = \
			self.clim_mean(fill_value=fill_value)
		elif mode == 'harmonic':
			# To write...
			raise NotImplementedError("mode 'harmonic' is not "
			                          "implemented yet")
		seasonal_mean_long = ma.resize(seasonal_mean, self.dim)
		seasonal_mean_long.fill_value = fill_value
		var_deseasoned = self.var - seasonal_mean_long   
		if normalized:
			seasonal_std_long = ma.resize(seasonal_std, self.dim)
			seasonal_std_long.fill_value = fill_value			
			var_deseasoned = var_deseasoned / seasonal_std_long
		return var_deseasoned, seasonal_mean, seasonal_std
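
	# Example (illustrative): remove the mean seasonal cycle and
	# normalize by its standard deviation:
	#   anom, cycle, cycle_std = ts.deseasonalize(mode='mean',
	#                                             normalized=True)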

	def loess_detrend(self, span=0.99, degree=2):
		"""
		Non-linear detrending using the LOESS function from R
		"""
		# Define the R objects
		rloess = robjects.r['loess']
		fmla = robjects.Formula('y ~ x')
		env = fmla.environment
		env['x'] = self.t
		env['y'] = self.var
		# Keep the third component of the R loess fit as the detrended
		# series (the residuals of the fit)
		self.var_detrended = np.array(rloess(fmla, span=span,
		                                     degree=degree)[2])
		
	def seasonal_min_max(self):
		"""
		Determine the min and max value of each year
		"""
		var_yr = self.yr_reshape()
		var_min = np.min(var_yr, axis=0)
		var_max = np.max(var_yr, axis=0)
		return var_min, var_max
														
	def mean_deriv(self, n, m):
		"""
		Compute the mean slope using two windows of width n with an
		overlap controlled by m
		"""
		s = np.zeros(self.dim)
		inv = np.zeros(self.dim)
		m = (m - 1) / 2
		for k in range(n - m - 1, self.n_t + m - n):
			I1 = np.mean(self.var[k + m - n + 1:k + m + 1, ], axis=0)
			I2 = np.mean(self.var[k - m:k - m + n, ], axis=0)
			s[k] = I2 - I1
			# Flag an inversion: the slope changes sign while increasing
			inv[k] = (s[k] * s[k - 1] <= 0 and s[k] > s[k - 1])
		return s, inv

	def yr_reshape(self):
		"""
		From a 1D-vector creates a 2D-matrix with years in columns
		"""
		# Drop the trailing incomplete year, if any
		t_end = self.n_yr * self.l_yr
		var = self.var[:t_end, ]
		dim2 = self.dim[1:]
		dim1 = (self.l_yr, self.n_yr,)
		newdim = dim1 + dim2
		# Fortran order: the position within the year varies fastest
		newvar = ma.reshape(var, newdim, order='F')
		return newvar
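
	# Example (illustrative): 120 monthly values reshape to (12, 10),
	# one year per column, so that
	#   ts.yr_reshape()[0, 1]
	# is the first month of the second year.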
	
	#================================
	# Functions returning attributes
	#================================
	
	def get_detrended(self):
		"""
		Return, if computed, the detrended time series
		"""
		if hasattr(self, 'var_detrended'):
			return self.var_detrended
		else:
			print "The trend has not been computed, please compute it first."

	def get_trend(self):
		"""
		Return, if computed, the trend of the time series
		"""
		if hasattr(self, 'var_detrended'):
			return self.var - self.var_detrended
		else:
			print "The trend has not been computed, please compute it first."
				
	def compute_powerspectrum(self, NFFT=None, Fs=1, 
	                          detrend=mlab.detrend_none, 
	                          window=mlab.window_hanning, 
	                          noverlap=0, pad_to=None, 
	                          sides='default', scale_by_freq=None):
		"""
		Compute the Power Spectral Density of the time series using the
		psd function integrated in Matplotlib.

		Arguments
		---------
		NFFT: integer
			The number of data points used in each block for the FFT.
			Must be even; a power of 2 is most efficient. The default
			value is 256. This should NOT be used to get zero padding, 
			or the scaling of the result will be incorrect. Use pad_to 
			for this instead.
		Fs: scalar
			The sampling frequency (samples per time unit). It is used 
			to calculate the Fourier frequencies, freqs, in cycles per 
			time unit. The default value is 2.
		detrend: callable
			The function applied to each segment before fft-ing, 
			designed to remove the mean or linear trend. Unlike in 
			MATLAB, where the detrend parameter is a vector, in 
			matplotlib it is a function. The pylab module defines
			detrend_none(), detrend_mean(), and detrend_linear(), but 
			you can use a custom function as well.
		window: callable or ndarray
			A function or a vector of length NFFT. To create window 
			vectors see window_hanning(), window_none(), 
			numpy.blackman(), numpy.hamming(), numpy.bartlett(), 
			scipy.signal(), scipy.signal.get_window(), etc. The 
			default is window_hanning(). If a function is passed as 
			the argument, it must take a data segment as an argument 
			and return the windowed version of the segment.
		pad_to: integer
			The number of points to which the data segment is padded 
			when performing the FFT. This can be different from NFFT, 
			which specifies the number of data points used. While not 
			increasing the actual resolution of the psd (the minimum 
			distance between resolvable peaks), this can give more 
			points in the plot, allowing for more detail. This 
			corresponds to the n parameter in the call to fft(). The 
			default is None, which sets pad_to equal to NFFT.
		scale_by_freq: boolean
			Specifies whether the resulting density values should be 
			scaled by the scaling frequency, which gives density in 
			units of Hz^-1. This allows for integration over the 
			returned frequency values. The default is True for MATLAB 
			compatibility.
		noverlap: integer
			The number of points of overlap between blocks. The 
			default value is 0 (no overlap).
		"""
		if NFFT is None:
			NFFT = self.n_fft
		if not hasattr(self, 'std'):
			self.compute_std()
		if not hasattr(self, 'var_detrended'):
			self.loess_detrend()  
		self.periodogram, self.freqs = \
		mlab.psd(self.var_detrended / np.sqrt(self.std), NFFT=NFFT,
		         Fs=Fs, detrend=detrend, window=window,
		         noverlap=noverlap, pad_to=pad_to, sides=sides, 
		         scale_by_freq=scale_by_freq)
		print "Length of the timeseries"
		print len(self.var_detrended)
		print "Size of the periodogram"
		print len(self.periodogram)		
		print 'The power spectrum has been correctly computed'
		print "Time integral"
		print np.std((self.var_detrended/np.sqrt(self.std)))		
		print "Spectrum integral"
		print 1./2*np.mean(self.periodogram)	
		self.compute_estimated_rednoise_spectrum()
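
	# Example (illustrative): with monthly data, Fs=12 expresses the
	# frequencies in cycles per year:
	#   ts.compute_powerspectrum(Fs=12)
	#   plt.loglog(ts.freqs, ts.periodogram)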
		
	def compute_variance(self):
		"""
		Compute the variance of the detrended time series
		"""
		self.variance = np.var(self.var_detrended, axis=0)
		print "Signal variance:"
		print self.variance

	def compute_estimated_rednoise_spectrum(self, clevel=0.95):
		"""
		Fit an AR(1) (red-noise) model to the detrended time series
		and compute its theoretical spectrum with confidence bounds
		"""
		AR, P, k = spec.aryule(self.var_detrended / np.sqrt(self.std), 1)
		self.mean_rednoise_spectrum = \
		spec.arma2psd(AR, NFFT=self.n_fft, rho=P, sides='centerdc')
		print "Size of the rednoise spectrum"
		print len(self.mean_rednoise_spectrum)
		print "Rednoise integral"
		print np.mean(self.mean_rednoise_spectrum)
		# Keep only the positive frequencies
		self.mean_rednoise_spectrum = \
		self.mean_rednoise_spectrum[len(self.mean_rednoise_spectrum) / 2 - 1:]
		# Compute the confidence intervals (chi-square with 2 dof)
		chi22D = stats.chi2(2)
		self.higher_bound_spectrum = (self.mean_rednoise_spectrum *
		                              chi22D.ppf(clevel) / 2.)
		self.lower_bound_spectrum = (self.mean_rednoise_spectrum /
		                             (chi22D.ppf(clevel) / 2.))
	#def compute_estimated_spectrum(self, clevel=0.95):

	
	def plot_autocorrelation(self, maxlags=None, **kwargs):
		"""
		Compute and plot the autocorrelation of the time series
		"""
		if not hasattr(self, 'variance'):
			self.compute_variance()
		self.fig_autocorrelation = plt.figure(figsize=(11.69, 8.27))
		ax = self.fig_autocorrelation.add_subplot(111)
		if maxlags is None:
			maxlags = self.n_t - 1
		lags, autocorrelation, _, _ = \
		ax.acorr(self.var_detrended / np.sqrt(self.variance),
		         normed=True, detrend=mlab.detrend_none,
		         usevlines=True, maxlags=maxlags, **kwargs)
		# Keep only the non-negative lags
		self.lags = lags[maxlags:]
		self.autocorrelation = autocorrelation[maxlags:]
	
	def plot_rawts(self, output_file=None, **kwargs):
		"""
		Plot and show the raw time series and optionally save the plot
		in an encapsulated PostScript file.
		"""
		self.fig_rawts = plt.figure(figsize=(11.69, 8.27))
		ax = self.fig_rawts.add_subplot(111)
		ax.plot(self.t, self.var_detrended / np.sqrt(self.variance),
		        **kwargs)
		if output_file is not None:
			plt.savefig(output_file)
		plt.show()
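
# Minimal end-to-end sketch (illustrative only: synthetic monthly data
# on the default 'seconds since 1958' noleap axis; nothing below is
# part of the library's API):
def _demo_timeseries():
	t = np.arange(120.) * d_per_t['1m'] * s_per_d
	sst = 20. + 2. * np.sin(2 * np.pi * np.arange(120) / 12.)
	ts = timeseries(ma.masked_array(sst), time_res='1m', time_counter=t)
	print ts.compute_mean()
	cycle, cycle_std = ts.clim_mean()
	print cycle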


#-------------------------  Wavelet Analysis ---------------------------
def morlet_FT(f, f0=6):
    """Fourier transform of the approximate Morlet wavelet
    """
    return (np.pi ** -.25) * np.exp(-.5 * (f - f0) ** 2.)


def cwt_a(signal, scales, sampling_scale=1.0, wavelet=morlet_FT):
    """ Continuous wavelet transform via fft. Scales version. """
    signal_ft = fft(signal)                    # FFT of the signal
    # Create the output matrix beforehand, one row per scale
    W = np.zeros((len(scales), len(signal)), 'complex')
    # FFT angular frequencies
    ftfreqs = 2 * np.pi * fftfreq(len(signal), sampling_scale)
    # Now fill in the matrix
    for n, s in enumerate(scales):
        psi_ft_bar = np.conjugate(wavelet(ftfreqs * s))
        W[n, :] = (s * 2 * np.pi / sampling_scale) ** (1. / 2.) * \
                  ifft(signal_ft * psi_ft_bar)
        #W[n,:] = ifft(signal_ft * psi_ft_bar)
        #W[n,:] = (sampling_scale/s)**(1./2.) * ifft(signal_ft * psi_ft_bar)
    return W
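
# Minimal usage sketch for cwt_a (illustrative; a 512-point sine and a
# handful of dyadic scales):
#   sig = np.sin(2 * np.pi * np.arange(512) / 64.)
#   W = cwt_a(sig, 2. ** np.arange(1, 7))
#   power = np.abs(W) ** 2          # one row per scale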



#=======================================================================
# Wavelet analysis
#=======================================================================
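# The two helpers below are minimal sketches of the otherwise undefined
# functions used by cross_cwt, assuming phases are handled in degrees
# and tick labels are plain strings.
def _complex2phase(z):
    """Phase of a complex array, in degrees (assumed convention)."""
    return np.angle(z, deg=True)


def _array2ticks(values, func):
    """Format tick labels by applying func to each tick value."""
    return [str(func(v)) for v in values]
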
class cross_cwt:
    def __init__(self, tvar1, tvar2):
        # The two series must share the same time axis
        test = (tvar1._t0 == tvar2._t0)
        test *= (tvar1.freq == tvar2.freq)
        test *= (len(tvar1) == len(tvar2))
        test *= (tvar1.time_unit == tvar2.time_unit)
        if not test:
            print 'not the same time span.'
            return
        else:
            self.var1 = tvar1
            self.var2 = tvar2
            self.cwt1 = cwt_get(tvar1)
            self.cwt2 = cwt_get(tvar2)

    def compute(self):
        self.cwt1.compute()
        self.cwt2.compute()
        # Cross-wavelet spectrum, power and phase
        self.cross = self.cwt1.cwt * self.cwt2.cwt.conjugate()
        self.power = abs(self.cross) ** 2.
        self.phase = _complex2phase(self.cross)

    def plot_phase(self, dphi=30, phi=0, color='blue', show=True):
        var = abs(self.phase)
        tabtime, tabscales = np.meshgrid(self.cwt1.time, self.cwt1.scales)
        plt.contourf(tabtime, -np.log2(tabscales), var,
                     [phi - dphi, phi + dphi], colors=[color])
        s2f = lambda x: 2 ** (-x)
        stix = plt.yticks()[0][1:-1]
        plt.ylim(stix[0], stix[-1] + 1)
        plt.yticks(stix, _array2ticks(stix, s2f))
        plt.grid('on')
        plt.ylabel('Period')
        # Plot the cone of influence (avoid edge effects)
        ts = lambda s: np.sqrt(2) * s  # wavelet dependant
        dtdeb = abs(tabtime - tabtime[0, 0])
        dtend = abs(tabtime - tabtime[0, -1])
        coi = (dtdeb - ts(tabscales) > 0) * (dtend - ts(tabscales) > 0)
        coicont = [-np.inf, 0]
        plt.contourf(tabtime, -np.log2(tabscales), coi, coicont,
                     colors=['white'])
        if show:
            plt.show()
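
# Minimal usage sketch for cross_cwt (assumes two series objects that
# expose the _t0, freq, time_unit, _ups and time_axis attributes used
# above):
#   xw = cross_cwt(tvar1, tvar2)
#   xw.compute()
#   xw.plot_phase(dphi=30, color='blue')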
             
class cwt_get:
    def __init__(self, data=None, dj=0.125, dispunit=None):
        self._dispunit = dispunit
        if self._dispunit is None:
            self.dt = data._ups
        else:
            self.dt = (float(d_per_t[data.freq]) /
                       float(d_per_t[self._dispunit]))
        self.dispunit_per_timeunit = self.dt / data._ups
        self.dj = dj  # wavelet dependant
        # wavelet dependant (the equivalent Fourier period is 2 * dt)
        self.s0 = 2. * self.dt
        self.data = data
        self.scales = self.set_scales()
        self.waveletf = lambda x: morlet_FT(x)
        self.time = data.time_axis

    def set_scales(self):
        jmax = np.int(np.log2(float(len(self.data)) * self.dt / self.s0) /
                      self.dj)
        js = np.arange(jmax)
        return self.s0 * 2 ** (js * self.dj)

    def compute(self):
        self.cwt = cwt_a(self.data, self.scales, sampling_scale=self.dt,
                         wavelet=self.waveletf)
def relative_entropy(ts1, ts2):
    """ Compute the relative entropy between two time series, assuming
    they follow a normal distribution """
    # Minimal sketch, assuming the intended quantity is the
    # Kullback-Leibler divergence KL(N1 || N2) between the Gaussian
    # fits of the two series
    m1, s1 = np.mean(ts1), np.std(ts1)
    m2, s2 = np.mean(ts2), np.std(ts2)
    return (np.log(s2 / s1) +
            (s1 ** 2 + (m1 - m2) ** 2) / (2. * s2 ** 2) - 0.5)
def expfit(x, y, rcond=None, full=False, w=None, cov=False):
    """ Fit an exponential y = a * exp(b * x) by a linear fit of log(y) """
    return np.polyfit(x, np.log(y), 1, rcond=rcond, full=full, w=w, cov=cov)
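
# Usage sketch for expfit (illustrative): recover y = a * exp(b * x)
# from the linear fit of log(y):
#   x = np.arange(1., 10.)
#   b, log_a = expfit(x, 3. * np.exp(0.5 * x))
#   a = np.exp(log_a)               # a ~ 3., b ~ 0.5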