Source code for crystalmapping.callbacks

import typing
from collections import ChainMap

import event_model as em
import numpy as np
import pandas as pd
from bluesky.callbacks.stream import LiveDispatcher
from databroker import Broker, Header
from trackpy import link, locate

from crystalmapping._vend import gen_stream


def gen_processed_images(
    images: typing.Iterable[typing.Union[list, np.ndarray]],
    subtrahend: typing.Union[list, np.ndarray],
) -> typing.Generator[np.ndarray, None, None]:
    """Generate processed images from a series of images.

    The procedure is to (1) convert the input to a numpy array, (2) average
    the frames to a two dimensional image, and (3) subtract the subtrahend
    image, filling negative pixels with zero.

    Parameters
    ----------
    images :
        An iterable of images. Each image has at least 2 dimensions.
    subtrahend :
        The subtrahend image. It has at least 2 dimensions.

    Yields
    ------
    processed_image :
        A two dimensional image.
    """
    subtrahend = np.asarray(subtrahend)
    subtrahend = get_mean_frame(subtrahend)
    for image in images:
        image = np.asarray(image)
        image = get_mean_frame(image)
        image = subtract_image(image, subtrahend)
        yield image

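# A minimal usage sketch (not part of the original module): process a small
# stack of synthetic exposures against an averaged dark stack. The arrays
# below are hypothetical stand-ins for real detector frames.
def _demo_gen_processed_images() -> None:
    dark_frames = np.full((4, 8, 8), 2.0)  # four dark frames, mean is 2.0
    exposures = [np.full((2, 8, 8), 5.0) for _ in range(3)]
    for processed in gen_processed_images(exposures, dark_frames):
        # Each yielded image is 2D: mean(5.0) - mean(2.0) = 3.0 everywhere.
        assert processed.shape == (8, 8)
        assert np.allclose(processed, 3.0)
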
def get_mean_frame(frames: np.ndarray) -> np.ndarray:
    """Average the frames to a two dimensional image."""
    n = np.ndim(frames)
    if n < 2:
        raise ValueError("The number of dimensions {} < 2.".format(n))
    elif n == 2:
        mean_frame = np.copy(frames)
    elif n == 3:
        mean_frame = np.mean(frames, axis=0)
    else:
        # Average over all the leading axes, keeping the last two (the image).
        mean_frame = np.mean(frames, axis=tuple(range(n - 2)))
    return mean_frame

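# A minimal sketch (not in the original module) of how the leading axes
# collapse: 2D input is copied, 3D input is averaged over the first axis,
# and higher dimensional stacks are averaged over all the leading axes.
def _demo_get_mean_frame() -> None:
    assert get_mean_frame(np.ones((8, 8))).shape == (8, 8)
    assert get_mean_frame(np.ones((5, 8, 8))).shape == (8, 8)
    assert get_mean_frame(np.ones((2, 5, 8, 8))).shape == (8, 8)
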
def subtract_image(minuend: np.ndarray, subtrahend: np.ndarray) -> np.ndarray:
    """Subtract the image and fill zero in negative pixels."""
    ans = np.zeros_like(minuend)
    # Only write the pixels where the difference is positive; the others
    # keep the initial value of zero.
    np.subtract(minuend, subtrahend, out=ans, where=minuend > subtrahend)
    return ans

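# A minimal sketch (not in the original module): pixels whose difference
# would be negative are clipped to zero instead of going below it.
def _demo_subtract_image() -> None:
    minuend = np.array([[3.0, 1.0]])
    subtrahend = np.array([[1.0, 2.0]])
    assert np.array_equal(subtract_image(minuend, subtrahend), [[2.0, 0.0]])
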
def get_dataframe(run: Header) -> pd.DataFrame:
    """Get the dataframe from the stream. Drop the time column."""
    return run.table().drop(columns=["time"])

class ImageProcessor(LiveDispatcher):
    """A callback to average the frames of an image series, subtract another
    image from the result, and emit the document."""

    def __init__(self, data_key: str, subtrahend: np.ndarray):
        """Initiate the instance.

        Parameters
        ----------
        data_key :
            The key of the data to use.
        subtrahend :
            The 2d image used as the subtrahend.
        """
        super(ImageProcessor, self).__init__()
        self.data_key = data_key
        self.subtrahend = np.asarray(subtrahend)

    def start(self, doc, _md=None):
        if _md is None:
            _md = {}
        _md = ChainMap({"analysis_stage": ImageProcessor.__name__}, _md)
        super(ImageProcessor, self).start(doc, _md=_md)

    def event_page(self, doc):
        for event_doc in em.unpack_event_page(doc):
            self.event(event_doc)

    def event(self, doc, **kwargs):
        frames = np.asarray(doc["data"][self.data_key])
        minuend = get_mean_frame(frames)
        result = subtract_image(minuend, self.subtrahend)
        new_data = {k: v for k, v in doc["data"].items() if k != self.data_key}
        new_data[self.data_key] = result.tolist()
        self.process_event({"data": new_data, "descriptor": doc["descriptor"]})

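# A hypothetical wiring sketch (not in the original module): an
# ImageProcessor sits between a document source and downstream callbacks.
# The "pe1_image" key and the dark image are illustrative stand-ins; in
# practice the processor would be subscribed to a bluesky RunEngine.
def _demo_image_processor_wiring() -> None:
    dark = np.zeros((8, 8))
    processor = ImageProcessor(data_key="pe1_image", subtrahend=dark)
    # Downstream subscribers receive the re-emitted, processed documents.
    processor.subscribe(lambda name, doc: print(name))
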
class PeakTracker(LiveDispatcher):
    """Track the peaks on a series of images and summarize their positions
    and intensities in a dataframe."""

    def __init__(self, data_key: str, diameter: typing.Union[int, tuple], **kwargs):
        """Initiate the instance.

        Parameters
        ----------
        data_key :
            The key of the data to use.
        diameter :
            The pixel size of the peak.
        kwargs :
            The other kwargs for `trackpy.locate`.
        """
        kwargs["diameter"] = diameter
        super(PeakTracker, self).__init__()
        self.data_key = data_key
        self.config = kwargs

    def start(self, doc, _md=None):
        if _md is None:
            _md = {}
        _md = ChainMap({"analysis_stage": PeakTracker.__name__}, _md)
        super(PeakTracker, self).start(doc, _md=_md)

    def event_page(self, doc):
        for event_doc in em.unpack_event_page(doc):
            self.event(event_doc)

    def event(self, doc, **kwargs):
        image = doc["data"][self.data_key]
        df = locate(image, **self.config)
        df = df.assign(frame=doc["seq_num"])
        for data in df.to_dict("records"):
            self.process_event({"data": data, "descriptor": doc["descriptor"]})

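# A hypothetical sketch (not in the original module): PeakTracker re-emits
# each located peak as its own event. The diameter and the extra `minmass`
# keyword are illustrative values passed through to `trackpy.locate`.
def _demo_peak_tracker_wiring() -> None:
    tracker = PeakTracker(data_key="pe1_image", diameter=11, minmass=100)
    tracker.subscribe(lambda name, doc: print(name))
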
class TrackLinker(LiveDispatcher):
    """Track the peaks in the frames and link them into trajectories.

    When a stop document is received, the data is pulled from the databroker
    and processed. Then, the dataframe is emitted row by row.
    """

    def __init__(self, *, db: Broker = None, search_range: typing.Union[float, tuple], **kwargs):
        """Create the instance.

        Parameters
        ----------
        db :
            The databroker. If None, this callback does nothing.
        search_range :
            The search_range in `trackpy.link`.
        kwargs :
            Other kwargs in `trackpy.link`.
        """
        super(TrackLinker, self).__init__()
        kwargs["search_range"] = search_range
        self.config = kwargs
        self.db = db

    def start(self, doc, _md=None):
        if _md is None:
            _md = {}
        _md = ChainMap({"analysis_stage": TrackLinker.__name__}, _md)
        super(TrackLinker, self).start(doc, _md=_md)

    def event_page(self, doc):
        return

    def event(self, doc, **kwargs):
        return

    def stop(self, doc, _md=None):
        if self.db is None:
            # Without a databroker there is nothing to link.
            return super(TrackLinker, self).stop(doc, _md=_md)
        features = get_dataframe(self.db[doc["run_start"]])
        df = link(features, **self.config)
        descriptor = next(iter(self.raw_descriptors.keys()))
        for data in df.to_dict("records"):
            self.process_event({"data": data, "descriptor": descriptor})
        super(TrackLinker, self).stop(doc, _md=_md)

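# A hypothetical sketch (not in the original module): the linker defers all
# of its work to the stop document, so events pass through untouched. `db`
# would be a real databroker instance and `memory` is an illustrative
# `trackpy.link` keyword.
def _demo_track_linker_wiring(db: Broker) -> None:
    linker = TrackLinker(db=db, search_range=5.0, memory=3)
    linker.subscribe(lambda name, doc: print(name))
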
class DataFrameDumper(object):
    """Dump the dataframe to the database using databroker."""

    def __init__(self, db: Broker):
        """Create an instance."""
        super(DataFrameDumper, self).__init__()
        self._db = db

    def dump_df(self, df: pd.DataFrame, metadata: dict = None):
        """Dump the data frame into the database with the metadata."""
        if not metadata:
            metadata = {}
        data = df.to_dict("records")
        for name, doc in gen_stream(data, metadata):
            self._db.insert(name, doc)
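
# A hypothetical sketch (not in the original module): dump an in-memory
# dataframe into a databroker, with the dataframe content and metadata both
# illustrative.
def _demo_dump_df(db: Broker) -> None:
    df = pd.DataFrame({"x": [0.0, 1.0], "y": [10.0, 11.0]})
    DataFrameDumper(db).dump_df(df, metadata={"purpose": "demo"})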