Source code for filTools

#!/usr/bin/env python
########################################################################
# Linear filtering package                                             #
#                                                                      #
# 2013, G. SERAZIN, PhD student at MEOM/LGGE Grenoble                  #
########################################################################

#======================================================================	
# Module importation
#=======================================================================
import sys
import os
try:
	import netCDF4_utils
	from netCDF4 import Dataset
except:
	print 'Package netCDF4 is not available...'
	exit()
try:
    import numpy as np
    import numpy.ma as ma
except:
	print 'Package numpy is not available...'
	exit()	
try:
	from math import *
except:
	print 'Package math is not available...'
	exit()
try:
	import scipy.special as spec
	import scipy.ndimage as im
	import scipy.signal as sig
except:
	print 'Package scipy is not available...'
	exit()		
try:	
	import pylab as plt
except:
	print 'Package matplotlib is not correctly installed...'
	exit()	

#=======================================================================
# General linear filter definition 
#=======================================================================
class filter:
	# Attributes of the mother class
	#
	# 	dimension
	# 	filter_coeffs
	#	filter_type
	#	
	def plot_response_function(self):
		'''
		Plot the response function on a grid
		'''			
		w = self.coeffs
		dim = self.dimension	
		if dim == 1:
			delta_f = 0.01
			f = np.arange(-0.5,0.5,delta_f)
			plt.figure()
			R = fourier_series_1d(w,f)
			plt.plot(f,R)
			plt.show()			 
			#Work to do
			
		elif dim == 2:	
			delta_fx = 0.01
			delta_fy = 0.01
			X = np.concatenate((np.arange(-0.5,0,delta_fx),
			                    np.arange(0,0.5+delta_fx,delta_fx)),
			                   axis=0)
			Y = np.concatenate((np.arange(-0.5,0,delta_fy),
			                    np.arange(0,0.5+delta_fy,delta_fy)),
			                   axis=0)
			f_x, f_y = np.meshgrid(X, Y)
			R = fourier_series_2d(w, f_x, f_y)
			Ix = (f_y == 0.)
			Iy = (f_x == 0.)						
			fig = plt.figure(1, figsize=(8, 8))
			# Definitions for the axes
			left, width = 0.1, 0.65
			bottom, height = 0.1, 0.65
			bottom_h = left_h = left + width + 0.02
			rect_2D = [left, bottom, width, height]
			rect_x = [left, bottom_h, width, 0.2]
			rect_y = [left_h, bottom, 0.2, height]
			ax_2D = plt.axes(rect_2D)
			ax_x = plt.axes(rect_x)
			ax_y = plt.axes(rect_y)
			ax_x.plot(f_x[Ix].squeeze(), R[Ix].squeeze())
			ax_x.grid(True)
			ax_x.set_title(r'Response function for $f_y=0$')
			ax_x.set_xlabel(r'$f_x$')
			ax_x.set_ylabel(r'$R(f_x,0)$')		
			ax_y.plot(R[Iy].squeeze(),f_y[Iy].squeeze())
			ax_y.grid(True)
			ax_x.set_title(r'Response function for $f_x=0$')	
			ax_y.set_ylabel(r'$f_y$')
			ax_y.set_xlabel(r'$R(0,f_y)$')		
			ax_2D.pcolormesh(f_x,f_y,R)
			ax_2D.set_title(r'Response function $R(f_x,f_y)$')
			ax_2D.set_ylabel(r'$f_y$')
			ax_2D.set_xlabel(r'$f_x$')
			plt.show()													
		elif dim==3:
			pass #Work to do

	def plot_weights_distribution(self):
		dim = self.dimension
		w = self.coeffs
		if dim == 1:
			pass #Work to do
		elif dim == 2:
			n_x, n_y = self.order
			fig = plt.figure()
			plt.plot(np.arange(-n_x, n_x+1),
			         (w[n_y, :]/w[n_y, n_x]).squeeze())
			plt.show()			
	
	def do_filtering1d(self, var, mode='reflect', output='valid'):
		"""
		This function make the 1D filtering of along the first dimension 
		of an array using the function convolve1d from the Scipy ndimage 
		library.
		"""
		if self.dimension != 1:
			print "Error: the dimension of the kernel must be 1"
			sys.exit(1)
		else:
			n = self.order
			var_filt = im.convolve1d(var, self.coeffs, axis=0,
			                         mode=mode)
			if output == 'valid':
				var_filt = var_filt[n:-n,]
			elif output == 'same':
				pass
		return var_filt	
		
	def do_filtering2d(self, var, mode='wrap', output='same',
	                   fast=False):
		"""
		This function make the 2D filtering 
		"""
		if self.dimension != 2:
			print "Error: the dimension of the kernel must be 2"
			sys.exit(1)
		if fast == True:			
			var_filt = im.convolve(var, self.coeffs, mode=mode)		
		else:
			var_filt = sig.convolve2d(var, self.coeffs,
			                          boundary=mode, mode=output)
		return var_filt		
	
	def do_sparse_filtering2d(self, var, bound_weights, mode='wrap',
	                          output='same', mask_cond=None, fast=False,
	                          opt='interp'):
		"""
		This function make the filtering of 2D dimensions dataset using 
		the function convolve from the Scipy ndimage library. The 
		filter weights are re-evaluated when there are missing values.
		"""
		w = self.coeffs
		n_x, n_y = self.order
		m_x, m_y = self.n_zero		
		#Copying the input variable to avoid modifications
		mask = ma.getmask(var)
		fval = var.get_fill_value()
		var_cp = ma.masked_array(var, mask, fill_value=fval)
		var_cp[mask] = 0.						
		var_int = \
		ma.array(self.do_filtering2d(var_cp, mode=mode, output=output,
		                             fast=fast),
		         mask=mask, fill_value=fval)
		if self.filter_type == 'Low pass':
			var_filt = var_int / bound_weights
		elif self.filter_type == 'High pass':
			var_filt = var_int / (1 - bound_weights)
		elif self.filter_type == 'Band pass':
			pass
			# Work to do			
		if mask_cond != None:
		# If the conditioning is too bad for filtering according to a 
		# mask precomputed, a smaller filter is used with only positive
		# weigths
			if opt == 'interp':
				k_x, k_y = np.meshgrid(np.arange(-m_x,m_x+1),
				                       np.arange(-m_y,m_y+1))
				w_trunc = w[n_x-m_x : n_x+m_x+1, n_y-m_y : n_y+m_y+1]
				w_trunc[((k_y / float(m_y))**2 + 
				        (k_x / float(m_x))**2) > 1] = 0.
				if fast == True:
					P_trunc = im.convolve(M, w_trunc, mode=mode)	
					C_trunc = im.convolve(var_cp, w_trunc, mode=mode)
				else:
					P_trunc = sig.convolve2d(M, w_trunc, mode=mode)	
					C_trunc = sig.convolve2d(var_cp, w_trunc, mode=mode)
				var_cond = C_trunc / P_trunc
				I = np.where(mask_cond == 1)
				var_filt[I] = var_cond[I]
			elif opt == 'zero':
				var_filt[mask_cond] = 0.							
		var_filt[mask == 1] = 0.
		var_filt.fill_value = fval		
		return var_filt
					
	def check_conditioning(self, mask, threshold=0.50):
		mode = 'wrap'
		w = self.coeffs
		n_x, n_y = self.order
		m_x, m_y = self.n_zero	
		M = np.array(mask, dtype='f8')
		
		#Initialisation of the kernel test
		k_x, k_y = np.meshgrid(np.arange(-m_x,m_x+1),
		                       np.arange(-m_y,m_y+1))
		w_trunc = w[n_x-m_x : n_x+m_x+1, n_y-m_y : n_y+m_y+1]
		w_trunc[((k_y/float(m_y))**2 +
		         (k_x/float(m_x))**2) > 1] = 0.
		s = np.sum(w_trunc)
		r = np.abs(s/(1-s))
		T = im.convolve(M, w_trunc, mode=mode)
		P = im.convolve(M, w, mode=mode)	
		r_cond = np.abs(T / (P - T))		
		mask_cond = ((r_cond < threshold * r) & mask)	
		return mask_cond	

	def print_properties(self):
		print "Dimension: "
		print self.dimension
		print "Filter type: "
		print self.filter_type
		print "Number of weights: "
		print self.order
		print "Matrix of coefficients: "
		print self.coeffs
		
#=======================================================================
# Lanczos filter definition
#=======================================================================
class lanczos(filter):

	def __init__(self, typ, n, fc1, fc2=0.5, shape='square'):
		"""
		Compute the weight function of a 1D,2D, 3D Lanczos filter 
		according to the size of cut-oof frequency and weight number 
		vectors
		 
		Attributes
		----------
		typ : {'lp', 'hp', 'bp'}
			'lp' for Low-pass filter
				- 'hp' for High-pass filter
				- 'bp' for Band-pass filter
		n = number of weight:
				-1D fc=[n_x]
				-2D fc=[n_x,n_y]
				-3D fc=[n_x,n_y,n_z]	
		fc1 = normalized cut-off frequency:
				-1D fc=[fc_x]
				-2D fc=[fc_x,fc_y]
				-3D fc=[fc_x,fc_y,fc_z]
		fc2 = normalized cut-off frequency in case of a Band-pass 
		      filter (same as fc1)	
		
		shape = 'square' by default or 'axi' for an axisymmetric filter
				(Only needed for 2D and 3D dimensions)
		
		"""		
		dim = np.size(n)
		self.dimension = dim
		self.filter_type = typ
		self.order = n
		self.fc1 = np.array(fc1)	
		if dim == 1:				
			if typ == 'lp':				
				self.filter_type = 'Low pass'
				fc = fc1
				k = np.arange(-n,n+1)	
				w = (np.sin(2 * pi * fc * k) / (pi * k) *
				     np.sin(pi * k / n) / (pi * k / n))
				#Particular case where k=0	
				w[n] = 2 * fc
				#Normalization of coefficients
				w = w / np.sum(w)				
			elif typ == 'hp':				
				self.filter_type = 'High pass'
				fc = fc1
				lanc = lanczos('lp', n, fc1, shape=shape)
				w = - lanc.coeffs
				#Particular case where k=0	
				w[n] = 1 - 2 * fc		
			elif typ == 'bp':		
				self.filter_type = 'Band pass'
				lanc1 = lanczos('lp', n, fc1, shape=shape)
				lanc2 = lanczos('lp', n, fc2, shape=shape)				
				w = lanc2.coeffs - lanc1.coeffs				
		elif dim == 2:
			if typ == 'lp':			
				self.filter_type = 'Low pass'
				fc_x, fc_y = fc1
				n_x, n_y = n
				#Grid definition according to the number of weights
				k_x, k_y = np.meshgrid(np.arange(-n_x, n_x + 1),
				                       np.arange(-n_y, n_y + 1))
				#Computation of the response weight on the grid
				z = np.sqrt((fc_x * k_x) ** 2 + (fc_y * k_y) ** 2)
				w_rect = 1 / z * fc_x * fc_y * spec.j1(2 * pi * z)
				w = (w_rect *
				     np.sin(pi * k_x / n_x) / (pi * k_x / n_x) * 
				     np.sin(pi * k_y / n_y) / (pi * k_y /n_y ))
				#Particular case where z=0
				w[:, n_x] = (w_rect[:, n_x] *
				             1. / (pi * k_y[:, n_x] / n_y) *
				             np.sin(pi * k_y[:, n_x] / n_y))
				w[n_y, :] = (w_rect[n_y, :] *
				             1. / (pi * k_x[n_y, :] / n_x)	*			
				             np.sin(pi * k_x[n_y, :] / n_x))
				w[n_y, n_x] = pi * fc_x * fc_y				
				if shape == 'square':
					pass
				elif shape == 'axi':					
					#Forcing the circular symmetry
					w[((k_y / float(n_y)) ** 2 + 
					   (k_x / float(n_x)) ** 2) > 1] = 0
				#Normalization of coefficients
				w = w / np.sum(w)								
				self.n_zero = np.array(np.floor(spec.jn_zeros(1, 1)[0] /
				                                (2 * pi * self.fc1)),
				                       dtype='int')				
			elif typ == 'hp':		
				self.filter_type = 'High pass'
				n_x, n_y = n
				fc_x, fc_y = fc1
				lanc = lanczos('lp', n, fc1, shape=shape)
				w = -lanc.coeffs
				w[n_y, n_x] = 1 - pi * fc_x * fc_y				
			elif typ == 'bp':
				self.filter_type = 'Band pass'
				self.fc2 = fc2
				lanc1 = lanczos('lp', n, fc1, shape=shape)
				lanc2 = lanczos('lp', n, fc2, shape=shape)				
				w = lanc2.coeffs - lanc1.coeffs
		elif dim == 3:
		#To complete
			pass			
		else : 
			print "lanczos is not able to treat such dimensions"
		self.coeffs = w


def tukeywin(n, r=None):
	dim = np.size(n)
	if r == None:
		r = 0.5
	if dim == 1:
		x = np.linspace(0, 1, n)
		w = np.ones(n)
		i1 = np.where(x < r / 2.)
		w[i1] = 0.5 * (1 + np.cos(2 * np.pi / r * 
		                               (x[i1] - r / 2.)))
		i2 = np.where((r / 2. <= x) & (x < 1 - r / 2.))
		w[i2] = 1
		i3 = np.where(1 - r / 2. <= x)
		w[i3] = 0.5 * (1 + np.cos(2 * np.pi / r *
		                               (x[i3] -1 + r / 2.)))
	elif dim == 2:
		n_x, n_y = n
		w_x = tukeywin(n_x, r=r)
		w_y = tukeywin(n_y, r=r)
		w = np.outer(w_x, w_y)	
	elif dim == 3:
		#To complete		
		pass
	else:
		raise ValueError, "Not able to process such dimension"	
	return w
	
def hanningwin(n):
	dim = np.size(n)
	if dim == 1:
		w = np.hanning(n)
	elif dim == 2:
		n_x, n_y = n
		w_x = hanningwin(n_x)
		w_y = hanningwin(n_y)
		w = np.outer(w_x, w_y)	
	elif dim == 3:
		#To complete		
		pass
	else:
		raise ValueError, "Not able to process such dimension"	
	return w		
	
	

[docs]def fourier_series_2d(w, f_x, f_y): ''' Compute the 2D Fourier series on a grid knowing the coefficients from a 2D matrix ''' l_y, l_x = np.shape(w) n_y = (l_y - 1) / 2 n_x = (l_x - 1) / 2 R = np.zeros(np.shape(f_x)) k_x, k_y = np.meshgrid(np.arange(-n_x, n_x + 1), np.arange(-n_y, n_y + 1)) #Still looking for a vectorized version of the loops... for i in range( 2 * n_y + 1): for j in range(2 * n_x + 1): #Rebuilding the 2D Fourier series R = R + w[i, j] * np.exp(2 * pi * 1j * (k_x[i, j] * f_x + k_y[i, j] * f_y)) R = np.real(R) #Eliminate the imaginary part which is non physical return R
[docs]def fourier_series_1d(w, f): ''' Compute the 1D Fourier series on a grid knowing the coefficients from a 2D matrix ''' l = np.size(w) n = (l-1)/2 R=np.zeros(np.size(f)) k=np.arange(-n,n+1) #Still looking for a vectorized version of the loops... for i in range(2*n+1): #Rebuilding the 2D Fourier series R=R+w[i]*np.exp(2*pi*1j*k[i]*f) R=np.real(R) #Eliminate the imaginary part which is non physical return R
if __name__ == '__main__': ''' Program test: - Compute the response function of a 1D low-pass Lanczos filter - Compute the response function of a 2D low-pass Lanczos filter using 2*n_x+1=21 and 2*n_y+1=21 weights ''' #n_x = 64 #n_y = 64 #fc_x = 0.5 #fc_y = 0.5 #LP_lanc = lanczos('lp',[n_x,n_y],fc1=[fc_x,fc_y],fc2=[0.2,0.2]) #LP_lanc=lanczos('bp',n_x,fc1=0.05,fc2=0.2) #m_x,m_y=LP_lanc.conditions() #m_x, m_y = LP_lanc.n_zero #w = LP_lanc.coeffs #k_x, k_y = np.meshgrid(np.arange(-m_x,m_x+1),np.arange(-m_y,m_y+1)) #w_trunc = w[n_x-m_x:n_x+m_x+1,n_y-m_y:n_y+m_y+1] #w_trunc[(k_y/float(m_y)) ** 2 + (k_x/float(m_x)) ** 2 > 1] = 0. #print w_trunc #LP_lanc.print_properties() #LP_lanc.plot_response_function() #LP_lanc.plot_weights_distribution() r = 0.45 n_x = 50 n_y = 50 #win = tukeywin((n_x, n_y), r) win = hanningwin((n_x, n_y)) print np.shape(win) x = np.linspace(0, 1, n_x) y = np.linspace(0, 1, n_y) Y, X = np.meshgrid(y, x) print win plt.pcolormesh(X, Y, win) plt.show()