Source code for demodulate

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# demodulate.py
# John A. Marohn
# 2014/06/28 -- 2014/08/07

"""

We demodulate the signal in the following steps:

1. Apply a window to the signal :math:`S(t)` in order to make it
   smoothly ramp up from zero and ramp down to zero.

2. Fast Fourier Transform the windowed signal to obtain
   :math:`\\hat{S}(f)`.

3. Identify the primary oscillation frequency :math:`f_0`.  Apply a
   bandpass filter centered at :math:`f_0` to reject signals at other
   frequencies.

4. Apply a second filter which zeros out the negative-frequency
   components of :math:`\\hat{S}(f)`.

5. Apply an Inverse Fast Fourier Transform to the resulting data to
   obtain a complex signal :math:`z(t) = x(t) + \\imath \\: y(t)`.

6. Compute the instantaneous phase :math:`\\phi` and amplitude
   :math:`a(t)` using the following equations. Unwrap the phase.

.. math::
    :label: Eq:phi
    
    \\begin{equation}
    \\phi(t) = \\arctan{[\\frac{y(t)}{x(t)}]}
    \\end{equation}

.. math::
    :label: Eq:a
    
    \\begin{equation}
    a(t) = \\sqrt{x(t)^2 + y(t)^2}
    \\end{equation}

7. Calculate the "instantaneous" frequency :math:`f(t)` by dividing the
   instantaneous phase data into equal-time segments and fitting each
   segment to a line.  The average frequency :math:`f(t)` during each time
   segment is the slope of the respective line.

"""

from __future__ import division, print_function, absolute_import
import h5py
import numpy as np 
import scipy as sp 
import math
import time
import datetime
import warnings
from lmfit import minimize, Parameters, fit_report
from freqdemod.hdf5 import (update_attrs, check_minimum_attrs,
                            infer_missing_attrs, infer_labels)
from freqdemod.hdf5.hdf5_util import save_hdf5, h5ls
print_hdf5_item_structure = h5ls  # Alias for backward compatibility
from freqdemod.util import (timestamp_temp_filename, infer_timestep)
from collections import OrderedDict
import six
import matplotlib.pyplot as plt 
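
# ----------------------------------------------------------------------------
# A minimal standalone sketch of the demodulation recipe in the module
# docstring (steps 1 through 7), written with plain numpy for reference.
# The Signal class below is the real implementation; the parameter values
# here (fs, f0, bw, n_fit) are assumptions for illustration only, and this
# helper is never called by the module.

def _demodulation_sketch(fs=50.0e3, f0=2.0e3, bw=1.0e3, n_fit=64):
    dt = 1.0/fs
    t = dt*np.arange(2**14)
    s = np.sin(2*np.pi*f0*t)                          # synthetic test signal

    w = np.blackman(s.size)                           # 1. window the signal
    S = dt*np.fft.fftshift(np.fft.fft(w*s))           # 2. FFT
    f = np.fft.fftshift(np.fft.fftfreq(s.size, dt))

    fc = f[np.argmax(abs(S)*(f > 0))]                 # 3. bandpass about the peak
    bp = 1.0/(1.0 + np.power(abs((f - fc)/bw), 50))
    rh = 2.0*(f > 0) + 1.0*(f == 0)                   # 4. zero negative frequencies

    z = np.fft.ifft(np.fft.ifftshift(S*bp*rh))/dt     # 5. complex signal z(t)
    phi = np.unwrap(np.angle(z))                      # 6. phase [rad] and amplitude
    a = abs(z)

    m = phi.size//n_fit                               # 7. slope of phase vs time
    f_inst = np.array([np.polyfit(t[k*n_fit:(k+1)*n_fit],
                                  phi[k*n_fit:(k+1)*n_fit], 1)[0]
                       for k in range(m)])/(2*np.pi)
    return a, f_inst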

class Signal(object):

    def __init__(self, filename=None, mode='w-', driver='core',
                 backing_store=False):
        """
        Initialize the *Signal's* hdf5 data structure.  Calling with no
        arguments results in an in-memory-only object.  To save the data to
        disk, provide *both* a filename and ``backing_store=True``.

        :param str filename: the signal's filename
        :param str mode: file open mode (see h5py.File)
        :param str driver: hdf5 driver (see h5py.File)
        :param bool backing_store: If True, save the file to disk.

        Add the following objects to the *Signal* object:

        :param f: an h5py object stored in core memory
        :param str report: a string summarizing in words what has
            been done to the signal (e.g., "Empty signal object created")

        Examples::

            s = Signal()                                       # In-memory only
            s = Signal('not-saved.h5')                         # Still in-memory only
            s = Signal('save-to-disk.h5', backing_store=True)  # Save to disk

        """

        new_report = []

        if filename is not None:
            try:
                self.f = h5py.File(filename, mode=mode, driver=driver,
                                   backing_store=backing_store)
            except IOError as e:
                print("IOError: {}".format(e))
                if 'file exists' in str(e) or 'Unable to open file' in str(e):
                    error_msg = """\
{0} already exists. Fix this error by either:\n
1. Change the filename to a filename that doesn't exist.
2. Remove the file: os.remove('{0}')
3. Explicitly overwrite the file with mode='w'. Run:
    Signal('{0}', mode='w', driver={1}, backing_store={2})""".format(filename, driver, backing_store)
                    raise IOError(error_msg)
                else:
                    raise
        else:
            filename = timestamp_temp_filename('.h5')
            self.f = h5py.File(filename, driver='core', backing_store=False)

        today = datetime.datetime.today()
        attrs = OrderedDict([
            ('date', today.strftime("%Y-%m-%d")),
            ('time', today.strftime("%H:%M:%S")),
            ('h5py_version', h5py.__version__),
            ('source', 'demodulate.py'),
            ('help', 'Sinusoidally oscillating signal and workup')
            ])
        update_attrs(self.f.attrs, attrs)
        new_report.append("HDF5 file {0} created in core memory".format(filename))

        self.report = []
        self.report.append(" ".join(new_report))

    def load_nparray(self, s, s_name, s_unit, dt, s_help='cantilever displacement'):
        """
        Create a *Signal* object from the following inputs.

        :param s: the signal *vs* time
        :type s: np.array or list
        :param str s_name: the signal's name
        :param str s_unit: the signal's units
        :param float dt: the time per point [s]
        :param str s_help: the signal's help string

        Add the following objects to the *Signal* object:

        :param h5py.File f: an h5py object stored in core memory
        :param str report: a string summarizing in words what has
            been done to the signal
        """

        s = np.atleast_1d(s)

        self.f['x'] = dt * np.arange(s.size)
        attrs = OrderedDict([
            ('name', 't'),
            ('unit', 's'),
            ('label', 't [s]'),
            ('label_latex', '$t \: [\mathrm{s}]$'),
            ('help', 'time'),
            ('initial', 0.0),
            ('step', dt)
            ])
        update_attrs(self.f['x'].attrs, attrs)

        self.f['y'] = s
        attrs = OrderedDict([
            ('name', s_name),
            ('unit', s_unit),
            ('label', '{0} [{1}]'.format(s_name, s_unit)),
            ('label_latex', '${0} \: [\mathrm{{{1}}}]$'.format(s_name, s_unit)),
            ('help', s_help),
            ('abscissa', 'x'),
            ('n_avg', 1)
            ])
        update_attrs(self.f['y'].attrs, attrs)

        new_report = []
        new_report.append("Add a signal {0}[{1}]".format(s_name, s_unit))
        new_report.append("of length {0},".format(np.array(s).size))
        new_report.append("time step {0:.3f} us,".format(1E6*dt))
        new_report.append("and duration {0:.3f} s".format(np.array(s).size*dt))

        self.report.append(" ".join(new_report))
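
    # Example (illustrative, mirrors testsignal_sine() below; the sine-wave
    # parameters are assumptions for the example only):
    #
    #     dt = 1/50.0E3
    #     t = dt*np.arange(2**16)
    #     s = np.sin(2*np.pi*2.0E3*t)
    #     S = Signal()
    #     S.load_nparray(s, "x", "nm", dt)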

    def load_hdf5(self, f, s_dataset='y', t_dataset='x', infer_dt=True,
                  infer_attrs=True):
        """Load an hdf5 file saved with default freqdemod attributes.

        :param f: A filename, or h5py file or group object, which contains
            t_dataset and s_dataset
        :param str s_dataset: signal dataset name (relative to h5object)
        :param str t_dataset: time dataset name (relative to h5object)
        :param bool infer_dt: If True, infer the time step dt from the
            contents of the t_dataset
        :param bool infer_attrs: If True, fill in any missing attributes
            used by freqdemod."""
        if isinstance(f, six.string_types):
            with h5py.File(f, 'r') as fh:
                self._load_hdf5_default(fh, s_dataset=s_dataset,
                                        t_dataset=t_dataset,
                                        infer_dt=infer_dt,
                                        infer_attrs=infer_attrs)
        else:
            self._load_hdf5_default(f, s_dataset=s_dataset,
                                    t_dataset=t_dataset,
                                    infer_dt=infer_dt,
                                    infer_attrs=infer_attrs)

    def load_hdf5_general(self, f, s_dataset, s_name, s_unit, t_dataset=None,
                          dt=None, s_help='cantilever displacement'):
        """Load data from an arbitrarily formatted hdf5 file.

        :param f: A filename, or h5py file or group object, which contains
            s_dataset
        :param str s_dataset: signal dataset name (relative to h5object)
        :param str s_name: the signal's name
        :param str s_unit: the signal's units
        :param str t_dataset: time dataset name (optional; or specify dt)
        :param float dt: the time per point [s]
        :param str s_help: the signal's help string
        """
        if isinstance(f, six.string_types):
            with h5py.File(f, 'r') as fh:
                self._load_hdf5_general(fh, s_dataset=s_dataset,
                                        s_name=s_name, s_unit=s_unit,
                                        t_dataset=t_dataset, dt=dt,
                                        s_help=s_help)
        else:
            self._load_hdf5_general(f, s_dataset=s_dataset, s_name=s_name,
                                    s_unit=s_unit, t_dataset=t_dataset,
                                    dt=dt, s_help=s_help)

    def close(self):
        """Update the report and close the file.  This will write the file
        to disk if backing_store=True was used to create the file."""
        self.f.attrs['report'] = "\n".join(self.report)
        self.f.close()

    def save(self, dest, save='time_workup', overwrite=False):
        """Save the current signal object to a new hdf5 file, with control
        over what datasets to save.

        :param dest: Copy destination.  A filename, or h5py file or group
            object.
        :param save: A string describing the datasets to be saved, or a
            list of groups / datasets to save.

            all:                All datasets and groups
            input:              x, y
            input_no_t:         y
            time_workup:        x, y, workup/time
            time_workup_no_t:   y, workup/time
            time_workup_no_s:   workup/time
            fit_phase:          x, y, workup/fit
            fit_phase_no_t:     y, workup/fit
            fit_phase_no_s:     workup/fit

        :param overwrite: If true, overwrite an existing destination file.
        """
        self.f.attrs['report'] = "\n".join(self.report)

        save_options = {'all': self.f.keys(),
                        'input': ['x', 'y'],
                        'input_no_t': ['y'],
                        'time_workup': ['x', 'y', 'workup/time'],
                        'time_workup_no_t': ['y', 'workup/time'],
                        'time_workup_no_s': ['workup/time'],
                        'fit_phase': ['x', 'y', 'workup/fit'],
                        'fit_phase_no_t': ['y', 'workup/fit'],
                        'fit_phase_no_s': ['workup/fit'],
                        }

        if isinstance(save, six.string_types):
            requested_datasets = save_options[save]
            # If save is a string, make sure specified datasets actually exist
            datasets = [dset for dset in requested_datasets if dset in self.f]
            if datasets != requested_datasets:
                warnings.warn(
                    "Datasets {} missing, will not be saved.".format(
                        set(requested_datasets) - set(datasets))
                    )
        else:
            # If save is a list, save all datasets in the list
            datasets = save

        if isinstance(dest, six.string_types):
            mode = 'w' if overwrite else 'w-'
            f_dst = h5py.File(dest, mode=mode)
        else:
            f_dst = dest

        save_hdf5(self.f, f_dst, datasets)

        # Copy top-level attributes over by hand
        update_attrs(f_dst.attrs, self.f.attrs)

        if isinstance(dest, six.string_types):
            f_dst.close()

    def open(self, filename):
        """Open the file for reading and writing."""
        self.f = h5py.File(filename, 'r+')
        self.report = self.f.attrs['report'].split("\n")

    def plot(self, ordinate, LaTeX=False, component='abs'):
        """
        Plot a component of the *Signal* object.

        :param str ordinate: the name of the y-axis data key
        :param str component: `abs` (default), `real`, `imag`, or `both`;
            if the dataset is complex, which component do we plot
        :param boolean LaTeX: use LaTeX axis labels; ``True`` or ``False``
            (default)

        Plot ``self.f[ordinate]`` versus ``self.f[y.attrs['abscissa']]``.
        If the data is complex, plot the absolute value.
        """

        # Get the x and y axes.
        y = self.f[ordinate]
        x = self.f[y.attrs['abscissa']]

        # Possibly use TeX-formatted axis labels temporarily for this plot
        # and compute plot labels
        old_param = plt.rcParams['text.usetex']

        if LaTeX == True:
            plt.rcParams['text.usetex'] = True
            x_label_string = x.attrs['label_latex']
            y_label_string = y.attrs['label_latex']
        elif LaTeX == False:
            plt.rcParams['text.usetex'] = False
            x_label_string = x.attrs['label']
            y_label_string = y.attrs['label']

        title_string = "{0} vs. {1}".format(y.attrs['help'], x.attrs['help'])

        # Create the plot.  If the y-axis is complex, then plot the abs() of
        # it.  To use the abs() method, we need to force the HDF5 dataset to
        # be of type np.ndarray
        fig = plt.figure(facecolor='w')

        if isinstance(y[0], complex) == True:
            if component == 'abs':
                plt.plot(x, abs(np.array(y)))
                y_label_string = "abs of {}".format(y_label_string)
            if component == 'real':
                plt.plot(x, (np.array(y)).real)
                y_label_string = "real part of {}".format(y_label_string)
            if component == 'imag':
                plt.plot(x, (np.array(y)).imag)
                y_label_string = "imag part of {}".format(y_label_string)
            if component == 'both':
                plt.plot(x, (np.array(y)).real)
                plt.plot(x, (np.array(y)).imag)
                y_label_string = "real and imag part of {}".format(y_label_string)
        else:
            plt.plot(x, y)

        # axes limits and labels
        plt.xlabel(x_label_string)
        plt.ylabel(y_label_string)
        plt.title(title_string)

        # set tick spacing so that the plot is pleasing to the eye
        plt.locator_params(axis='x', nbins=4)
        plt.locator_params(axis='y', nbins=4)

        # clean up label spacings, show the plot, and reset the tex option
        fig.subplots_adjust(bottom=0.15, left=0.12)
        plt.show()
        plt.rcParams['text.usetex'] = old_param

    def time_mask_binarate(self, mode):
        """
        Create a masking array of ``True``/``False`` values that can be used
        to truncate an array so that its length is a power of two, as
        required to perform a Fast Fourier Transform.

        :param str mode: "start", "middle", or "end"

        With "start", the beginning of the array will be left intact and the
        end truncated; with "middle", the array will be shortened
        symmetrically from both ends; and with "end" the end of the array
        will be left intact while the beginning of the array will be chopped
        away.

        Create the mask in the following ``np.array``:

        :param bool self.f['workup/time/mask/binarate']: the mask; a
            ``np.array`` of boolean values

        This boolean array of ``True`` and ``False`` values can be plotted
        directly -- ``True`` is converted to 1.0 and ``False`` is converted
        to 0 by the ``matplotlib`` plotting function.
        """

        n = self.f['y'].size    # number of points, n, in the signal
        indices = np.arange(n)  # np.array of indices

        # largest power of 2 less than or equal to n
        n2 = int(math.pow(2, int(math.floor(math.log(n, 2)))))

        if mode == "middle":
            n_start = int(math.floor((n - n2)/2))
            n_stop = int(n_start + n2)
        elif mode == "start":
            n_start = 0
            n_stop = n2
        elif mode == "end":
            n_start = n - n2
            n_stop = n

        mask = (indices >= n_start) & (indices < n_stop)

        dset = self.f.create_dataset('workup/time/mask/binarate', data=mask)
        attrs = OrderedDict([
            ('name', 'mask'),
            ('unit', 'unitless'),
            ('label', 'masking function'),
            ('label_latex', 'masking function'),
            ('help', 'mask to make data a power of two in length'),
            ('abscissa', 'x')
            ])
        update_attrs(dset.attrs, attrs)

        x = self.f['x']
        x_binarated = x[mask]

        dset = self.f.create_dataset('workup/time/x_binarated', data=x_binarated)
        attrs = OrderedDict([
            ('name', 't_masked'),
            ('unit', 's'),
            ('label', 't [s]'),
            ('label_latex', '$t \: [\mathrm{s}]$'),
            ('help', 'time'),
            ('initial', x_binarated[0]),
            ('step', x_binarated[1] - x_binarated[0])
            ])
        update_attrs(dset.attrs, attrs)

        new_report = []
        new_report.append("Make an array, workup/time/mask/binarate, to be used")
        new_report.append("to truncate the signal to be {0}".format(n2))
        new_report.append("points long (a power of two). The truncated array")
        new_report.append("will start at point {0}".format(n_start))
        new_report.append("and stop before point {0}.".format(n_stop))

        self.report.append(" ".join(new_report))
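
    # Illustrative note: the power-of-two truncation above is equivalent to
    # the following plain-numpy sketch (the value of n is an assumption):
    #
    #     n = 60000
    #     n2 = 2**int(math.floor(math.log(n, 2)))   # 32768, largest power of 2 <= n
    #     n_start = (n - n2)//2                     # "middle" mode
    #     mask = (np.arange(n) >= n_start) & (np.arange(n) < n_start + n2)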

    def time_window_cyclicize(self, tw):
        """
        Create a windowing function with

        :param float tw: the window's target rise/fall time [s]

        The windowing function is a concatenation of

        1. the rising half of a Blackman filter;
        2. a constant 1.0; and
        3. the falling half of a Blackman filter.

        The actual rise/fall time may not exactly equal the target rise/fall
        time if the requested time is not an integer multiple of the
        signal's time per point.

        If the masking array workup/time/mask/binarate is defined, then
        design the windowing function workup/time/window/cyclicize so that
        it is the right length to be applied to the *masked* signal array.
        If on the other hand the masking array is not already defined, then
        design the window to be applied to the full signal array instead.

        If workup/time/mask/binarate is defined, then also create a masked
        version of the x-axis for plotting, x_binarated.  Make the
        x_binarated array the new abscissa associated with the new
        workup/time/window/cyclicize array.
        """

        if self.f.__contains__('workup/time/mask/binarate') == True:
            # You have to cast the HDF5 dataset to a np.array
            # so you can use the array of True/False values as
            # array indices
            m = np.array(self.f['workup/time/mask/binarate'])
            n = np.count_nonzero(m)
            abscissa = 'workup/time/x_binarated'
        else:
            n = self.f['y'].size
            abscissa = 'x'

        dt = self.f['x'].attrs['step']          # time per point
        ww = int(math.ceil((1.0*tw)/(1.0*dt)))  # window width (points)
        tw_actual = ww*dt                       # actual window width (seconds)

        w = np.concatenate([sp.blackman(2*ww)[0:ww],
                            np.ones(n - 2*ww),
                            sp.blackman(2*ww)[-ww:]])

        dset = self.f.create_dataset('workup/time/window/cyclicize', data=w)
        attrs = OrderedDict([
            ('name', 'window'),
            ('unit', 'unitless'),
            ('label', 'windowing function'),
            ('label_latex', 'windowing function'),
            ('help', 'window to force the data to start and end at zero'),
            ('abscissa', abscissa),
            ('t_window', tw),
            ('t_window_actual', tw_actual)
            ])
        update_attrs(dset.attrs, attrs)

        new_report = []
        new_report.append("Create a windowing function,")
        new_report.append("workup/time/window/cyclicize, with a rising/falling")
        new_report.append("blackman filter having a rise/fall time of")
        new_report.append("{0:.3f} us".format(1E6*tw_actual))
        new_report.append("({0} points).".format(ww))

        self.report.append(" ".join(new_report))

    def fft(self):
        """
        Take a Fast Fourier transform of the windowed signal.  If the signal
        has units of nm, then the FT will have units of nm/Hz.
        """

        # Start the timer
        start = time.time()

        # If a mask is defined, then select out a subset of the signal to be
        # FT'ed.  The signal array, s, should be a power of two in length at
        # this point.
        s = np.array(self.f['y'])
        if self.f.__contains__('workup/time/mask/binarate') == True:
            m = np.array(self.f['workup/time/mask/binarate'])
            s = s[m]

        # If the cyclicizing window is defined, then apply it to the signal
        if self.f.__contains__('workup/time/window/cyclicize') == True:
            w = np.array(self.f['workup/time/window/cyclicize'])
            s = w*s

        # Take the Fourier transform
        dt = self.f['x'].attrs['step']
        freq = np.fft.fftshift(np.fft.fftfreq(s.size, dt))
        sFT = dt * np.fft.fftshift(np.fft.fft(s))

        # Save the data
        dset = self.f.create_dataset('workup/freq/freq', data=freq/1E3)
        attrs = OrderedDict([
            ('name', 'f'),
            ('unit', 'kHz'),
            ('label', 'f [kHz]'),
            ('label_latex', '$f \: [\mathrm{kHz}]$'),
            ('help', 'frequency'),
            ('initial', freq[0]),
            ('step', freq[1] - freq[0])
            ])
        update_attrs(dset.attrs, attrs)

        dset = self.f.create_dataset('workup/freq/FT', data=sFT)
        name_orig = self.f['y'].attrs['name']
        unit_orig = self.f['y'].attrs['unit']
        attrs = OrderedDict([
            ('name', 'FT({0})'.format(name_orig)),
            ('unit', '{0}/Hz'.format(unit_orig)),
            ('label', 'FT({0}) [{1}/Hz]'.format(name_orig, unit_orig)),
            ('label_latex', '$\hat{{{0}}} \: [\mathrm{{{1}/Hz}}]$'.format(name_orig, unit_orig)),
            ('help', 'Fourier transform of {0}(t)'.format(name_orig)),
            ('abscissa', 'workup/freq/freq'),
            ('n_avg', 1)
            ])
        update_attrs(dset.attrs, attrs)

        # Stop the timer and make a report
        stop = time.time()
        t_calc = stop - start

        new_report = []
        new_report.append("Fourier transform the windowed signal.")
        new_report.append("It took {0:.1f} ms".format(1E3*t_calc))
        new_report.append("to compute the FFT.")

        self.report.append(" ".join(new_report))

    def freq_filter_Hilbert_complex(self):
        """
        Generate the complex Hilbert transform filter (:math:`Hc` in the
        attached notes).  Store the filter in::

            workup/freq/filter/Hc

        The associated filtering function is

        .. math::

            \\begin{equation}
            \\mathrm{rh}(f) =
            \\begin{cases}
            0 & \\text{if $f < 0$} \\\\
            1 & \\text{if $f = 0$} \\\\
            2 & \\text{if $f > 0$}
            \\end{cases}
            \\end{equation}

        """

        freq = self.f['workup/freq/freq'][:]
        filt = 0.0*(freq < 0) + 1.0*(freq == 0) + 2.0*(freq > 0)

        dset = self.f.create_dataset('workup/freq/filter/Hc', data=filt)
        attrs = OrderedDict([
            ('name', 'Hc'),
            ('unit', 'unitless'),
            ('label', 'Hc(f)'),
            ('label_latex', '$H_c(f)$'),
            ('help', 'complex Hilbert transform filter'),
            ('abscissa', 'workup/freq/freq')
            ])
        update_attrs(dset.attrs, attrs)

        new_report = []
        new_report.append("Create the complex Hilbert transform filter.")

        self.report.append(" ".join(new_report))
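
    # Illustrative cross-check (not used by the class): multiplying the FFT
    # by rh(f) and inverse transforming builds the analytic signal, which
    # scipy.signal.hilbert produces directly.  A sketch, assuming a real
    # array s and time step dt:
    #
    #     import scipy.signal
    #     z_scipy = scipy.signal.hilbert(s)                # x(t) + i*y(t)
    #     S = np.fft.fft(s)
    #     f = np.fft.fftfreq(s.size, dt)
    #     z_manual = np.fft.ifft(S*(2.0*(f > 0) + 1.0*(f == 0)))
    #     # the two should agree closely (the Nyquist bin of an even-length
    #     # array is handled slightly differently)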

    def freq_filter_bp(self, bw, order=50, style="brick wall"):
        """
        Create a bandpass filter with

        :param float bw: filter bandwidth, :math:`\\Delta f` [kHz]
        :param int order: filter order, :math:`n` (defaults to 50)
        :param string style: "brick wall" (default) or "cosine"

        Note that the filter width **bw** should be specified **in kHz and
        not Hz**.  Store the filter in::

            workup/freq/filter/bp

        The center frequency, :math:`f_0`, is determined automatically from
        the positive-frequency peak in the Fourier transform of the
        cantilever signal.

        The filtering function for the brick wall filter is

        .. math::

            \\begin{equation}
            \\mathrm{bp}(f) =
            \\frac{1}{1 + (\\frac{|f - f_0|}{\\Delta f})^n}
            \\end{equation}

        The absolute value makes the filter symmetric, even for odd values
        of :math:`n`.  With the **bw** parameter set to 1 kHz, the filter
        will pass a 2 kHz band of frequencies, from 1 kHz below to 1 kHz
        above the center frequency.  The power spectrum will show noise
        falling away starting 1 kHz away from the carrier.

        The filtering function for the cosine filter is

        .. math::

            \\begin{equation}
            \\mathrm{bp}(f) =
            \\begin{cases}
            0 & f < f_0 - \\Delta f \\\\
            \\cos{\\left( \\frac{\\pi}{2} \\frac{f-f_0}{\\Delta f} \\right)}
              & f_0 - \\Delta f < f < f_0 + \\Delta f \\\\
            0 & f_0 + \\Delta f < f
            \\end{cases}
            \\end{equation}

        """

        # The center frequency fc is the peak in the abs of the FT spectrum
        freq = np.array(self.f['workup/freq/freq'][:])
        Hc = np.array(self.f['workup/freq/filter/Hc'][:])
        FTrh = Hc*abs(np.array(self.f['workup/freq/FT'][:]))
        fc = freq[np.argmax(FTrh)]

        # Compute the filter
        freq_scaled = (freq - fc)/bw

        if style == "brick wall":
            bp = 1.0/(1.0 + np.power(abs(freq_scaled), order))
        elif style == "cosine":
            # here we use a trick
            bp = np.zeros(freq.shape)
            sub_index = (freq >= -1.0*bw + fc) & (freq <= bw + fc)
            sub_indices = np.arange(freq.size)[sub_index]
            bp[sub_indices] = np.sin(np.linspace(0, np.pi, sub_indices.size))
        else:
            print("**ERROR**: Unrecognized filter function")

        dset = self.f.create_dataset('workup/freq/filter/bp', data=bp)
        attrs = OrderedDict([
            ('name', 'bp'),
            ('unit', 'unitless'),
            ('label', 'bp(f)'),
            ('label_latex', '$\mathrm{bp}(f)$'),
            ('help', 'bandpass filter'),
            ('abscissa', 'workup/freq/freq')
            ])
        update_attrs(dset.attrs, attrs)

        # For fun, make an improved estimate of the center frequency using
        # the method of moments -- this only gives the right answer because
        # we have applied the nice bandpass filter first
        FT_filt = Hc*bp*abs(self.f['workup/freq/FT'][:])
        fc_improved = (freq*FT_filt).sum()/FT_filt.sum()

        new_report = []
        new_report.append("Create a bandpass filter with center frequency")
        new_report.append("= {0:.6f} kHz,".format(fc))
        new_report.append("bandwidth = {0:.3f} kHz,".format(bw))
        new_report.append("and order = {0}.".format(order))
        new_report.append("Best estimate of the resonance")
        new_report.append("frequency = {0:.6f} kHz.".format(fc_improved))

        self.report.append(" ".join(new_report))
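
    # Illustrative sketch of the brick-wall response bp(f) defined in the
    # docstring above (f, fc, and bw in the same units, e.g. kHz; the values
    # below are assumptions):
    #
    #     f = np.linspace(-25.0, 25.0, 2001)
    #     fc, bw, order = 2.0, 1.0, 50
    #     bp = 1.0/(1.0 + np.power(abs((f - fc)/bw), order))
    #     # bp ~ 1 for |f - fc| < bw and falls rapidly toward 0 outside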

    def time_mask_rippleless(self, td):
        """
        Defined using

        :param float td: the dead time [s]

        that will remove the leading and trailing ripple from the phase
        (and amplitude) versus time data.

        **Programming notes**.  As a result of applying the cyclicizing
        window in the time domain and the bandpass filter in the frequency
        domain, the phase (and amplitude) will have a ripple at its leading
        and trailing edge.  We want to define a mask that will let us focus
        on the uncorrupted data in the middle of the phase (and amplitude)
        array.

        Defining the required mask takes a little thought.  We need to first
        think about what the relevant time axis is.  If::

            self.f.__contains__('workup/time/mask/binarate') == True:

        then the data has been trimmed to a power of two and the relevant
        time axis is::

            self.f['/workup/time/x_binarated']

        Otherwise, the relevant time axis is::

            self.f['x']

        We will call this trimming mask::

            self.f['workup/time/mask/rippleless']

        We will apply this mask to either ``self.f['/workup/time/x_binarated']``
        or ``self.f['x']`` to yield a new time array for plotting phase (and
        amplitude) data::

            self.f['/workup/time/x_rippleless']

        If no bandpass filter was defined, then the relevant time axis for
        the phase (and amplitude) data is either::

            self.f['/workup/time/x_binarated']

        (if self.f.__contains__('workup/time/mask/binarate') == True) or::

            self.f['x']

        (if self.f.__contains__('workup/time/mask/binarate') == False)
        """

        dt = self.f['x'].attrs['step']          # time per point
        ww = int(math.ceil((1.0*td)/(1.0*dt)))  # window width (points)
        td_actual = ww*dt                       # actual dead time (seconds)

        if self.f.__contains__('workup/time/mask/binarate') == True:
            x = np.array(self.f['/workup/time/x_binarated'][:])
            abscissa = '/workup/time/x_binarated'
        else:
            x = np.array(self.f['x'][:])
            abscissa = 'x'

        n = x.size
        indices = np.arange(n)
        mask = (indices >= ww) & (indices < n - ww)
        x_rippleless = x[mask]

        dset = self.f.create_dataset('workup/time/mask/rippleless', data=mask)
        attrs = OrderedDict([
            ('name', 'mask'),
            ('unit', 'unitless'),
            ('label', 'masking function'),
            ('label_latex', 'masking function'),
            ('help', 'mask to remove leading and trailing ripple'),
            ('abscissa', abscissa)
            ])
        update_attrs(dset.attrs, attrs)

        dset = self.f.create_dataset('workup/time/x_rippleless', data=x_rippleless)
        attrs = OrderedDict([
            ('name', 't_masked'),
            ('unit', 's'),
            ('label', 't [s]'),
            ('label_latex', '$t \: [\mathrm{s}]$'),
            ('help', 'time'),
            ('initial', x_rippleless[0]),
            ('step', x_rippleless[1] - x_rippleless[0])
            ])
        update_attrs(dset.attrs, attrs)

        new_report = []
        new_report.append("Make an array, workup/time/mask/rippleless, to be")
        new_report.append("used to remove leading and trailing ripple. The")
        new_report.append("dead time is {0:.3f} us.".format(1E6*td_actual))

        self.report.append(" ".join(new_report))

    def ifft(self):
        """
        If they are defined,

        * apply the complex Hilbert transform filter,
        * apply the bandpass filter, then
        * compute the inverse Fourier transform,
        * if a trimming window is defined then trim the result

        """

        # Divide the FT-ed data by the timestep to recover the digital
        # Fourier transformed data.  Carry out the transforms.
        s = self.f['workup/freq/FT'][:]/self.f['x'].attrs['step']
        if self.f.__contains__('workup/freq/filter/Hc') == True:
            s = s*self.f['workup/freq/filter/Hc']
        if self.f.__contains__('workup/freq/filter/bp') == True:
            s = s*self.f['workup/freq/filter/bp']

        # Compute the IFT
        sIFT = np.fft.ifft(np.fft.ifftshift(s))

        # Trim if a rippleless masking array is defined.  Carefully define
        # what we should plot the complex FT-ed data against.
        if self.f.__contains__('workup/time/mask/rippleless') == True:
            mask = np.array(self.f['workup/time/mask/rippleless'])
            sIFT = sIFT[mask]
            abscissa = 'workup/time/x_rippleless'
        else:
            if self.f.__contains__('workup/time/mask/binarate') == True:
                abscissa = '/workup/time/x_binarated'
            else:
                abscissa = 'x'

        dset = self.f.create_dataset('workup/time/z', data=sIFT)
        unit_y = self.f['y'].attrs['unit']
        attrs = OrderedDict([
            ('name', 'z'),
            ('unit', unit_y),
            ('label', 'z [{0}]'.format(unit_y)),
            ('label_latex', '$z \: [\mathrm{{{0}}}]$'.format(unit_y)),
            ('help', 'complex cantilever displacement'),
            ('abscissa', abscissa)
            ])
        update_attrs(dset.attrs, attrs)

        # Compute and save the phase and amplitude
        p = np.unwrap(np.angle(sIFT))/(2*np.pi)
        dset = self.f.create_dataset('workup/time/p', data=p)
        attrs = OrderedDict([
            ('name', 'phase'),
            ('unit', 'cyc'),
            ('label', 'phase [cyc]'),
            ('label_latex', '$\phi \: [\mathrm{cyc}]$'),
            ('help', 'cantilever phase'),
            ('abscissa', abscissa)
            ])
        update_attrs(dset.attrs, attrs)

        a = abs(sIFT)
        dset = self.f.create_dataset('workup/time/a', data=a)
        attrs = OrderedDict([
            ('name', 'amplitude'),
            ('unit', unit_y),
            ('label', 'a [{0}]'.format(unit_y)),
            ('label_latex', '$a \: [\mathrm{{{0}}}]$'.format(unit_y)),
            ('help', 'cantilever amplitude'),
            ('abscissa', abscissa)
            ])
        update_attrs(dset.attrs, attrs)

        new_report = []
        new_report.append("Apply an inverse Fourier transform.")

        self.report.append(" ".join(new_report))

    def fit_phase(self, dt_chunk_target):
        """
        Fit the phase *vs* time data to a line.  The slope of the line is
        the (instantaneous) frequency.  The phase data is broken into
        "chunks", with

        :param float dt_chunk_target: the target chunk duration [s]

        If the chosen duration is not an integer multiple of the
        digitization time, then find the nearest chunk duration which is.

        Calculate the slope :math:`m` of the phase *vs* time line using the
        linear least-squares formula

        .. math::

            \\begin{equation}
            m = \\frac{n \\: S_{xy} - S_x S_y}{n \\: S_{xx} - (S_x)^2}
            \\end{equation}

        with :math:`x` representing time, :math:`y` representing phase, and
        :math:`n` the number of data points contributing to the fit.  The
        sums involving the :math:`x` (e.g., time) data can be computed
        analytically because the time data here are equally spaced.  With
        the time per point :math:`\\Delta t`,

        .. math::

            \\begin{equation}
            S_x = \\sum_{k = 0}^{n-1} x_k
                = \\sum_{k = 0}^{n-1} k \\: \\Delta t
                = \\frac{1}{2} \\Delta t \\: n (n-1)
            \\end{equation}

            \\begin{equation}
            S_{xx} = \\sum_{k = 0}^{n-1} x_k^2
                   = \\sum_{k = 0}^{n-1} k^2 \\: {\\Delta t}^2
                   = \\frac{1}{6} {\\Delta t}^2 \\: n (n-1) (2n -1)
            \\end{equation}

        The sums involving :math:`y` (e.g., phase) can not be similarly
        precomputed.  These sums are

        .. math::

            \\begin{equation}
            S_y = \\sum_{k = 0}^{n-1} y_k = \\sum_{k = 0}^{n-1} \\phi_k
            \\end{equation}

            \\begin{equation}
            S_{xy} = \\sum_{k = 0}^{n-1} x_k y_k
                   = \\sum_{k = 0}^{n-1} (k \\Delta t) \\: \\phi_k
            \\end{equation}

        To avoid problems with round-off error, a constant is subtracted
        from the time and phase arrays in each chunk so that the time array
        and phase array passed to the least-squares formula each start at
        zero.
        """

        # work out the chunking details
        dt = self.f['x'].attrs['step']                 # time per phase point
        n = np.array(self.f['workup/time/p'][:]).size  # no. of phase points
        n_per_chunk = int(round(dt_chunk_target/dt))   # points per chunk
        dt_chunk = dt*n_per_chunk                      # actual time per chunk
        n_tot_chunk = int(n/n_per_chunk)               # total number of chunks
        n_total = n_per_chunk*n_tot_chunk              # (realizable) no. of phase points

        # report the chunking details
        new_report = []
        new_report.append("Curve fit the phase data.")
        new_report.append("The target chunk duration is")
        new_report.append("{0:.3f} us;".format(1E6*dt_chunk_target))
        new_report.append("the actual chunk duration is")
        new_report.append("{0:.3f} us".format(1E6*dt_chunk))
        new_report.append("({0} points).".format(n_per_chunk))
        new_report.append("The associated Nyquist")
        new_report.append("frequency is {0:.3f} kHz.".format(1/(2*1E3*dt_chunk)))
        new_report.append("A total of {0} chunks will be curve fit,".format(n_tot_chunk))
        new_report.append("corresponding to {0:.3f} ms of data.".format(1E3*dt*n_total))

        self.report.append(" ".join(new_report))

        start = time.time()

        # Reshape the phase data and
        # zero the phase at the start of each chunk
        y = np.array(self.f['workup/time/p'][0:n_total])
        y_sub = y.reshape((n_tot_chunk, n_per_chunk))
        y_sub_reset = y_sub - y_sub[:, :, np.newaxis][:, 0, :]*np.ones(n_per_chunk)

        # Reshape the time data and
        # zero the time at the start of each chunk
        abscissa = self.f['workup/time/p'].attrs['abscissa']
        x = np.array(self.f[abscissa])[0:n_total]
        x_sub = x.reshape((n_tot_chunk, n_per_chunk))
        x_sub_reset = x_sub - x_sub[:, :, np.newaxis][:, 0, :]*np.ones(n_per_chunk)

        # use linear least-squares fitting formulas
        # to calculate the best-fit slope
        SX = dt*0.50*(n_per_chunk - 1)*(n_per_chunk)
        SXX = (dt)**2*(1/6.0)*(n_per_chunk)*(n_per_chunk - 1)*(2*n_per_chunk - 1)
        SY = np.sum(y_sub_reset, axis=1)
        SXY = np.sum(x_sub_reset*y_sub_reset, axis=1)

        slope = (n_per_chunk*SXY - SX*SY)/(n_per_chunk*SXX - SX*SX)

        stop = time.time()
        t_calc = stop - start

        # Save the time axis and the slope
        #
        # Tricky: the time we want is the time in the ~middle~ of each chunk
        #
        #   old: x_sub[:,0]
        #   new: x_sub_middle = np.mean(x_sub[:,:],axis=1)

        x_sub_middle = np.mean(x_sub[:, :], axis=1)
        dset = self.f.create_dataset('workup/fit/x', data=x_sub_middle)
        attrs = OrderedDict([
            ('name', 't'),
            ('unit', 's'),
            ('label', 't [s]'),
            ('label_latex', '$t \: [\mathrm{s}]$'),
            ('help', 'time at the middle of each chunk')
            ])
        update_attrs(dset.attrs, attrs)

        dset = self.f.create_dataset('workup/fit/y', data=slope)
        attrs = OrderedDict([
            ('name', 'f'),
            ('unit', 'cyc/s'),
            ('label', 'f [cyc/s]'),
            ('label_latex', '$f \: [\mathrm{cyc/s}]$'),
            ('help', 'best-fit slope'),
            ('abscissa', 'workup/fit/x')
            ])
        update_attrs(dset.attrs, attrs)

        # report the curve-fitting details
        # and prepare the report
        new_report.append("It took {0:.1f} ms".format(1E3*t_calc))
        new_report.append("to perform the curve fit and obtain the frequency.")

        self.report.append(" ".join(new_report))
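
    # Illustrative cross-check (not used by the class): for a single chunk,
    # the closed-form slope above matches np.polyfit.  A sketch with assumed
    # values:
    #
    #     n, dt = 100, 2.0E-5
    #     x = dt*np.arange(n)
    #     y = 2.0E3*x + 0.01*np.random.randn(n)        # phase ~ f*t [cyc]
    #     SX, SXX = dt*0.5*n*(n-1), dt**2*(1/6.0)*n*(n-1)*(2*n-1)
    #     SY, SXY = y.sum(), (x*y).sum()
    #     slope = (n*SXY - SX*SY)/(n*SXX - SX*SX)      # ~ np.polyfit(x, y, 1)[0]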

    def fit_amplitude(self):
        """
        Fit the data stored at::

            workup/time/a

        to a decaying exponential.  Store the resulting fit at::

            workup/fit/exp

        **Programming Notes**

        * From the ``lmfit`` documentation
          [`link <http://newville.github.io/lmfit-py/fitting.html#fit_report>`__]:
          "Note that the calculation of chi-square and reduced chi-square
          assume that the returned residual function is scaled properly to
          the uncertainties in the data.  For these statistics to be
          meaningful, the person writing the function to be minimized must
          scale them properly."
        """

        # extract the data from the Datasets
        y_dset = self.f['workup/time/a']
        x = np.array(self.f[y_dset.attrs['abscissa']][:])
        y = np.array(y_dset[:])

        # define objective function: returns the array to be minimized
        def fcn2min(params, x, y, y_stdev):
            """model a decaying exponential, subtract the data"""
            a0 = params['a0'].value
            a1 = params['a1'].value
            tau = params['tau'].value

            y_calc = a0*np.exp(-x/tau) + a1
            return (y_calc - y)/y_stdev

        # create a set of Parameters
        params = Parameters()
        params.add('a0', value=1.0, min=0)
        params.add('a1', value=0.1, min=0)
        params.add('tau', value=1.0)

        # do fit once, here with the leastsq model
        y_stdev = np.ones(y.size)
        result = minimize(fcn2min, params, args=(x, y, y_stdev))

        # do fit again, using the standard deviation of the residuals
        # as an estimate of the standard error in each data point
        y_stdev = np.std(result.residual)*np.ones(y.size)
        result = minimize(fcn2min, params, args=(x, y, y_stdev))

        # calculate final result
        y_calc = y + y_stdev*result.residual

        # write error report
        rep = fit_report(params)

        # store fit results!
        # format string examples: http://mkaz.com/2012/10/10/python-string-format/
        dset = self.f.create_group('workup/fit/exp')

        p = result.params
        title = "a(t) = a0*exp(-t/tau) + a1" \
                "\n" \
                "a0 = {0:.6f} +/- {1:.6f}, tau = {2:.6f} +/- {3:.6f}, a1 = {4:.6f} +/- {5:.6f}".\
                format(p['a0'].value, p['a0'].stderr,
                       p['tau'].value, p['tau'].stderr,
                       p['a1'].value, p['a1'].stderr)

        title_LaTeX = r'$a(t) = a_0 \exp(-t/\tau) + a_1$' \
                      '\n' \
                      r'$a_0 = {0:.6f} \pm {1:.6f}, \tau = {2:.6f} \pm {3:.6f}, a_1 = {4:.6f} \pm {5:.6f}$'. \
                      format(p['a0'].value, p['a0'].stderr,
                             p['tau'].value, p['tau'].stderr,
                             p['a1'].value, p['a1'].stderr)

        attrs = OrderedDict([
            ('abscissa', y_dset.attrs['abscissa']),
            ('ordinate', 'workup/time/a'),
            ('fit_report', rep),
            ('help', 'fit to decaying exponential'),
            ('title', title),
            ('title_LaTeX', title_LaTeX),
            ('tau', p['tau'].value),
            ('tau_stderr', p['tau'].stderr),
            ('a0', p['a0'].value),
            ('a0_stderr', p['a0'].stderr),
            ('a1', p['a1'].value),
            ('a1_stderr', p['a1'].stderr)
            ])
        update_attrs(dset.attrs, attrs)

        dset = self.f.create_dataset('workup/fit/exp/y_calc', data=y_calc)
        a_unit = self.f['workup/time/a'].attrs['unit']
        attrs = OrderedDict([
            ('abscissa', y_dset.attrs['abscissa']),
            ('name', 'a (calc)'),
            ('unit', a_unit),
            ('label', 'a (calc) [{0}]'.format(a_unit)),
            ('label_latex', '$a_{{\mathrm{{calc}}}} \: [\mathrm{{{0}}}]$'.format(a_unit)),
            ('help', 'cantilever amplitude (calculated)')
            ])
        update_attrs(dset.attrs, attrs)

        dset = self.f.create_dataset('workup/fit/exp/y_resid', data=result.residual)
        attrs = OrderedDict([
            ('abscissa', y_dset.attrs['abscissa']),
            ('name', 'a (resid)'),
            ('unit', a_unit),
            ('label', 'a (resid) [{0}]'.format(a_unit)),
            ('label_latex', '$a - a_{{\mathrm{{calc}}}} \: [\mathrm{{{0}}}]$'.format(a_unit)),
            ('help', 'cantilever amplitude (residual)')
            ])
        update_attrs(dset.attrs, attrs)

    def plot_fit(self, fit_group, LaTeX=False):
        """
        Plot the fit stored in ``fit_group``.
        """

        # helpful
        # http://stackoverflow.com/questions/4209467/matplotlib-share-x-axis-but-dont-show-x-axis-tick-labels-for-both-just-one
        y_dset = self.f[fit_group].attrs['ordinate']
        x_dset = self.f[fit_group].attrs['abscissa']
        y_calc_dset = fit_group + '/y_calc'
        y_resid_dset = fit_group + '/y_resid'

        # Possibly use TeX-formatted axis labels temporarily for this plot
        # and compute plot labels
        old_param = plt.rcParams['text.usetex']

        if LaTeX == True:
            plt.rcParams['text.usetex'] = True
            x_label_string = self.f[x_dset].attrs['label_latex']
            y_label_string = self.f[y_dset].attrs['label_latex']
            y2_label_string = self.f[y_resid_dset].attrs['label_latex']
            title_string = self.f[fit_group].attrs['title_LaTeX']
        elif LaTeX == False:
            plt.rcParams['text.usetex'] = False
            x_label_string = self.f[x_dset].attrs['label']
            y_label_string = self.f[y_dset].attrs['label']
            y2_label_string = self.f[y_resid_dset].attrs['label']
            title_string = self.f[fit_group].attrs['title']

        y = np.array(self.f[y_dset][:])
        y_calc = np.array(self.f[y_calc_dset][:])
        y_resid = np.array(self.f[y_resid_dset][:])
        x = np.array(self.f[x_dset][:])

        fig = plt.figure(facecolor='w')

        ax1 = fig.add_subplot(211)
        ax1.plot(x, y, 'k.')
        ax1.plot(x, y_calc, 'r')
        plt.ylabel(y_label_string)
        plt.title(title_string, fontsize=16)
        plt.setp(ax1.get_xticklabels(), visible=False)

        ax2 = fig.add_subplot(212, sharex=ax1)
        ax2.plot(x, y_resid, 'k.')
        plt.xlabel(x_label_string)
        plt.ylabel(y2_label_string)

        # set tick spacing so that the plot is pleasing to the eye
        plt.locator_params(axis='x', nbins=4)
        plt.locator_params(axis='y', nbins=4)

        # clean up label spacings, show the plot, and reset the tex option
        fig.subplots_adjust(bottom=0.15, left=0.12)
        plt.show()
        plt.rcParams['text.usetex'] = old_param

    def __repr__(self):
        """
        Print out the report.
        """

        temp = []
        temp.append("")
        temp.append("Signal report")
        temp.append("=============")
        temp.append("\n\n".join(["* " + msg for msg in self.report]))

        return '\n'.join(temp)

    def list(self, offset='', indent=' '):
        """
        List all file/group/dataset elements in the hdf5 file by iterating
        over the file contents.

        Source::

            https://confluence.slac.stanford.edu/display/PSDM/How+to+access+HDF5+data+from+Python#HowtoaccessHDF5datafromPython-HDF5filestructure

        """

        print("")
        print("Signal file summary")
        print("===================")

        h5ls(self.f)

    def _load_hdf5_default(self, h5object, s_dataset='y', t_dataset='x',
                           infer_dt=True, infer_attrs=True):
        """Load an hdf5 file saved with default freqdemod attributes.

        :param h5object: An h5py File or group object, which contains
            t_dataset and s_dataset
        :param str s_dataset: signal dataset name (relative to h5object)
        :param str t_dataset: time dataset name (relative to h5object)
        :param bool infer_dt: If True, infer the time step dt from the
            contents of the t_dataset
        :param bool infer_attrs: If True, fill in any missing attributes
            used by freqdemod."""
        h5object.copy(t_dataset, self.f, name='x')
        h5object.copy(s_dataset, self.f, name='y')
        x_attrs = self.f['x'].attrs
        y_attrs = self.f['y'].attrs
        if infer_dt:
            check_minimum_attrs(x_attrs, 'permissive')
            x_attrs['step'] = infer_timestep(h5object[t_dataset])
        if infer_attrs:
            check_minimum_attrs(x_attrs, 'permissive_x')
            infer_missing_attrs(x_attrs, dataset_type='x')
            check_minimum_attrs(y_attrs, 'permissive')
            infer_missing_attrs(y_attrs, dataset_type='y', abscissa='x')

        check_minimum_attrs(x_attrs, 'freqdemod_x')
        check_minimum_attrs(y_attrs, 'freqdemod_y')

    def _load_hdf5_general(self, h5object, s_dataset, s_name, s_unit,
                           t_dataset=None, dt=None,
                           s_help='cantilever displacement'):
        """Load data from an arbitrarily formatted hdf5 file.

        :param h5object: An h5py File or group object, which contains
            s_dataset
        :param str s_dataset: signal dataset name (relative to h5object)
        :param str s_name: the signal's name
        :param str s_unit: the signal's units
        :param str t_dataset: time dataset name (optional; or specify dt)
        :param float dt: the time per point [s]
        :param str s_help: the signal's help string
        """
        h5object.copy(s_dataset, self.f, name='y', without_attrs=True)
        y_attrs = {'name': s_name, 'unit': s_unit, 'help': s_help,
                   'abscissa': 'x', 'n_avg': 1}
        update_attrs(self.f['y'].attrs, y_attrs)
        infer_labels(self.f['y'].attrs)

        if t_dataset is not None:
            h5object.copy(t_dataset, self.f, name='x', without_attrs=True)
            dt_ = infer_timestep(h5object[t_dataset])
        elif dt is not None:
            self.f['x'] = dt * np.arange(self.f['y'][:].size)
            dt_ = dt
        else:
            raise ValueError("Must specify one of 't_dataset' or 'dt'")

        x_attrs = {'name': 't', 'unit': 's', 'label': 't [s]',
                   'label_latex': '$t \: [\mathrm{s}]$', 'help': 'time',
                   'initial': 0.0, 'step': dt_}
        update_attrs(self.f['x'].attrs, x_attrs)

def testsignal_sine():

    fd = 50.0E3    # digitization frequency
    f0 = 2.00E3    # signal frequency
    nt = 60E3      # number of signal points
    sn = 1.0       # signal zero-to-peak amplitude
    sn_rms = 0.01  # noise rms amplitude

    dt = 1/fd
    t = dt*np.arange(nt)
    s = sn*np.sin(2*np.pi*f0*t) + np.random.normal(0, sn_rms, t.size)

    S = Signal()
    S.load_nparray(s, "x", "nm", dt)

    S.time_mask_binarate("middle")
    S.time_window_cyclicize(3E-3)
    S.fft()
    S.freq_filter_Hilbert_complex()
    # S.freq_filter_bp(1.00)
    S.freq_filter_bp(bw=1.00, style="cosine")
    S.time_mask_rippleless(15E-3)
    S.ifft()
    S.fit_phase(221.34E-6)

    S.plot('y', LaTeX=latex)
    S.plot('workup/time/mask/binarate', LaTeX=latex)
    S.plot('workup/time/window/cyclicize', LaTeX=latex)
    S.plot('workup/freq/FT', LaTeX=latex, component='abs')
    S.plot('workup/freq/filter/Hc', LaTeX=latex)
    S.plot('workup/freq/filter/bp', LaTeX=latex)
    S.plot('workup/time/mask/rippleless', LaTeX=latex)
    S.plot('workup/time/z', LaTeX=latex, component='both')
    S.plot('workup/time/a', LaTeX=latex)
    S.plot('workup/time/p', LaTeX=latex)
    S.plot('workup/fit/y', LaTeX=latex)

    print(S)
    S.list()

    return S

def testsignal_sine_fm():

    fd = 100E3         # digitization frequency
    f_start = 4.000E3  # starting frequency
    f_end = 6.000E3    # ending frequency

    # time array
    dt = 1/fd
    t1 = dt*np.arange(int(round(0.25/dt)))
    t2 = dt*np.arange(int(round((0.75-0.25)/dt)))
    t3 = dt*np.arange(128*1024 - t1.size - t2.size)

    t2plus = t2 + t1[-1] + dt
    t3plus = t3 + t2plus[-1] + dt

    t = np.append(np.append(t1, t2plus), t3plus)

    # frequency array
    f1 = f_start*np.ones(t1.size)
    f2 = f_start + t2*(f_end - f_start)/(t2[-1] - t2[0])
    f3 = f_end*np.ones(t3.size)

    f = np.append(np.append(f1, f2), f3)

    # phase accumulator
    p = np.zeros(t.size)
    p[0] = 0.0
    for k in np.arange(1, t.size):
        p[k] = p[k-1] + dt*f[k-1]
    p = 2*np.pi*p

    x = np.cos(p)

    # make the signal and work it up
    S = Signal()
    S.load_nparray(x, "x", "nm", dt)

    S.time_mask_binarate("middle")
    S.time_window_cyclicize(3E-3)
    S.fft()
    S.freq_filter_Hilbert_complex()
    S.freq_filter_bp(4.00)
    S.time_mask_rippleless(15E-3)
    S.ifft()
    S.fit_phase(200E-6)

    S.plot('y', LaTeX=latex)
    S.plot('workup/freq/FT', LaTeX=latex, component='abs')
    S.plot('workup/freq/filter/bp', LaTeX=latex)
    S.plot('workup/time/z', LaTeX=latex, component='both')
    S.plot('workup/time/p', LaTeX=latex)
    S.plot('workup/fit/y', LaTeX=latex)

    print(S)

    return S
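
# A minimal standalone sketch: the phase-accumulator loop in
# testsignal_sine_fm() can also be written with np.cumsum, which produces the
# same accumulated phase without an explicit Python loop.  The argument names
# (f, dt) mirror the function above; this helper is for illustration only and
# is never called.
def _phase_accumulator_sketch(f, dt):
    p = np.concatenate(([0.0], np.cumsum(dt*f[:-1])))  # p[k] = sum_{j<k} dt*f[j]
    return 2*np.pi*p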

def testsignal_sine_exp():

    fd = 50.0E3    # digitization frequency [Hz]
    f0 = 2.00E3    # signal frequency [Hz]
    tau = 0.325    # decay time [s]
    nt = 2.00*fd   # number of signal points (before truncation)
    sn = 100.0     # signal zero-to-peak amplitude [nm]
    sn_rms = 20.0  # noise rms amplitude [nm]

    dt = 1/fd
    t = dt*np.arange(nt)
    s = sn*np.sin(2*np.pi*f0*t)*np.exp(-t/tau) + np.random.normal(0, sn_rms, t.size)

    S = Signal('.temp_sine_exp.h5')
    S.load_nparray(s, "x", "nm", dt)

    S.time_mask_binarate("start")
    S.fft()
    S.freq_filter_Hilbert_complex()
    S.freq_filter_bp(1.00)         # 2.00 kHz passband => 1.0 ms filter timescale
    S.time_mask_rippleless(10E-3)  # 10 x the filter timescale
    S.ifft()

    S.plot('y', LaTeX=latex)
    S.plot('workup/freq/FT', LaTeX=latex, component='abs')
    S.plot('workup/freq/filter/bp', LaTeX=latex)
    S.plot('workup/time/a', LaTeX=latex)

    S.fit_amplitude()
    S.plot_fit('/workup/fit/exp', LaTeX=latex)

    print(S)

    return S
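
# A minimal standalone sketch (illustration only, never called): the decaying
# exponential fit performed by Signal.fit_amplitude() with lmfit can be
# cross-checked with scipy.optimize.curve_fit.  The arrays x and a are assumed
# to be the time and amplitude data.
def _fit_amplitude_sketch(x, a):
    from scipy.optimize import curve_fit

    def model(t, a0, tau, a1):
        return a0*np.exp(-t/tau) + a1

    popt, pcov = curve_fit(model, x, a, p0=[a.max(), 0.1, 0.0])
    return popt  # best-fit [a0, tau, a1]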

if __name__ == "__main__":

    # Parse command-line arguments
    # https://docs.python.org/2/library/argparse.html#module-argparse

    import argparse
    from argparse import RawTextHelpFormatter

    parser = argparse.ArgumentParser(
        formatter_class=RawTextHelpFormatter,
        description="Determine a signal's frequency vs time.\n"
                    "Example usage:\n"
                    "    python demodulate.py --testsignal=sine --LaTeX\n\n")
    parser.add_argument('--testsignal',
                        default='sine',
                        choices=['sine', 'sinefm', 'sineexp'],
                        help='create and analyze a test signal')
    parser.add_argument('--LaTeX',
                        dest='latex',
                        action='store_true',
                        help='use LaTeX plot labels')
    parser.add_argument('--no-LaTeX',
                        dest='latex',
                        action='store_false',
                        help='do not use LaTeX plot labels (default)')
    parser.set_defaults(latex=False)
    args = parser.parse_args()

    # Set the default font and size for the figure
    font = {'family': 'serif', 'weight': 'normal', 'size': 18}
    plt.rc('font', **font)
    plt.rcParams['figure.figsize'] = 8.31, 5.32

    latex = args.latex

    # Do one of the tests
    if args.testsignal == 'sine':
        S = testsignal_sine()
    elif args.testsignal == 'sinefm':
        S = testsignal_sine_fm()
    elif args.testsignal == 'sineexp':
        S = testsignal_sine_exp()
    else:
        print("** warning **")
        print("--testsignal={} not implemented yet".format(args.testsignal))