"""
Folder Monitor
"""
import os
import glob
import numpy as np
from . import functions as fn
from .babelscan import Scan, MultiScan
from .hdf import HdfScan, HdfWrapper, load_hdf_values
from .dat import DatScan
from .csv import CsvScan
def create_scan(data, headers, alternate_names=None, default_values=None, **kwargs):
    """
    Build a Scan from parallel lists of data and header names
    :param data: list of data, one entry per header
    :param headers: list of names, paired in order with the entries of data
    :param alternate_names: dict of alternate names to headers (or None)
    :param default_values: dict of default values (or None)
    :param kwargs: other options, passed on to Scan
    :return: Scan
    """
    # Pair each header with its data entry; zip stops at the shorter of the two lists
    named_data = dict(zip(headers, data))
    return Scan(named_data, alternate_names, default_values, **kwargs)
def file_loader(filename, **kwargs):
    """
    Load a scan file, choosing the loader class from the file extension
    :param filename: str file name; '.dat' and '.csv' (any case) have dedicated loaders,
        every other extension is loaded as hdf
    :param kwargs: options passed to the loader class
    :return: DatScan, CsvScan or HdfScan
    """
    extension = os.path.splitext(filename)[1].lower()
    loaders = {
        '.dat': DatScan,
        '.csv': CsvScan,
    }
    # Anything that is not a recognised text format falls back to the hdf loader
    loader = loaders.get(extension, HdfScan)
    return loader(filename, **kwargs)
def hdf_loader(filename):
    """
    Open a hdf (nexus) file as a HdfWrapper object
    :param filename: str file name, e.g. .hdf, .nxs
    :return: HdfWrapper
    """
    wrapper = HdfWrapper(filename)
    return wrapper
def load_files(filenames, variables=None, **kwargs):
    """
    Load one or more files
    :param filenames: str or list of str filenames
    :param variables: str or list of str names that vary in each scan
    :param kwargs: options passed to file_loader for each file
    :return: MultiScan, or the single loaded scan if only one filename is given
    """
    names = fn.liststr(filenames)
    if len(names) != 1:
        loaded = [file_loader(name, **kwargs) for name in names]
        return MultiScan(loaded, variables)
    # A single file is returned directly rather than wrapped in a MultiScan
    return file_loader(names[0], **kwargs)
def find_files(folders, file_type='nxs'):
    """
    Find scan files in folders using format specifier
    :param folders: str or list of str directories
    :param file_type: name of extension, e.g. 'nxs', '.dat', or a filename format such as
        '%06d.nxs' (only the extension is used). An empty string matches every file.
    :return: sorted list of str file paths
    """
    folders = fn.liststr(folders)
    spec, ext = os.path.splitext(file_type)
    if ext == '':
        # No dot-separated extension found, the whole string is the extension
        ext = spec
    # Guard against an empty file_type, which previously failed on ext[0]
    if ext and not ext.startswith('.'):
        ext = '.' + ext
    filelist = []
    for directory in folders:
        filelist += glob.glob('%s/*%s' % (directory, ext))
    # sorted() keeps plain str elements, avoiding a round trip through a numpy string array
    return sorted(filelist)
class FolderMonitor:
    """
    Monitors a folder or several folders for files following a particular format

    Scan numbers are converted to file names using a printf-style format
    (default '%06d.nxs') and looked up in each data directory in turn.
    Calling the instance with scan numbers or filenames loads the scan(s).

    :param data_directory: str or list of str directories containing scan files
    :param working_directory: str directory to save output to
    :param scan_loader: function(filename, **options) returning a scan object, or None to use file_loader
    :param kwargs: additional options, stored and passed to the scan loader with every scan
    """

    def __init__(self, data_directory, working_directory='.', scan_loader=None, **kwargs):
        self._data_directories = fn.liststr(data_directory)
        self._working_directory = working_directory
        self._scan_loader = file_loader if scan_loader is None else scan_loader

        # The default title is the name of the first data directory
        title = os.path.basename(self._data_directories[0])
        options = {
            'title': title,
            'title_command': '{FolderTitle} #{scan_number:g}',
            'data': {},
            'filename_format': '%06d.nxs'
        }
        options.update(kwargs)
        options['data']['FolderTitle'] = title
        self._options = options
        self._filename_format = options['filename_format']
        self.title = options['title']

    def __repr__(self):
        return 'FolderMonitor(%s)' % self.title

    def __str__(self):
        scanfiles = self.allscanfiles()
        # An empty folder has no first or last file to report
        if len(scanfiles) > 0:
            first_file, last_file = scanfiles[0], scanfiles[-1]
        else:
            first_file, last_file = None, None
        out = 'Folder Monitor: %s\n' % self.title
        out += 'Data directories:\n '
        out += '\n '.join(self._data_directories)
        out += '\nWorking directory:\n %s\n' % os.path.abspath(self._working_directory)
        out += 'Number of files: %d\nFirst file: %s\nLast file: %s\n' % \
               (len(scanfiles), first_file, last_file)
        return out

    def __call__(self, *args, **kwargs):
        """
        Load scans from any mix of scan numbers, filenames and lists of either
        :param args: each argument is an int scan number, str filename or a list of these
        :param kwargs: options passed to the file loader
        :return: the scan for a single number or filename, otherwise the scans added together
        """
        try:
            filename = self.getfile(args[0])
            scans = self.scan(filename, **kwargs)
            if len(args) == 1:
                return scans
        except TypeError:
            # getfile raises TypeError for anything that is not a number or a file name,
            # in which case the first argument is treated as a list of scans.
            # Only args[0] is loaded here, the remaining arguments are added below.
            scans = self.scans(args[0], **kwargs)
        for arg in args[1:]:
            try:
                filename = self.getfile(arg)
                scans += self.scan(filename, **kwargs)
            except TypeError:
                scans += self.scans(arg, **kwargs)
        return scans

    def set_title(self, name):
        """
        Set experiment title
        :param name: str title, stored on the monitor and as 'FolderTitle' in the scan data options
        """
        self.title = name
        # __init__ stores the folder title under options['data'], so update the same entry
        self._options['data']['FolderTitle'] = self.title

    def options(self, **kwargs):
        """
        Set or display options
        :param kwargs: options to update, if none are given the current options are returned
        :return: str listing of the current options if called without arguments, otherwise None
        """
        if len(kwargs) == 0:
            lines = ['%20s : %s\n' % (key, item) for key, item in self._options.items()]
            return 'Options:\n' + ''.join(lines)
        self._options.update(kwargs)

    def set_format(self, filename_format='%06d.nxs'):
        """Set the file format to monitor, uses printf-style string format, e.g. '%5d.nxs'"""
        self._filename_format = filename_format

    def add_data_directory(self, data_directory):
        """
        Add one or more directories to the list of directories searched for scan files
        :param data_directory: str or list of str directories
        """
        new_directories = np.asarray(data_directory, dtype=str).reshape(-1).tolist()
        # Keep the directories as a plain list of str, as created in __init__
        self._data_directories = list(self._data_directories) + new_directories

    def set_working_directory(self, working_directory):
        """Set the directory to save output to"""
        self._working_directory = working_directory

    def latest_scan_number(self):
        """
        Get the latest scan number from the data directories
        :return: last scan number returned by allscannumbers, or None if no scans are found
        """
        numbers = self.allscannumbers()
        if len(numbers) == 0:
            return None
        return numbers[-1]

    latest = latest_scan_number

    def allscanfiles(self):
        """
        Return list of all scan files in the data directories
        Files are matched on the extension of the filename format only.
        :return: sorted list of str file paths
        """
        spec, ext = os.path.splitext(self._filename_format)
        filelist = []
        for directory in self._data_directories:
            filelist += glob.glob('%s/*%s' % (directory, ext))
        return sorted(filelist)

    def allscannumbers(self):
        """
        Return a list of all scan numbers in the data directories
        Only files whose name exactly matches the filename format are included.
        :return: list of scan numbers, in the order of allscanfiles
        """
        numbers = []
        for file in self.allscanfiles():
            number = fn.scanfile2number(file)
            # Skip files that share the extension but don't follow the filename format
            if os.path.basename(file) == self._filename_format % number:
                numbers.append(number)
        return numbers

    def getfile(self, scan_number):
        """
        Convert int scan number to file
        :param scan_number: int scan number, scans < 1 are relative to the latest scan (0 is the latest),
            or str name of an existing file, which is returned unchanged
        :return: str filename
        :raises TypeError: if scan_number is neither a number nor an existing file name
        :raises Exception: if no file for the scan number exists in the data directories
        """
        if isinstance(scan_number, str) and os.path.isfile(scan_number):
            return scan_number
        # Comparing a non-number (e.g. list or non-file string) raises TypeError here,
        # callers use this to detect arguments that are not a single scan.
        if scan_number < 1:
            latest = self.latest()
            if latest is None:
                raise Exception('No scans found, can\'t find scan relative to latest: %s' % scan_number)
            scan_number = latest + scan_number
        for directory in self._data_directories:
            filename = os.path.join(directory, self._filename_format % scan_number)
            if os.path.isfile(filename):
                return filename
        raise Exception('Scan number: %s doesn\'t exist' % scan_number)

    def scan(self, scan_number_or_filename=0, **kwargs):
        """
        Generate Scan object for given scan using either scan number or filename.
        :param scan_number_or_filename: int or str file identifier, the default 0 is the latest scan
        :param kwargs: options to send to file loader, these override the stored options
        :return: Scan object
        :raises TypeError: if scan_number_or_filename is not a number or file name
        """
        try:
            filename = self.getfile(scan_number_or_filename)
        except TypeError:
            raise TypeError('Scan(\'%s\') filename must be number or string' % scan_number_or_filename)
        # Copy so that options given for this scan don't change the stored options
        options = self._options.copy()
        options.update(kwargs)
        if os.path.isfile(filename):
            return self._scan_loader(filename, **options)
        raise Exception('Scan doesn\'t exist: %s' % filename)

    loadscan = readscan = scan

    def updating_scan(self, scan_number_or_filename=0, **kwargs):
        """
        Generate Scan object for given scan using either scan number or filename.
        The scan is loaded with the option reload=True.
        :param scan_number_or_filename: int or str file identifier
        :param kwargs: options to send to file loader
        :return: Scan object
        """
        return self.scan(scan_number_or_filename, reload=True, **kwargs)

    def _backup_loader(self, scan_number_or_filename=0, **kwargs):
        """
        Generate Scan object for given scan using either scan number or filename.
        Always uses file_loader without any options, the stored options and kwargs are ignored.
        :param scan_number_or_filename: int or str file identifier
        :param kwargs: not used
        :return: Scan object
        """
        try:
            filename = self.getfile(scan_number_or_filename)
        except TypeError:
            raise Exception('Scan(\'%s\') filename must be number or string' % scan_number_or_filename)
        if os.path.isfile(filename):
            return file_loader(filename)
        raise Exception('Scan doesn\'t exist: %s' % filename)

    def scans(self, scan_numbers_or_filenames, variables=None, **kwargs):
        """
        Generate MultiScan object for given range of scans using either scan number or filename.
        :param scan_numbers_or_filenames: list of int scan numbers or str filenames
        :param variables: str or list of str names that vary in each scan
        :param kwargs: options to send to file loader
        :return: MultiScan object
        """
        scan_numbers_or_filenames = np.asarray(scan_numbers_or_filenames).reshape(-1)
        scans = [self.scan(n, **kwargs) for n in scan_numbers_or_filenames]
        return MultiScan(scans, variables)

    loadscans = readscans = scans

    def scandata(self, scan_numbers, name):
        """
        Fast return of data from scan number(s)
        :param scan_numbers: int or list : scan numbers to get data
        :param name: str : name
        :return: data for a single scan number, or list of data for several
        """
        scan_numbers = np.asarray(scan_numbers).reshape(-1)
        out = [self.scan(scn)(name) for scn in scan_numbers]
        if len(scan_numbers) == 1:
            return out[0]
        return out

    def print_scan(self, scan_number=0):
        """
        Print details of a scan to the console
        :param scan_number: int scan number or string filename
        :return: None
        """
        print(self.scan(scan_number))

    def print_scans(self, scan_numbers=None, names='scan_command'):
        """
        Print details of a list of scans to the console
        :param scan_numbers: list of scans or None for all scans in directories
        :param names: str or list of string of variables that change with each scan
        :return: None
        """
        if scan_numbers is None:
            scan_numbers = self.allscannumbers()
        else:
            scan_numbers = np.asarray(scan_numbers).reshape(-1)
        for number in scan_numbers:
            try:
                scan = self.scan(number)
                strings = fn.liststr(scan.string(names))
                data = ', '.join(strings)
            except Exception:
                # Best effort listing: a scan that can't be loaded shouldn't stop the others
                data = 'Not available'
            print('%s: %s' % (number, data))

    def _scan_files(self, scan_numbers=None):
        """Return the files of the given scan numbers, or all scan files if scan_numbers is None"""
        if scan_numbers is None:
            return self.allscanfiles()
        return [self.getfile(scn) for scn in np.reshape(scan_numbers, -1)]

    def load_hdf_address(self, address, scan_numbers=None, default=np.nan):
        """
        Rapidly load hdf data from a list of scan numbers
        :param address: str hdf address
        :param scan_numbers: list of int or None for all files
        :param default: default value, passed to load_hdf_values
        :return: result of load_hdf_values for the scan files
        """
        scan_files = self._scan_files(scan_numbers)
        return load_hdf_values(scan_files, address, default)

    def print_hdf_address(self, address, scan_numbers=None):
        """
        Rapidly load hdf data from a list of scan numbers and print result to console
        :param address: str hdf address
        :param scan_numbers: list of int or None for all files
        :return: None
        """
        scan_files = self._scan_files(scan_numbers)
        values = load_hdf_values(scan_files, address, 'Not available')
        scan_numbers = [fn.scanfile2number(file) for file in scan_files]
        lines = ['%s : %s\n' % (scn, val) for scn, val in zip(scan_numbers, values)]
        print('Scan number : %s\n' % address + ''.join(lines))

    def hdf_compare(self, scan1, scan2):
        """
        Compare two hdf files
        :param scan1: int scan number or str filename
        :param scan2: int scan number or str filename
        :return: result of scan1.hdf_compare(scan2)
        :raises ValueError: if either scan has no hdf_compare method
        """
        s1 = self.scan(scan1)
        s2 = self.scan(scan2)
        if hasattr(s1, 'hdf_compare') and hasattr(s2, 'hdf_compare'):
            return s1.hdf_compare(s2)
        raise ValueError('Scans are not HDF files')

    def plot_scan(self, scan_number=0, xaxis='axes', yaxis='signal', *args, **kwargs):
        """
        Create matplotlib figure with plot of the scan (if matplotlib available)
        :param scan_number: int or string filename
        :param xaxis: str name or address of array to plot on x axis
        :param yaxis: str name or address of array to plot on y axis, also accepts list of names for multiple plots
        :param args: passed directly to scan.plot
        :param kwargs: passed directly to scan.plot
        :return: output of scan.plot
        """
        scan = self.scan(scan_number)
        return scan.plot(xaxis, yaxis, *args, **kwargs)

    def plot_image(self, scan_number=0, index=None, xaxis='axes', axes=None, clim=None, cmap=None,
                   colorbar=False, **kwargs):
        """
        Plot image in matplotlib figure (if matplotlib available)
        :param scan_number: int or string filename
        :param index: int, detector image index, 0-length of scan, if None, use centre index
        :param xaxis: name or address of xaxis dataset
        :param axes: matplotlib axes to plot on (None to create figure)
        :param clim: [min, max] colormap cut-offs (None for auto)
        :param cmap: str colormap name (None for auto)
        :param colorbar: False/ True add colorbar to plot
        :param kwargs: additional arguments passed to scan.plot.image
        :return: output of scan.plot.image
        """
        scan = self.scan(scan_number)
        return scan.plot.image(index, xaxis, axes, clim, cmap, colorbar, **kwargs)

    def plot_scans(self, scan_numbers_or_filenames=None, variables=None,
                   xaxis='axes', yaxis='signal', *args, **kwargs):
        """
        Create matplotlib figure with overlayed plots of each scan (if matplotlib available)
        :param scan_numbers_or_filenames: list of int scan numbers or str filenames
        :param variables: str or list of str names that vary in each scan
        :param xaxis: str name or address of array to plot on x axis
        :param yaxis: str name or address of array to plot on y axis
        :param args: passed directly to the MultiScan plot
        :param kwargs: passed directly to the MultiScan plot (not to the file loader)
        :return: output of the MultiScan plot
        """
        scans = self.scans(scan_numbers_or_filenames, variables)
        return scans.plot(xaxis, yaxis, *args, **kwargs)