#!/usr/bin/env python
########################################################################
# Linear filtering package #
# #
# 2013, G. SERAZIN, PhD student at MEOM/LGGE Grenoble #
########################################################################
#======================================================================
# Module importation
#=======================================================================
import sys
import os
try:
import netCDF4_utils
from netCDF4 import Dataset
except:
print 'Package netCDF4 is not available...'
exit()
try:
import numpy as np
import numpy.ma as ma
except:
print 'Package numpy is not available...'
exit()
try:
from math import *
except:
print 'Package math is not available...'
exit()
try:
import scipy.special as spec
import scipy.ndimage as im
import scipy.signal as sig
except:
print 'Package scipy is not available...'
exit()
try:
import pylab as plt
except:
print 'Package matplotlib is not correctly installed...'
exit()
#=======================================================================
# General linear filter definition
#=======================================================================
class filter:
# Attributes of the mother class
#
# dimension
# filter_coeffs
# filter_type
#
def plot_response_function(self):
'''
Plot the response function on a grid
'''
w = self.coeffs
dim = self.dimension
if dim == 1:
delta_f = 0.01
f = np.arange(-0.5,0.5,delta_f)
plt.figure()
R = fourier_series_1d(w,f)
plt.plot(f,R)
plt.show()
#Work to do
elif dim == 2:
delta_fx = 0.01
delta_fy = 0.01
X = np.concatenate((np.arange(-0.5,0,delta_fx),
np.arange(0,0.5+delta_fx,delta_fx)),
axis=0)
Y = np.concatenate((np.arange(-0.5,0,delta_fy),
np.arange(0,0.5+delta_fy,delta_fy)),
axis=0)
f_x, f_y = np.meshgrid(X, Y)
R = fourier_series_2d(w, f_x, f_y)
Ix = (f_y == 0.)
Iy = (f_x == 0.)
fig = plt.figure(1, figsize=(8, 8))
# Definitions for the axes
left, width = 0.1, 0.65
bottom, height = 0.1, 0.65
bottom_h = left_h = left + width + 0.02
rect_2D = [left, bottom, width, height]
rect_x = [left, bottom_h, width, 0.2]
rect_y = [left_h, bottom, 0.2, height]
ax_2D = plt.axes(rect_2D)
ax_x = plt.axes(rect_x)
ax_y = plt.axes(rect_y)
ax_x.plot(f_x[Ix].squeeze(), R[Ix].squeeze())
ax_x.grid(True)
ax_x.set_title(r'Response function for $f_y=0$')
ax_x.set_xlabel(r'$f_x$')
ax_x.set_ylabel(r'$R(f_x,0)$')
ax_y.plot(R[Iy].squeeze(),f_y[Iy].squeeze())
ax_y.grid(True)
ax_x.set_title(r'Response function for $f_x=0$')
ax_y.set_ylabel(r'$f_y$')
ax_y.set_xlabel(r'$R(0,f_y)$')
ax_2D.pcolormesh(f_x,f_y,R)
ax_2D.set_title(r'Response function $R(f_x,f_y)$')
ax_2D.set_ylabel(r'$f_y$')
ax_2D.set_xlabel(r'$f_x$')
plt.show()
elif dim==3:
pass #Work to do
def plot_weights_distribution(self):
dim = self.dimension
w = self.coeffs
if dim == 1:
pass #Work to do
elif dim == 2:
n_x, n_y = self.order
fig = plt.figure()
plt.plot(np.arange(-n_x, n_x+1),
(w[n_y, :]/w[n_y, n_x]).squeeze())
plt.show()
def do_filtering1d(self, var, mode='reflect', output='valid'):
"""
This function make the 1D filtering of along the first dimension
of an array using the function convolve1d from the Scipy ndimage
library.
"""
if self.dimension != 1:
print "Error: the dimension of the kernel must be 1"
sys.exit(1)
else:
n = self.order
var_filt = im.convolve1d(var, self.coeffs, axis=0,
mode=mode)
if output == 'valid':
var_filt = var_filt[n:-n,]
elif output == 'same':
pass
return var_filt
def do_filtering2d(self, var, mode='wrap', output='same',
fast=False):
"""
This function make the 2D filtering
"""
if self.dimension != 2:
print "Error: the dimension of the kernel must be 2"
sys.exit(1)
if fast == True:
var_filt = im.convolve(var, self.coeffs, mode=mode)
else:
var_filt = sig.convolve2d(var, self.coeffs,
boundary=mode, mode=output)
return var_filt
def do_sparse_filtering2d(self, var, bound_weights, mode='wrap',
output='same', mask_cond=None, fast=False,
opt='interp'):
"""
This function make the filtering of 2D dimensions dataset using
the function convolve from the Scipy ndimage library. The
filter weights are re-evaluated when there are missing values.
"""
w = self.coeffs
n_x, n_y = self.order
m_x, m_y = self.n_zero
#Copying the input variable to avoid modifications
mask = ma.getmask(var)
fval = var.get_fill_value()
var_cp = ma.masked_array(var, mask, fill_value=fval)
var_cp[mask] = 0.
var_int = \
ma.array(self.do_filtering2d(var_cp, mode=mode, output=output,
fast=fast),
mask=mask, fill_value=fval)
if self.filter_type == 'Low pass':
var_filt = var_int / bound_weights
elif self.filter_type == 'High pass':
var_filt = var_int / (1 - bound_weights)
elif self.filter_type == 'Band pass':
pass
# Work to do
if mask_cond != None:
# If the conditioning is too bad for filtering according to a
# mask precomputed, a smaller filter is used with only positive
# weigths
if opt == 'interp':
k_x, k_y = np.meshgrid(np.arange(-m_x,m_x+1),
np.arange(-m_y,m_y+1))
w_trunc = w[n_x-m_x : n_x+m_x+1, n_y-m_y : n_y+m_y+1]
w_trunc[((k_y / float(m_y))**2 +
(k_x / float(m_x))**2) > 1] = 0.
if fast == True:
P_trunc = im.convolve(M, w_trunc, mode=mode)
C_trunc = im.convolve(var_cp, w_trunc, mode=mode)
else:
P_trunc = sig.convolve2d(M, w_trunc, mode=mode)
C_trunc = sig.convolve2d(var_cp, w_trunc, mode=mode)
var_cond = C_trunc / P_trunc
I = np.where(mask_cond == 1)
var_filt[I] = var_cond[I]
elif opt == 'zero':
var_filt[mask_cond] = 0.
var_filt[mask == 1] = 0.
var_filt.fill_value = fval
return var_filt
def check_conditioning(self, mask, threshold=0.50):
mode = 'wrap'
w = self.coeffs
n_x, n_y = self.order
m_x, m_y = self.n_zero
M = np.array(mask, dtype='f8')
#Initialisation of the kernel test
k_x, k_y = np.meshgrid(np.arange(-m_x,m_x+1),
np.arange(-m_y,m_y+1))
w_trunc = w[n_x-m_x : n_x+m_x+1, n_y-m_y : n_y+m_y+1]
w_trunc[((k_y/float(m_y))**2 +
(k_x/float(m_x))**2) > 1] = 0.
s = np.sum(w_trunc)
r = np.abs(s/(1-s))
T = im.convolve(M, w_trunc, mode=mode)
P = im.convolve(M, w, mode=mode)
r_cond = np.abs(T / (P - T))
mask_cond = ((r_cond < threshold * r) & mask)
return mask_cond
def print_properties(self):
print "Dimension: "
print self.dimension
print "Filter type: "
print self.filter_type
print "Number of weights: "
print self.order
print "Matrix of coefficients: "
print self.coeffs
#=======================================================================
# Lanczos filter definition
#=======================================================================
class lanczos(filter):
def __init__(self, typ, n, fc1, fc2=0.5, shape='square'):
"""
Compute the weight function of a 1D,2D, 3D Lanczos filter
according to the size of cut-oof frequency and weight number
vectors
Attributes
----------
typ : {'lp', 'hp', 'bp'}
'lp' for Low-pass filter
- 'hp' for High-pass filter
- 'bp' for Band-pass filter
n = number of weight:
-1D fc=[n_x]
-2D fc=[n_x,n_y]
-3D fc=[n_x,n_y,n_z]
fc1 = normalized cut-off frequency:
-1D fc=[fc_x]
-2D fc=[fc_x,fc_y]
-3D fc=[fc_x,fc_y,fc_z]
fc2 = normalized cut-off frequency in case of a Band-pass
filter (same as fc1)
shape = 'square' by default or 'axi' for an axisymmetric filter
(Only needed for 2D and 3D dimensions)
"""
dim = np.size(n)
self.dimension = dim
self.filter_type = typ
self.order = n
self.fc1 = np.array(fc1)
if dim == 1:
if typ == 'lp':
self.filter_type = 'Low pass'
fc = fc1
k = np.arange(-n,n+1)
w = (np.sin(2 * pi * fc * k) / (pi * k) *
np.sin(pi * k / n) / (pi * k / n))
#Particular case where k=0
w[n] = 2 * fc
#Normalization of coefficients
w = w / np.sum(w)
elif typ == 'hp':
self.filter_type = 'High pass'
fc = fc1
lanc = lanczos('lp', n, fc1, shape=shape)
w = - lanc.coeffs
#Particular case where k=0
w[n] = 1 - 2 * fc
elif typ == 'bp':
self.filter_type = 'Band pass'
lanc1 = lanczos('lp', n, fc1, shape=shape)
lanc2 = lanczos('lp', n, fc2, shape=shape)
w = lanc2.coeffs - lanc1.coeffs
elif dim == 2:
if typ == 'lp':
self.filter_type = 'Low pass'
fc_x, fc_y = fc1
n_x, n_y = n
#Grid definition according to the number of weights
k_x, k_y = np.meshgrid(np.arange(-n_x, n_x + 1),
np.arange(-n_y, n_y + 1))
#Computation of the response weight on the grid
z = np.sqrt((fc_x * k_x) ** 2 + (fc_y * k_y) ** 2)
w_rect = 1 / z * fc_x * fc_y * spec.j1(2 * pi * z)
w = (w_rect *
np.sin(pi * k_x / n_x) / (pi * k_x / n_x) *
np.sin(pi * k_y / n_y) / (pi * k_y /n_y ))
#Particular case where z=0
w[:, n_x] = (w_rect[:, n_x] *
1. / (pi * k_y[:, n_x] / n_y) *
np.sin(pi * k_y[:, n_x] / n_y))
w[n_y, :] = (w_rect[n_y, :] *
1. / (pi * k_x[n_y, :] / n_x) *
np.sin(pi * k_x[n_y, :] / n_x))
w[n_y, n_x] = pi * fc_x * fc_y
if shape == 'square':
pass
elif shape == 'axi':
#Forcing the circular symmetry
w[((k_y / float(n_y)) ** 2 +
(k_x / float(n_x)) ** 2) > 1] = 0
#Normalization of coefficients
w = w / np.sum(w)
self.n_zero = np.array(np.floor(spec.jn_zeros(1, 1)[0] /
(2 * pi * self.fc1)),
dtype='int')
elif typ == 'hp':
self.filter_type = 'High pass'
n_x, n_y = n
fc_x, fc_y = fc1
lanc = lanczos('lp', n, fc1, shape=shape)
w = -lanc.coeffs
w[n_y, n_x] = 1 - pi * fc_x * fc_y
elif typ == 'bp':
self.filter_type = 'Band pass'
self.fc2 = fc2
lanc1 = lanczos('lp', n, fc1, shape=shape)
lanc2 = lanczos('lp', n, fc2, shape=shape)
w = lanc2.coeffs - lanc1.coeffs
elif dim == 3:
#To complete
pass
else :
print "lanczos is not able to treat such dimensions"
self.coeffs = w
def tukeywin(n, r=None):
dim = np.size(n)
if r == None:
r = 0.5
if dim == 1:
x = np.linspace(0, 1, n)
w = np.ones(n)
i1 = np.where(x < r / 2.)
w[i1] = 0.5 * (1 + np.cos(2 * np.pi / r *
(x[i1] - r / 2.)))
i2 = np.where((r / 2. <= x) & (x < 1 - r / 2.))
w[i2] = 1
i3 = np.where(1 - r / 2. <= x)
w[i3] = 0.5 * (1 + np.cos(2 * np.pi / r *
(x[i3] -1 + r / 2.)))
elif dim == 2:
n_x, n_y = n
w_x = tukeywin(n_x, r=r)
w_y = tukeywin(n_y, r=r)
w = np.outer(w_x, w_y)
elif dim == 3:
#To complete
pass
else:
raise ValueError, "Not able to process such dimension"
return w
def hanningwin(n):
dim = np.size(n)
if dim == 1:
w = np.hanning(n)
elif dim == 2:
n_x, n_y = n
w_x = hanningwin(n_x)
w_y = hanningwin(n_y)
w = np.outer(w_x, w_y)
elif dim == 3:
#To complete
pass
else:
raise ValueError, "Not able to process such dimension"
return w
[docs]def fourier_series_2d(w, f_x, f_y):
'''
Compute the 2D Fourier series on a grid knowing the coefficients
from a 2D matrix
'''
l_y, l_x = np.shape(w)
n_y = (l_y - 1) / 2
n_x = (l_x - 1) / 2
R = np.zeros(np.shape(f_x))
k_x, k_y = np.meshgrid(np.arange(-n_x, n_x + 1),
np.arange(-n_y, n_y + 1))
#Still looking for a vectorized version of the loops...
for i in range( 2 * n_y + 1):
for j in range(2 * n_x + 1):
#Rebuilding the 2D Fourier series
R = R + w[i, j] * np.exp(2 * pi * 1j * (k_x[i, j] * f_x +
k_y[i, j] * f_y))
R = np.real(R) #Eliminate the imaginary part which is non physical
return R
[docs]def fourier_series_1d(w, f):
'''
Compute the 1D Fourier series on a grid knowing the coefficients
from a 2D matrix
'''
l = np.size(w)
n = (l-1)/2
R=np.zeros(np.size(f))
k=np.arange(-n,n+1)
#Still looking for a vectorized version of the loops...
for i in range(2*n+1):
#Rebuilding the 2D Fourier series
R=R+w[i]*np.exp(2*pi*1j*k[i]*f)
R=np.real(R) #Eliminate the imaginary part which is non physical
return R
if __name__ == '__main__':
'''
Program test:
- Compute the response function of a 1D low-pass Lanczos filter
- Compute the response function of a 2D low-pass Lanczos filter
using 2*n_x+1=21 and 2*n_y+1=21 weights
'''
#n_x = 64
#n_y = 64
#fc_x = 0.5
#fc_y = 0.5
#LP_lanc = lanczos('lp',[n_x,n_y],fc1=[fc_x,fc_y],fc2=[0.2,0.2])
#LP_lanc=lanczos('bp',n_x,fc1=0.05,fc2=0.2)
#m_x,m_y=LP_lanc.conditions()
#m_x, m_y = LP_lanc.n_zero
#w = LP_lanc.coeffs
#k_x, k_y = np.meshgrid(np.arange(-m_x,m_x+1),np.arange(-m_y,m_y+1))
#w_trunc = w[n_x-m_x:n_x+m_x+1,n_y-m_y:n_y+m_y+1]
#w_trunc[(k_y/float(m_y)) ** 2 + (k_x/float(m_x)) ** 2 > 1] = 0.
#print w_trunc
#LP_lanc.print_properties()
#LP_lanc.plot_response_function()
#LP_lanc.plot_weights_distribution()
r = 0.45
n_x = 50
n_y = 50
#win = tukeywin((n_x, n_y), r)
win = hanningwin((n_x, n_y))
print np.shape(win)
x = np.linspace(0, 1, n_x)
y = np.linspace(0, 1, n_y)
Y, X = np.meshgrid(y, x)
print win
plt.pcolormesh(X, Y, win)
plt.show()