Use the old peak tracking algorithm in streaming data processing

The package also offers infrastructure for the older style of streaming data processing, where peaks are tracked on each image in the series of exposures and then linked at the end. The crystal maps are generated by filling the peak intensities into a zero background.
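
As a rough sketch of that final map-building step, the snippet below fills the intensity of one tracked peak into an array of zeros shaped like the scan grid. The column names and grid shape here are hypothetical, chosen only for illustration.

import numpy as np
import pandas as pd

# hypothetical table of one linked peak: grid row, grid column, and peak intensity
peak = pd.DataFrame({"row": [0, 0, 1], "column": [1, 2, 2], "intensity": [10.0, 30.0, 5.0]})

# start from a zero background with the shape of the scan grid
crystal_map = np.zeros((2, 3))
# fill in the peak intensity at the grid points where the peak was found
crystal_map[peak["row"], peak["column"]] = peak["intensity"]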

Here, we show an example of processing streaming data. The data is read from a database, using databroker as the interface. In real use cases, the data would come from the RunEngine.

Take a look at the raw data

First, we take a look at what the data looks like. This is a grid scan on a crystal rod. At each point in the grid, there are multiple frames of diffraction images.

uid = UID["uid"][0]
run = DB[uid]
dask_array = run.xarray_dask()
dask_array.attrs = dict(run.start)
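
To confirm that layout (a stack of frames per grid point), we can inspect the data variables and dimensions. This assumes, as the usage below suggests, that run.xarray_dask() returns an xarray.Dataset; the dimension names depend on the detector and scan.

print(list(dask_array.data_vars))
print(dask_array["dexela_image"].dims, dask_array["dexela_image"].shape)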

Here, we plot the spatial distribution of the total intensity. The upper part of the rod is made up of multiple crystalline domains, while the lower part is full of powder.

stats1 = reshape(dask_array, "dexela_stats1_total")
plot_real_aspect(stats1);
_images/process_streaming_data_6_0.png

We select an image taken in empty space. It contains only dark current and air scattering. We will use it as the background to subtract from all other images in this scan.

image = dask_array["dexela_image"][4523].compute().mean(axis=0)
plot_real_aspect(image);
_images/process_streaming_data_8_0.png

Use callbacks to process the event stream

Here, we use callbacks to process the data in a stream. The frames first go into the ImageProcessor, where they are averaged and the subtrahend is subtracted. Because diffraction images are non-negative, negative pixels are set to zero. The results are then sent to the PeakTracker, which finds the Bragg peaks on the images and outputs each peak as a single event. These data are dumped into DB2. During this process, the TrackLinker monitors the events. Once the peaks have been found in all images, it starts linking them and outputs where the same Bragg peak appears across the series of images. The results are also recorded in the database.

The argument dexela_image is the name of the data key in the event stream. diameter=15, percentile=80, separation=100 are keyword arguments for trackpy.locate, and search_range=50 is a keyword argument for trackpy.link. Please see the trackpy documentation for more information.
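
To get a feel for what these parameters do outside the callback pipeline, here is a minimal, standalone trackpy sketch on synthetic frames. The images and feature sizes are made up; the snippet only illustrates how the locate keywords and search_range enter trackpy.batch (which calls trackpy.locate on each frame) and trackpy.link.

import numpy as np
import trackpy as tp

# three synthetic frames with a single bright peak drifting a few pixels per frame
rng = np.random.default_rng(0)
yy, xx = np.mgrid[0:256, 0:256]
frames = []
for i in range(3):
    frame = rng.poisson(10, size=(256, 256)).astype(float)
    y, x = 100 + 2 * i, 120 + 3 * i
    frame += 500 * np.exp(-((yy - y) ** 2 + (xx - x) ** 2) / (2 * 3.0 ** 2))
    frames.append(frame)

# find the bright features on every frame (the keywords passed to trackpy.locate)
features = tp.batch(frames, diameter=15, percentile=80, separation=100)
# link the features across frames into trajectories (the keyword passed to trackpy.link)
tracks = tp.link(features, search_range=50)
print(tracks[["frame", "particle", "x", "y"]])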

Here, the BestEffortCallback is used to print out the status of the ImageProcessor. It is optional and does not alter the data.

ip = ImageProcessor("dexela_image", subtrahend=image)
pt = PeakTracker("dexela_image", diameter=15, percentile=80, separation=100)
tl = TrackLinker(db=DB2, search_range=50)
bec = BestEffortCallback()
bec.disable_plots()
tl.subscribe(DB2.insert)
pt.subscribe(tl)
pt.subscribe(DB2.insert)
ip.subscribe(pt)
ip.subscribe(bec);

We pipe the data from our database to this callback function.

with Filler(
    handler_registry=dict(DB.reg.handler_reg),
    inplace=True,
    root_map=DB.reg.root_map
) as filler:
    for name, doc in run.documents(fill=False):
        filler(name, doc)
        ip(name, doc)
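
If the documents came straight from the RunEngine instead of a saved run, the same chain could be subscribed directly. This is only a sketch, assuming a configured RunEngine RE and a plan that produces the dexela_image data:

# subscribe the head of the callback chain to a live RunEngine (hypothetical RE and plan)
RE.subscribe(ip)
RE(grid_scan_plan)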

Use servers to process the real-time event stream

We are basically replaying the data processing procedure from the last section. In this section, we will learn how the data is processed at the beamline in “real time”. We need to set up a server that wraps the callback chain from the last section. An example script is shown here.

!cat server.py
import numpy as np
from databroker import Broker
from bluesky.callbacks.best_effort import BestEffortCallback
from crystalmapping.callbacks import ImageProcessor, PeakTracker, TrackLinker
from bluesky.callbacks.zmq import RemoteDispatcher

# load the image to use
image = np.load("./data/PARAMID-2_background_1.npy", allow_pickle=True)

# database to record the processed documents in
# ("temp" is a placeholder; use the catalog configured at your beamline)
DB2 = Broker.named("temp")

# create a remote dispatcher to receive and dispatch the data
rd = RemoteDispatcher(address=("localhost", 5568), prefix=b'raw')

# create the callback
ip = ImageProcessor("dexela_image", subtrahend=image)
pt = PeakTracker("dexela_image", diameter=15, percentile=80, separation=100)
tl = TrackLinker(db=DB2, search_range=50)
bec = BestEffortCallback()
bec.disable_plots()
tl.subscribe(DB2.insert)
pt.subscribe(tl)
pt.subscribe(DB2.insert)
ip.subscribe(pt)
ip.subscribe(bec)

# subscribe the callback to the dispatcher
rd.subscribe(ip)

# start the server
if __name__ == "__main__":
    print("Start the server ...")
    rd.start()

To start the server, we will run:

python server.py

Then the server will receive the documents from “localhost:5568” with the prefix “raw” and process them using the callbacks defined in the script.
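
Note that the Publisher in the next section sends documents to port 5567 while this dispatcher listens on port 5568. In a typical bluesky setup, the two ports are bridged by a 0MQ proxy; a sketch using the Proxy class shipped with bluesky (the ports must match your deployment):

from bluesky.callbacks.zmq import Proxy

# forward documents from the Publisher's in-port (5567) to the dispatcher's out-port (5568)
proxy = Proxy(5567, 5568)
proxy.start()

Equivalently, the command-line tool bluesky-0MQ-proxy 5567 5568 can be left running alongside the server.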

Pipe the data from the database to the server

We can also publish the data in the database to the server using the Publisher. Here, we show an example of publishing our data to “localhost:5567”. The server will process the data just as if it came from the beamline devices.

pub = Publisher(("localhost", 5567), prefix=b"raw")  # prefix must match the dispatcher's
with Filler(
    handler_registry=dict(DB.reg.handler_reg),
    inplace=True,
    root_map=DB.reg.root_map
) as filler:
    for name, doc in run.documents(fill=False):
        filler(name, doc)
        pub(name, doc)