"""
babelscan object for holding many types of scan data
"""
import numpy as np
from . import functions as fn
from . import EVAL_MODE
from .settings import init_scan_plot_manager, init_multiscan_plot_manager
from .settings import init_scan_fit_manager, init_multiscan_fit_manager
from .volume import ArrayVolume, ImageVolume
"----------------------------------------------------------------------------------------------------------------------"
"------------------------------------------------- Scan ---------------------------------------------------------------"
"----------------------------------------------------------------------------------------------------------------------"
[docs]class Scan:
"""
Scan class
Contains a namespace of data associated with names and a seperate dictionary of name associations,
allowing multiple names to reference the same data.
namespace = {
'data1': [1,2,3],
'data2': [10,20,30]
}
alt_names = {
'xaxis': 'data1',
'yaxis': 'data2',
}
dh = Scan(namespace, alt_names)
dh('xaxis') >> returns [1,2,3]
Scan is usually inhereted by subclasses HdfScan, DatScan or CsvScan.
:param namespace: dict : dict of names and data {name: data}
:param alt_names: dict or None* : dict of alternative names to names in namespace
:param kwargs: key-word-argments as options shwon below, keywords and argmuents will be added to the namespace.
Options:
reload - True/False*, if True, reload mode is activated, reloading data on each operation
label_name - str, add a name to use to automatically find the label
label_command - str, format specifier for label, e.g. '{scan_number}'
title_name - str, add a name to use to automatically find the title
title_command - str, format specifier for title, e.g. '#{scan_number} Energy={en:5.2f} keV'
scan_command_name - str, add a name to use to automatically find the scan command
start_time_name - str, add a name to use to automatically find the start_time
end_time_name - str, add a name to use to automatically find the end_time
axes_name - str, add a name to use to automatically find the axes (xaxis)
signal_name - str, add a name to use to automatically find the signal (yaxis)
image_name - str, add a name to use to automatically find the detector image
str_list - list of str, list of names to display when print(self)
signal_operation - str, operation to perform on signal, e.g. '/Transmission'
error_function - func., operation to perform on signal to generate errors. e.g. np.sqrt
debug - str or list of str, options for debugging, options:
'namespace' - displays when items are added to the namespace
'eval' - displays when eval operations are used
Functions
add2namespace(name, data=None, other_names=None, hdf_address=None)
set data in namespace
add2strlist(names)
Add to list of names in str output
array(names, array_length=None)
Return numpy array of data with same length
axes()
Return default axes (xaxis) data
eval(operation)
Evaluate operation using names in dataset or in associated names
find_image(multiple=False)
Return address of image data in hdf file
get_plot_data(xname=None, yname=None, signal_op=None, error_op=None)
Return xdata, ydata, yerror, xname, yname
image(idx=None, image_address=None)
Load image from hdf file, works with either image addresses or stored arrays
image_roi(cen_h=None, cen_v=None, wid_h=31, wid_v=31)
Create new region of interest from detector images
image_roi_op(operation)
Create new region of interest (roi) from image data and return sum and maxval
image_roi_sum(cen_h=None, cen_v=None, wid_h=31, wid_v=31)
Create new region of interest
image_size()
Returns the image size
label(new_label=None)
Set or Return the scan label. The label is a short identifier for the scan, such as scan number
name(name)
Return corrected name from namespace
options(**kwargs)
Set or display options
reload_mode(mode=None)
Turns on reload mode - reloads the dataset each time
reset()
Reset the namespace
scan_command()
Returns scan command
show_namespace()
return str of namespace
signal()
Return default signal (yaxis) data
string(names, str_format=None)
Return formated string of data
string_format(operation)
Process a string with format specified in {} brackets, values will be returned.
title(new_title=None)
Set or Return the title
value(names, array_function=None)
Return single value of data
"""
def __init__(self, namespace, alt_names=None, default_values=None, **kwargs):
self._namespace = {}
self._alt_names = {}
self._namespace.update(namespace)
if alt_names is not None:
self._alt_names.update(alt_names)
self._default_values = {}
if default_values is not None:
self._default_values.update(default_values)
# Managers
self.plot = init_scan_plot_manager(self)
self.fit = init_scan_fit_manager(self)
# Options and defaults
self._options = {}
self._axes_str = ['axes', 'xaxis']
self._signal_str = ['signal', 'yaxis']
self._axes_cmd_names = {}
self._signal_cmd_names = {}
self._image_name = None
self._image_size = None
self._print_list = ['scan_command', 'axes', 'signal']
self._reload_mode = False
self._set_options(**kwargs)
self._volume = None
self._use_signal_op = False
self.add2namespace(
name=['scan_command', 'command', 'cmd'],
other_names=['_scan_command', 'scan_command', 'command', 'cmd'],
default_value='scan axes signal'
)
self.add2namespace(
name=['scan_number', 'filename'],
other_names='_label',
default_value='Scan'
)
self.add2namespace(
name=['filetitle'],
other_names='_title',
default_value='Scan Title'
)
self._debug('init', '%r' % self)
"------------------------------- Basic Operations -------------------------------------------"
def reset(self):
"""Regenerate data lists"""
self._namespace = {}
def reload_mode(self, mode=None):
"""
Turns on reload mode - reloads the dataset each time
:param mode: Bool or None, True to turn on, None to return current mode
:return: None or str
"""
if mode is None:
if self._reload_mode:
return "Reload mode is ON"
return "Reload mode is OFF"
self._reload_mode = mode
def add2namespace(self, name, data=None, other_names=None, default_value=None):
"""
set data in namespace
:param name: str name or list of names (each name will store the same data)
:param data: any or None, data to store in namespace (nothing stored if None)
:param other_names: str, list of str or None - strings to associate with name, giving the same result
:param default_value: any or None, data to store in default_value namespace (nothing stored if None)
:return: None
"""
names = fn.liststr(name)
if data is not None:
for name in names:
self._namespace[name] = data
self._debug('namespace', 'Add to namespace: %s: %s' % (name, fn.data_string(data)))
if other_names is not None:
other_names = fn.liststr(other_names)
for other_name in other_names:
if other_name in self._alt_names:
self._alt_names[other_name] += names
else:
self._alt_names[other_name] = names
self._debug('namespace', 'Add alt. name: %s: %s' % (other_name, names))
if default_value is not None:
for name in names:
self._default_values[name] = default_value
self._debug('namespace', 'Add to default values: %s: %s' % (name, fn.data_string(data)))
def update_namespace(self, *args, **kwargs):
"""
Update internal datespace with dicts
:param args: dict, namespace will be updated in order
:param kwargs: keyword arguments will be added to namespace
:return: None
"""
for arg in args:
self._namespace.update(arg)
self._namespace.update(kwargs)
def isinnamespace(self, name):
"""Check if name is in namespcae (includes alt_names)"""
if name in self._namespace:
return True
return name in self._alt_names
def show_namespace(self):
"""return str of namespace"""
out = 'Namespace %r:\n' % self
out += '%-20s %-60s | %s\n' % ('Name', 'Alternate Names', 'Data')
for key, item in self._namespace.items():
other_names = ', '.join(okey for okey, oitem in self._alt_names.items() if key in oitem)
out += '%-20s %-60s | %s\n' % (key, other_names, fn.data_string(item))
return out
info = show_namespace # info maybe overloaded
def add2strlist(self, names):
"""Add to list of names in str output"""
self._print_list += fn.liststr(names)
def options(self, *args, **kwargs):
"""
Set, get or display options
scan.options() >> returns str of current options
scan.options('name') >> returns options['name'] or None
scan.options(name=value) >> set option 'name' as value
scan.options(**options_dict) >> set options using dict
:param args: option to look for
:param kwargs: options to set
:return:
"""
if len(args) == 0 and len(kwargs) == 0:
# return options
out = 'Options:\n'
for key, item in self._options.items():
out += '%20s : %s\n' % (key, item)
return out
for arg in args:
if arg in self._options:
return self._options[arg]
else:
return None
self._set_options(**kwargs)
def _set_options(self, **kwargs):
"""Set options"""
self._options.update(kwargs)
if 'data' in kwargs:
self._namespace.update(kwargs['data'])
if 'names' in kwargs:
self._alt_names.update(kwargs['names'])
if 'alt_names' in kwargs:
self._alt_names.update(kwargs['alt_names'])
if 'defaults' in kwargs:
self._default_values.update(kwargs['defaults'])
if 'reload' in kwargs:
self._reload_mode = kwargs['reload']
if 'axes_name' in kwargs:
self._axes_str = fn.liststr(kwargs['axes_name'])
if 'signal_name' in kwargs:
self._signal_str = fn.liststr(kwargs['signal_name'])
if 'axes_cmd_names' in kwargs:
self._axes_cmd_names.update(kwargs['axes_cmd_names'])
if 'signal_cmd_names' in kwargs:
self._signal_cmd_names.update(kwargs['signal_cmd_names'])
if 'image_name' in kwargs:
self._image_name = fn.liststr(kwargs['image_name'])
if 'str_list' in kwargs:
self._print_list = fn.liststr(kwargs['str_list'])
if 'start_time_name' in kwargs:
self.add2namespace(kwargs['start_time_name'], other_names='_time_start')
if 'end_time_name' in kwargs:
self.add2namespace(kwargs['end_time_name'], other_names='_time_end')
if 'scan_plot_manager' in kwargs:
self.plot = kwargs['scan_plot_manager'](self)
if 'scan_fit_manager' in kwargs:
self.fit = kwargs['scan_fit_manager'](self)
def load_config(self, config_file, run_formats=True):
"""Load settings from config file, adds various parameters to the namespaces"""
name, default_names, formats, default_values, options = fn.load_from_config(config_file)
self.options(**options)
for name, alt_names in default_names.items():
self.add2namespace(name, other_names=alt_names)
for name, value in default_values.items():
self.add2namespace(name, default_value=value)
if run_formats:
for name, operation in formats.items():
string = self.string_format(operation)
self.add2namespace(name, string)
def _debug(self, debug_name, message):
"""
Returns message if debug option active
:param debug_name: str name to match in self._options['debug']
:param message: str message to print if true
:return: None
"""
if 'debug' in self._options and debug_name in self._options['debug']:
m = 'db:%s: %s' % (debug_name, message)
print(m)
elif 'debug' in self._options and 'all' in self._options['debug']:
m = 'db:%s: %s' % (debug_name, message)
print(m)
"------------------------------- class operations -------------------------------------------"
def __repr__(self):
return 'Scan(namespace: %d, alt_names: %d)' % (len(self._namespace), len(self._alt_names))
def __str__(self):
out = self.__repr__()
out += '\n' + '\n'.join(self.string(self._print_list))
return out
def __call__(self, name):
return self.eval(name)
def __getitem__(self, name):
name, data = self._get_list_data(name)
if len(data) == 1:
return data[0]
return data
def __len__(self):
return self.scan_length()
def __add__(self, addee):
"""
Add two scans together somehow
"""
return MultiScan([self, addee])
"------------------------------- data -------------------------------------------"
def _load_data(self, name):
"""
Check name in external dictionary, add to internal namespace
This function will be overloaded in subclasses
If 'name' not available, raise KeyError
:param name: str
"""
self._debug('load', 'Searching external databases for close match to: %s' % name)
# Search for close match
keys = [k.lower() for k in self._namespace.keys()]
if name.lower() in keys:
data = self._namespace[keys[keys.index(name.lower())]]
self.add2namespace(name, data)
return
for key in keys:
if name.lower() in key:
data = self._namespace[key]
self.add2namespace(name, data)
return
raise KeyError('\'%s\' not available in %r' % (name, self))
def _get_name_data(self, name):
"""
Get name and data from stored dicts
Search hierachy:
1. Check namespace for name, return namespace[name]
2. Check alt_names for name, return namespace[alt_names[name][i]]
3. Check name against special names in axes_str, signal_str
4. Check name against 'nroi'
5. Check name in external source e.g. hdf file
6. Check name, alt_names in defaults_namespace
7. If not available, raise KeyError
:param name: str, key or associated key in namespace
:return name, data: from namespace dict
"""
if self._reload_mode:
self._load_data(name)
self._debug('load', 'Looking for %s in namespace, alt_names, specials' % name)
if name in self._namespace:
return name, self._namespace[name]
if name in self._alt_names:
for alt_name in self._alt_names[name]:
if alt_name in self._namespace:
return alt_name, self._namespace[alt_name]
# Check defaults
if name in self._axes_str: # e.g. 'axes'
return self.axes() # return _get_name_data('eta')
if name in self._signal_str:
return self.signal()
# Check new region of interest
if 'nroi' in name:
roi_sum, roi_max = self.image_roi_op(name)
return name, roi_sum
# Load data from external dictionary (e.g. hdf file)
# 'name' will be added to namespace, or KeyError will be raised
# _load_data can be overloaded in subclasses
try:
self._load_data(name)
return self._get_name_data(name)
except KeyError as ke:
# Finally, check the defaults namespace
if name in self._default_values:
self.add2namespace(name, self._default_values[name])
return name, self._default_values[name]
if name in self._alt_names:
for alt_name in self._alt_names[name]:
if alt_name in self._default_values:
self.add2namespace(alt_name, self._default_values[alt_name], name)
return alt_name, self._default_values[alt_name]
raise ke
def _get_data(self, name):
return self._get_name_data(name)[1]
def _get_list_data(self, names):
"""
Get data from stored dicts
:param names: str or list of str, key or associated key in namespace
:return: list of data from namespace dict
"""
names = fn.liststr(names)
data = []
new_name = []
for name in names:
n, d = self._get_name_data(name)
data += [d]
new_name += [n]
return new_name, data
def array(self, names, array_length=None):
"""
Return numpy array of data with same length
data with length 1 will be cast over the full length
data with length >1 and < array_length will be filled with nans
:param names: str or list of str, key or associated key in namespace
:param array_length: int or None, length of arrays returned
:return: array(n,array_length) where n is the length of list names
"""
names, data = self._get_list_data(names)
if array_length is None:
array_length = np.max([np.size(d) for d in data])
out = np.nan * np.zeros(shape=(len(data), array_length))
for n in range(len(data)):
if np.size(data[n]) == 1:
out[n, :] = data[n]
else:
out[n, :len(data[n])] = data[n]
return out
def value(self, names, array_function=None):
"""
Return single value of data
:param names: str or list of str, key or associated key in namespace
:param array_function: function to return a single value from an array
:return: value or list of values
"""
names, data = self._get_list_data(names)
if array_function is None:
array_function = fn.VALUE_FUNCTION
out = [array_function(val) for val in data]
if len(out) == 1:
return out[0]
return out
def name(self, name):
"""
Return corrected name from namespace
:param name: str or list of str
:return: str or list of str
"""
names = np.asarray(name, dtype=str).reshape(-1)
out = [self._get_name_data(name)[0] for name in names]
if len(out) == 1:
return out[0]
return out
def string(self, names, str_format=None):
"""
Return formated string of data
:param names: str or list of str, key or associated key in namespace
:param str_format: format to use, e.g. '%s:%s'
:return: str or list of str
"""
names, data = self._get_list_data(names)
if str_format is None:
str_format = fn.OUTPUT_FORMAT
out = [str_format % (name, fn.data_string(val)) for name, val in zip(names, data)]
if len(out) == 1:
return out[0]
return out
def time(self, names, date_format=None):
"""
Return datetime object from data name
:param names: str or list of str, key or associated key in namespace
:param date_format: str format used in datetime.strptime (see https://strftime.org/)
:return: list of datetime ojbjects
"""
if date_format is None and 'date_format' in self._options:
date_format = self._options['date_format']
names, data = self._get_list_data(names)
return fn.data_datetime(data, date_format)
"------------------------------- Operations -----------------------------------------"
def _prep_operation(self, operation):
"""
prepare operation string, replace names with names in namespace
:param operation: str
:return operation: str, names replaced to match namespace
"""
# First look for addresses in operation to seperate addresses from divide operations
# addresses = fn.re_address.findall(operation)
old_op = operation
# Determine custom regions of interest 'nroi'
rois = fn.re_nroi.findall(operation)
for name in rois:
new_name, data = self._get_name_data(name)
operation = operation.replace(name, new_name)
# Determine data for other variables
names = fn.re_varname.findall(operation)
for name in names:
try:
new_name, data = self._get_name_data(name)
if new_name != name:
operation = operation.replace(name, new_name)
except KeyError:
pass
self._debug('eval', 'Prepare eval operation\n initial: %s\n final: %s' % (old_op, operation))
return operation
def _name_eval(self, operation):
"""
Evaluate operation using names in dataset or in associated names
:param operation: str
:return: corrected operation, output of operation
"""
if not EVAL_MODE:
return self._get_name_data(operation)
fn.check_naughty_eval(operation) # raise error if bad
operation = self._prep_operation(operation)
result = eval(operation, globals(), self._namespace)
if operation in self._namespace or operation in self._alt_names:
return operation, result
# add to namespace
n = 1
while 'operation%d' % n in self._namespace:
n += 1
self.add2namespace('operation%d' % n, result, operation)
return operation, result
def eval(self, operation):
"""
Evaluate operation using names in dataset or in associated names
:param operation: str
:return: output of operation
"""
_, out = self._name_eval(operation)
return out
def string_format(self, operation):
"""
Process a string with format specified in {} brackets, values will be returned.
e.g.
operation = 'the energy is {energy} keV'
out = string_command(operation)
# energy is found within hdf tree
out = 'the energy is 3.00 keV'
:param operation: str format operation e.g. '#{scan_number}: {title}'
:return: str
"""
# get values inside brackets
ops = fn.re_strop.findall(operation)
format_namespace = {}
for op in ops:
op = op.split(':')[0] # remove format specifier
name, data = self._name_eval(op)
try:
value = fn.VALUE_FUNCTION(data) # handles numbers, arrays of numbers
except TypeError:
value = fn.data_string(data) # anything else
format_namespace[name] = value
operation = operation.replace(op, name)
return operation.format(**format_namespace)
def set_error_operation(self, operation=None):
"""Set the default error operation using str e.g. 'np.sqrt(x+1)"""
if operation is None:
operation = 'np.sqrt(np.abs(x)+1)'
self.options(error_function=operation)
def _get_error(self, name, operation=None):
"""
Return uncertainty on data using operation
:param operation: function to apply to signal, e.g. np.sqrt or 'np.sqrt(x+0.1)'
:param operation: None* will default to zero, unless "error_function" in options
:return: operation(array)
"""
_, data = self._name_eval(name)
if operation is None:
if 'error_function' in self._options:
operation = self._options['error_function']
else:
return np.zeros(np.shape(data))
error_fun = fn.function_generator(operation)
return error_fun(data)
def _get_signal_operation(self, name, signal_op=None, error_op=None):
"""
Return data after operation with error
:param name: str name in namespace
:param signal_op: str operation to perform on name, e.g. '/Transmission'
:param error_op: str operation to perform on name to generate error, e.g. 'np.sqrt(x)'
:return: signal_name, output, error arrays
"""
name, data = self._name_eval(name)
error = self._get_error(name, error_op)
# add error array to namespace
error_name = '%s_error' % name
self.add2namespace(error_name, error)
if not self._use_signal_op:
return name, data, error
if signal_op is None:
if 'signal_operation' in self._options:
signal_op = self._options['signal_operation']
else:
return name, data, error
# Create operations
operation = name + signal_op
operation_error = error_name + signal_op
signal = self.eval(operation)
error = self.eval(operation_error)
return operation, signal, error
"------------------------------- Defaults -------------------------------------------"
def label(self):
"""
Set or Return the scan label. The label is a short identifier for the scan, such as scan number
:return: None or str
"""
if 'label_command' in self._options:
return self.string_format(self._options['label_command'])
return self._get_data('_label')
def title(self):
"""
Set or Return the title
:return: None or str
"""
if 'title_command' in self._options:
return self.string_format(self._options['title_command'])
return self._get_data('_title')
def scan_command(self):
"""
Returns scan command
:return: str
"""
return self._get_data('_scan_command')
def time_start(self):
"""
Return scan time_start time
:return: datetime
"""
return self.time('_time_start')[0]
def time_end(self):
"""
Return scan end time
:return: datetime
"""
return self.time('_time_end')[-1]
def duration(self, start_time=None, end_time=None):
"""
Calculate time difference between two times
:param start_time: str name of date dataset or array of timestamps
:param end_time: None or str name of date dataset
:return: datetime.timedelta
"""
if end_time is not None:
end_time = self.time(end_time)[-1]
if start_time is None:
start_time = self.time_start()
else:
lst = self.time(start_time)
start_time = lst[0]
if len(lst) > 1 and end_time is None:
end_time = lst[-1]
if end_time is None:
end_time = self.time_end()
return end_time - start_time
def _find_defaults(self):
"""
Find default axes and signal (x-axis/y-axis), adds to namespace
This function may be overloaded in subclasses
:return: axes_name, signal_name
"""
scan_command = self.scan_command()
# axes / x-axis
axes_name = fn.axes_from_cmd(scan_command, self._axes_cmd_names)
axes_data = self._get_data(axes_name)
if np.ndim(axes_data) == 0:
axes_data = np.array(axes_data)
self.add2namespace(axes_name, axes_data, self._axes_str)
# signal / y-axis
signal_name = fn.signal_from_cmd(scan_command, self._signal_cmd_names)
signal_data = self._get_data(signal_name)
if np.ndim(signal_data) == 0:
signal_data = np.array(signal_data)
self.add2namespace(signal_name, signal_data, self._signal_str)
return axes_name, signal_name
def axes(self):
"""
Return default axes (xaxis) data
:return: array
"""
add2othernames = []
for name in self._axes_str:
if name in self._namespace:
self.add2namespace(name, other_names=add2othernames)
return name, self._namespace[name]
if name in self._alt_names:
for alt_name in self._alt_names[name]:
if alt_name in self._namespace:
self.add2namespace(name, other_names=add2othernames)
return alt_name, self._namespace[alt_name]
add2othernames += [name]
# axes not in namespace, get from scan command
axes_name, signal_name = self._find_defaults()
return self._get_name_data(axes_name)
def signal(self):
"""
Return default signal (yaxis) data
:return: array
"""
add2othernames = []
for name in self._signal_str:
if name in self._namespace:
self.add2namespace(name, other_names=add2othernames)
return name, self._namespace[name]
if name in self._alt_names:
for alt_name in self._alt_names[name]:
if alt_name in self._namespace:
self.add2namespace(name, other_names=add2othernames)
return alt_name, self._namespace[alt_name]
add2othernames += [name]
# signal not in namespace, get from scan command
axes_name, signal_name = self._find_defaults()
return self._get_name_data(signal_name)
def scan_length(self):
"""
Return the number of points in the scan (length of 'axes')
:return: int
"""
return np.size(self.axes()[1])
def toggle_signal_operation(self, use_signal_op=None):
"""Turn on/off signal operations (normalisation)"""
if use_signal_op is not None:
self._use_signal_op = use_signal_op
return
if self._use_signal_op:
self._use_signal_op = False
else:
self._use_signal_op = True
def get_plot_data(self, xname=None, yname=None, signal_op=None, error_op=None):
"""
Return xdata, ydata, yerror, xname, yname
x, y, dy, xlabel, ylabel = scan.get_plot_data('axes', 'signal', '/Transmission', np.sqrt)
:param xname: str name of value to use as x-axis
:param yname: str name of value to use as y-axis
:param signal_op: operation to perform on yaxis, e.g. '/Transmission'
:param error_op: function to use on yaxis to generate error, e.g. np.sqrt
:return xdata: array
:return ydata: array
:return yerror: array
:return xname: str
:return yname: str
"""
if xname is None:
xname = self._axes_str[0]
if yname is None:
yname = self._signal_str[0]
xname, xdata = self._name_eval(xname)
yname, ydata, yerror = self._get_signal_operation(yname, signal_op, error_op)
return xdata, ydata, yerror, xname, yname
def save_plot_data(self, filename=None, xname=None, yname=None, signal_op=None, error_op=None):
"""
Return xdata, ydata, yerror, xname, yname
x, y, dy, xlabel, ylabel = scan.get_plot_data('axes', 'signal', '/Transmission', np.sqrt)
:param filename: str filename to save data to
:param xname: str name of value to use as x-axis
:param yname: str name of value to use as y-axis
:param signal_op: operation to perform on yaxis, e.g. '/Transmission'
:param error_op: function to use on yaxis to generate error, e.g. np.sqrt
:return xdata: array
:return ydata: array
:return yerror: array
:return xname: str
:return yname: str
"""
xdata, ydata, yerror, xlabel, ylabel = self.get_plot_data(xname, yname, signal_op, error_op)
if filename is None:
filename = 'Scan_%s_%s_%s.csv' % (self.label(), xlabel, ylabel)
with open(filename, 'wt') as f:
f.write('# %r\n' % self)
title = '\n# '.join(self.title().split('\n'))
f.write('# %s\n' % title)
labels = ','.join([xlabel, ylabel, 'error'])
f.write('# %s\n' % labels)
for x, y, e in zip(xdata, ydata, yerror):
f.write('%s, %s, %s\n' % (x, y, e))
print('(%s, %s, error) written to %s' % (xlabel, ylabel, filename))
"------------------------------- images -------------------------------------------"
def image(self, idx=None):
"""
Load image from hdf file, works with either image addresses or stored arrays
:param idx: int image number, or:
'sum' - returns vertical sum of all images
'max' - returns image with bightest point
'peak' - returns image at peak position (volume.peak_search)
:return: numpy.array with ndim 2
"""
volume = self.volume()
if idx is None:
idx = len(volume) // 2
elif idx == 'sum':
return np.sum(volume, axis=0)
elif idx == 'peak':
i, j, k = volume.peak_search()
return volume[i]
elif idx == 'max':
i, j, k = volume.argmax()
return volume[i]
return volume[idx]
def _set_volume(self, array=None, image_file_list=None):
"""
Set the scan file volume
:param array: None or [scan_len, i, j] size array
:param image_file_list: list of str path locations for [scan_len] image files
:return: None, sets self._volume
"""
if array is not None:
self._volume = ArrayVolume(array)
elif image_file_list is not None:
self._volume = ImageVolume(image_file_list)
else:
self._volume = None
def volume(self):
"""
Rerturn volume
:return: babelscan.volume.Volume object
"""
if self._volume is None:
scanlen = self.scan_length()
vol = np.zeros([scanlen, 100, 100])
self._set_volume(vol)
return self._volume
def image_size(self):
"""
Returns the image size
:return: tuple
"""
if self._image_size:
return self._image_size
image = self.image(0)
shape = np.shape(image)
self._image_size = shape
return shape
def image_roi(self, cen_h=None, cen_v=None, wid_h=31, wid_v=31):
"""
Create new region of interest from detector images
:param cen_h: int or None
:param cen_v: int or None
:param wid_h: int or None
:param wid_v: int or None
:return: l*v*h array
"""
return self.volume().roi(cen_h, cen_v, wid_h, wid_v)
def image_roi_sum(self, cen_h=None, cen_v=None, wid_h=31, wid_v=31):
"""
Create new region of interest
:param cen_h: int or None
:param cen_v: int or None
:param wid_h: int or None
:param wid_v: int or None
:return: roi_sum, roi_max
"""
volume = self.volume()
roi_sum, roi_max = volume.roi_sum(cen_h, cen_v, wid_h, wid_v)
# Add to namespace
n = 1
while 'nroi%d_sum' % n in self._namespace:
n += 1
full_name = 'nroi[%d,%d,%d,%d]' % (cen_h, cen_v, wid_h, wid_v)
self.add2namespace('nroi%d_sum' % n, roi_sum, other_names=full_name)
self.add2namespace('nroi%d_max' % n, roi_max)
return roi_sum, roi_max
def image_roi_op(self, operation):
"""
Create new region of interest (roi) from image data and return sum and maxval
The roi centre and size is defined by an operation:
operation = 'nroi[210, 97, 75, 61]'
'nroi' - creates a region of interest in the detector centre with size 31x31
'nroi[h,v]' - creates a roi in the detector centre with size hxv, where h is horizontal, v is vertical
'nroi[m,n,h,v] - create a roi with cen_h, cen_v, wid_h, wid_v = n, m, h, v
:param operation: str : operation string
:return: sum, maxval : [o] length arrays
"""
volume = self.volume()
cen_h, cen_v, wid_h, wid_v, _ = volume.check_roi_op(operation)
roi_sum, roi_max = self.image_roi_sum(cen_h, cen_v, wid_h, wid_v)
# Add operation to associated namespace
n = 1
while 'nroi%d_sum' % n in self._namespace:
n += 1
n -= 1
name = 'nroi%d_sum' % n
self._debug('nroi', 'New ROI created: %s, saved in namespace as %s' % (operation, name))
self.add2namespace(name, other_names=operation)
return roi_sum, roi_max
"----------------------------------------------------------------------------------------------------------------------"
"----------------------------------------------- MultiScan ------------------------------------------------------------"
"----------------------------------------------------------------------------------------------------------------------"
[docs]class MultiScan:
"""
Class for holding multiple Scan objects
"""
def __init__(self, scan_list, variables=None):
self._scan_list = []
for scan in scan_list:
if issubclass(type(scan), MultiScan):
self._scan_list.extend(scan._scan_list)
else:
self._scan_list.append(scan)
if variables is None:
self._variables = []
else:
self._variables = fn.liststr(variables)
# Managers
self.plot = init_multiscan_plot_manager(self)
self.fit = init_multiscan_fit_manager(self)
def __repr__(self):
return 'MultiScan(%d items)' % len(self._scan_list)
def __str__(self):
variables = self.string(self._variables)
out = ''
for n in range(len(self._scan_list)):
out += '%3d %s: %s\n' % (n, self._scan_list[n].label(), variables[n])
return out
def __add__(self, other):
return MultiScan([self, other])
def __call__(self, name):
return [scan(name) for scan in self._scan_list]
def __getitem__(self, item):
return self._scan_list[item]
def __len__(self):
return len(self._scan_list)
def add_variable(self, name):
"""
Add variable
:param name: name of variable parameter between scans
:return:
"""
names = fn.liststr(name)
self._variables += names
def _get_variable_data(self):
"""
Return array of variable data such that
data = self._get_variable_data()
data[0] == data for self._variables[0]
"""
return np.transpose(self.value(self._variables)).reshape(len(self._variables), -1)
def _get_variable_string(self):
"""
Return string of variable data
:return: str
"""
return '\n'.join(self.string(self._variables))
def _get_name(self, name):
"""
Return corrected name from first scan
:param name: str
:return: str
"""
try:
name = self._scan_list[0].name(name)
except (IndexError, KeyError):
pass
return name
def scan_numbers(self):
return [scan.scan_number for scan in self._scan_list]
def title(self):
"""Return str title"""
scn_range = fn.findranges(self.scan_numbers())
return scn_range
def array(self, name, array_length=None):
data = self.__call__(name)
if array_length is None:
array_length = np.max([np.size(d) for d in data])
return np.array([scan.array(name, array_length)[0] for scan in self._scan_list])
def value(self, name):
return [scan.value(name) for scan in self._scan_list]
def string(self, name):
out = []
for scan in self._scan_list:
strlist = np.asarray(scan.string(name), dtype=str).reshape(-1)
out += [', '.join(s.strip() for s in strlist)]
return out
def labels(self, variable_names=None):
"""
Return list of str labels
:param variable_names: str or list of str names
:return: list
"""
if variable_names is None:
variable_names = self._variables
variables = self.string(variable_names)
return ['%s: %s' % (self._scan_list[n].label(), variables[n]) for n in range(len(self))]
def string_format(self, command):
return [scan.string_format(command) for scan in self._scan_list]
def griddata(self, axes=None, signal='signal', repeat_after=None):
"""
Generate 2D square grid of single values for each scan
Return x, y axis when taking single values from each scan
:param axes: str or list of str, names of axes data
:param signal: str name of signal data
:param repeat_after: int or None, defines repeat length of data
:return: xaxis, yaxis, zaxis square [n,m] arrays
"""
if axes is None:
axes = self._variables
else:
axes = np.asarray(axes).reshape(-1)
if len(axes) > 0:
xaxis = self.value(axes[0])
else:
xaxis = np.arange(len(self._scan_list))
yaxis = self.value(axes[-1])
zaxis = self.value(signal)
xaxis, yaxis, zaxis = fn.square_array(xaxis, yaxis, zaxis, repeat_after)
return xaxis, yaxis, zaxis
def get_plot_data(self, xname, yname):
"""
Get plotting data - simple extraction of data field from each scan
:param xname: str name of value to use as x-axis
:param yname: str name of value to use as y-axis
:return: xdata, ydata, xlabel, ylabel
"""
xdata = self.__call__(xname)
ydata = self.__call__(yname)
first = self[0]
xlabel = first.name(xname)
ylabel = first.name(yname)
return xdata, ydata, xlabel, ylabel
def get_plot_variable(self, yname, variable=None):
"""
Get plotting data for plotting data points from each scan
x, y, xlabel, ylabel = scans.get_plot_variable('signal', 'scanno')
e.g.
for n in range(len(x)):
plt.plot(x[n], y[n])
plt.xlabel(xlabel)
plt.ylabel(ylabel)
:param yname: str name of value to use as y-axis
:param variable: str, list of str or None for default list of scans variables
:return xdata: list of arrays for each scan
:return ydata: list of arrays for each scan
:return yerror: list of arrays for each scan
:return labels: list of str for each scan
:return xlabel: str, axis label for x-axis
:return ylabel: str, axis label for y-axis
"""
if variable is None:
variable = []
variables = fn.liststr(variable) + self._variables
xlabel = variables[0]
xdata = self.value(xlabel)
ylabel = yname
ydata = self.value(yname)
return xdata, ydata, xlabel, ylabel
def get_plot_lines(self, xname=None, yname=None, signal_op=None, error_op=None):
"""
Get plotting data for plotting as series of lines
x, y, dy, labels, xlabel, ylabel = scans.get_plot_lines('axes', 'signal', '/Transmission', np.sqrt)
e.g.
for n in range(len(x)):
plt.errorbar(x[n], y[n], dy[n])
plt.legend(labels)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
:param xname: str name of value to use as x-axis
:param yname: str name of value to use as y-axis
:param signal_op: operation to perform on yaxis, e.g. '/Transmission'
:param error_op: function to use on yaxis to generate error, e.g. np.sqrt
:return xdata: list of arrays for each scan
:return ydata: list of arrays for each scan
:return yerror: list of arrays for each scan
:return labels: list of str for each scan
:return xlabel: str, axis label for x-axis
:return ylabel: str, axis label for y-axis
"""
xdata = []
ydata = []
dydata = []
xlabel, ylabel = xname, yname
for scan in self._scan_list:
x, y, dy, xlabel, ylabel = scan.get_plot_data(xname, yname, signal_op, error_op)
xdata += [x]
ydata += [y]
dydata += [dy]
labels = self.__str__().splitlines()
return xdata, ydata, dydata, labels, xlabel, ylabel
def get_plot_mesh(self, xname=None, yname=None, signal_op=None, error_op=None):
"""
Return array data for plotting as mesh
x, y, z, xlabel, ylabel, zlabel = scans.get_plot_mesh('axes', 'signal', '/Transmission', np.sqrt)
e.g.
plt.pcolormesh(x, y, z)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
:param xname: str name of value to use as x-axis
:param yname: str name of value to use as y-axis
:param signal_op: operation to perform on yaxis, e.g. '/Transmission'
:param error_op: function to use on yaxis to generate error, e.g. np.sqrt
:return xdata: list of arrays for each scan
:return ydata: list of arrays for each scan
:return yerror: list of arrays for each scan
:return labels: list of str for each scan
:return xlabel: str, axis label for x-axis
:return ylabel: str, axis label for y-axis
"""
xname = self._get_name(xname)
yname = self._get_name(yname)
pass