Skip to content

Tutorial

CHIME/FRB Cluster

In the tutorial provides information on how to, - Submit a job to the CHIME/FRB cluster - Get the job status - Continuously monitor the job - Get the log of the job - Kill the job

For the entirety of this tutorial, it is expected that the user has the latest version of the chime-frb-api installed with proper CHIME/FRB credentials active.

from chime_frb_api import frb_master

master = frb_master.FRBMaster()

Submit a job

Submitting a job to the CHIME/FRB Analysis cluster is done through the following method,

master.swarm.spawn_job(
    image_name="chimefrb/hello-world",
    command=["python"],
    arguments=["hello_world.py", "--debug"],
    job_name="hello-world",
)

where, - image name refers to the container image, e.g. chimefrb/maestro - command is the action to be executed, e.g. ["python"] - arguments are there user provided options, e.g. ["test.py", "--c", "config.yml"] - job_name is a unique name for the job. e.g. test-job-1

As an example,

master.swarm.spawn_job(
    image_name="alpine", command=["ls"], arguments=["/data/chime"], job_name="test"
)

In addition to these required parameters, this command also accepts the following optional parameters:

master.swarm.spawn_job(
    image_name="chimefrb/hello-world",
    command=[],
    arguments=[],
    job_name="hello-world",
    mount_archiver=True,
    swarm_network=True,
    job_mem_limit=4294967296,
    job_mem_reservation=268435456,
    job_cpu_limit=1,
    job_cpu_reservation=1,
)
where,

  • mount_archiver:bool mounts (The data folder is mounted on /data/chime/)
  • job_mem_limit:int=4294967296 (The memory limit is ~4.29 Gigabyte)
  • job_mem_reservation:int=268435456 (~268 Megabyte memory is reserved)
  • job_cpu_limit:float=1 (1 core is to be used)
  • job_cpu_reservation:float=1 (1 core is reserved to be used)

For baseband analysis jobs, you can also execute the following method, but for more information contact the the baseband analysis team.

master.swarm.spawn_baseband_job(
    event_number=568586754, task_name="beamform", arguments=[], job_id=1
)

Job Status

Use the following method, to get the status of any job on the cluster

master.swarm.get_job_status(job_name="test")
where - the input job_name is a string, given when the job was created.

A job can have one of the following status,

*    NEW         (The job was initialized.)
*    PENDING     (Resources for the job were allocated.)
*    ASSIGNED    (Docker assigned the job to nodes.)
*    ACCEPTED    (The job was accepted by a worker node.)
*    PREPARING   (Docker is preparing the job.)
*    STARTING    (Docker is starting the job.)
*    RUNNING     (The job is executing.)
*    COMPLETE    (The job exited without an error code.)
*    FAILED      (The job exited with an error code.)
*    SHUTDOWN    (Docker requested the job to shut down.)
*    REJECTED    (The worker node rejected the job.)
*    ORPHANED    (The node was down for too long.)
*    REMOVE      (The job is not terminal but the associated job was removed.)

To get the name of all currently running jobs on the analysis cluster.

master.swarm.get_jobs()
One could use these two methods above to track the status of individual jobs.

Running Jobs

master.swarm.jobs_running(job_names=["hello-1", "hello-2"])
monitors job[s] on the CHIME/FRB Analysis Cluster untill they are either COMPLETE, FAILED or SHUTDOWN.

Monitor Jobs

When the job is running, one could use

master.swarm.monitor_jobs(job_name=["hello-1", "hello-2"])
to continuously monitor the job[s] on the CHIME/FRB Analysis Cluster.

Job Logs

master.swarm.get_logs(job_name="hello-1")
returns logs from a CHIME/FRB Job.\ For example, the log of the submitted job "test":
master.swarm.get_logs("test")
gives the listed folders and timestamps:\
{'test': "b'2021-04-21T17:05:17.880273760Z baseband\\n'b'2021-04-21T17:05:17.880348870Z daily_gain_solutions\\n'b'2021-04-21T17:05:17.880357530Z intensity\\n'"}

Kill Job

  1. To remove (forcibly) the job with ANY status but with an exact match to job_name:

    master.swarm.kill_job(job_name="hello-1")
    

  2. To remove FAILED jobs with a regex match to job_name:

    master.swarm.kill_failed_jobs(job_name="hello-1")
    

  3. To remove COMPLETED jobs with a regex match to argument job_name:

    master.swarm.prune_jobs(job_name="hello-1")
    

More examples

How do you query information about FRBs (including repeaters) from the CHIME/FRB database?

The event ID, data path, and signal-to-noise ratio (S/N) of bursts associated with a particular repeater can be queried from chime-frb-api using the following python code (credit: Paul Scholz)

from chime_frb_api import frb_master


def get_event_datapath_and_snr(event):
    for par in event["measured_parameters"]:
        if par["pipeline"]["name"] == "realtime":
            event_date = par["datetime"].split(" ")[0].split("-")
            event_snr = par["snr"]
    data_path = (
        "/data/frb-archiver/"
        + event_date[0]
        + "/"
        + event_date[1]
        + "/"
        + event_date[2]
        + "/astro_"
        + str(event["id"])
        + "/"
        + "intensity/processed/"
    )
    return data_path, event_snr


master = (
    frb_master.FRBMaster()
)  # base_url="https://frb.chimenet.ca/frb-master" is not needed any more.
repeater_name = "<TNS name of FRB>"
repeater_source = master.sources.get_source(repeater_name)
candidates = master.candidates.get_all_candidates()

event_ids = []

for candidate in candidates:
    if candidate["id"] in repeater_source["candidate_id"]:
        event_ids += candidate["events"]

for event_id in event_ids:
    event = master.events.get_event(event_id)
    data_path, snr = get_event_datapath_and_snr(event)
    print(event_id, data_path, snr)
The python code provided above can be used to return the event_id, data_path, and snr of each burst in the CHIME/FRB database associated with a particular TNS. There are other properties that can be returned as well, which can be performed analogously by querying the desired parameters from the event_id object.

How do you get access to the raw (msgpack) and processed CHIME/FRB data?

  1. Data archiving locations on site:
    /data/<TELESCOPE>/<DATA>/{raw/processed}/
    
  2. Raw intensity data (secured using read-only everywhere):
     /data/chime/intensity/raw
     Event dumps: /YYYY/MM/DD/astro_<event_no>/<beam_no>
     Intensity streams: /acq_data/
    
  3. Processed intensity data:
     Processed event data: /YYYY/MM/DD/astro_<event_no>/<beam_no>
     Reduced calibration products, other pipelines...
    
  4. Raw baseband data (secured using read-only everywhere):
    Event dumps: /YYYY/MM/DD/astro_<event_no>/*.hdf5
    
  5. Processed baseband data:

    Processed event data: /YYYY/MM/DD/astro_<event_no>
    

  6. Scratch space for users doing other analysis:

     /data/user-data
    

  7. The data may also be available at CANFAR:
    /arc/projects/chime_frb/data/<TELESCOPE>/<DATA>/{raw/processed}/
    

Additional details and information can be found at: https://github.com/CHIMEFRB/frb-ops/issues/126

How do you get events that have baseband data?

import chime_frb_api, requests

master = chime_frb_api.frb_master.FRBMaster()
master.API.authorize()
auth = {"authorization": master.API.access_token}

r = requests.get(
    "https://frb.chimenet.ca/frb-master/v1/verification/get-verifications/NEW SOURCE",
    headers=auth,
)
events = r.json()

events_with_bb = []
for event in events:
    if event["data_status"]["baseband_data"]:
        events_with_bb.append(event["event_id"])

How do you get events classified by tsar as FRBs/RFI/REPEATERS/KNOWN?

Replace the "XXX" in the following script to:

KNOWN%20CANDIDATE: to get REPEATERs; FAINT: to get Faint sources; KNOWN%20SOURCE: to get Galactic source(pulsar); New%20CANDIDATE: to get FRB; RFI: to get RFI (Note that the XXX is in a URL, which cannot accept spaces. Spaces are replaced with %20.

import chime_frb_api, requests

master = chime_frb_api.frb_master.FRBMaster()
master.API.authorize()
auth = {"authorization": master.API.access_token}

r = requests.get(
    "https://frb.chimenet.ca/frb-master/v1/verification/get-verifications/XXX",
    headers=auth,
)
# "XXX" is the user classification, which is in the content of: event['user_verification'][-1]['classification']"

events = r.json()

events_selected = []
for event in events:
    events_selected.append(event["event_id"])

How to provide publication ready waterfall

  • Quest waterfall data:
Script: get_wfall.py

import click
import numpy as np
from iautils import cascade, spectra_utils


@click.command()
@click.option("--eventid", type=click.INT)
@click.option("--dm", type=click.FLOAT)
@click.option("--nsub", type=click.INT)
def wfall_maker(eventid, dm, nsub):
    event_max = cascade.make_cascade_from_eventid(eventid, beams="max")
    event_max.max_beam.intensity = spectra_utils.scale_chime_data(
        event_max.max_beam.intensity
    )
    event_max.dm = dm
    event_max.max_beam.subband(nsub)
    fname = "{}_wfall.npz".format(eventid)
    print("save to:", fname)
    np.savez(
        fname,
        intensity=event_max.max_beam.intensity,
        dt=event_max.max_beam.dt,
        df=event_max.max_beam.df,
        dm=event_max.max_beam.dm,
        fbottom=event_max.max_beam.fbottom,
    )


if __name__ == "__main__":
    wfall_maker()

Example command:

python get_wfall.py --eventid 69541452 --dm 87 --nsub 128
nsub is the number of channels, which could also be specified in the plotting script.

The waterfall would by default be saved as EVENTID_wfall.npz

  • Generate single waterfall:
Script:plot_intensity_wfalls.py

import matplotlib.pyplot as plt
import numpy as np

from frb_common import common_utils
from iautils import spectra

import argparse
parser=argparse.ArgumentParser()
parser.add_argument("--fname",dest='fname',required=True,metavar="FILE",help="the path and name of the npz file containing waterfall, acquired from get_wfall.py")
parser.add_argument("--dm",dest='dm',type=float,help="the targeted DM, default set to the value in npz file")
parser.add_argument("--figname",dest='figname',metavar="FILE",help="the path and name of the figure file")
args = parser.parse_args()



def plot_wfall(fname,ax,dm=None):
        '''
        fname: the numpy file contains waterfall
        ax: 2 element, 0 for the time profile, 1 for the waterfall
        dm: the targeted dm, default is None, it will use the DM from the waterfall file.
        '''
        data = np.load(fname)

        intensity = data["intensity"]
        dt = data["dt"]
        df = data["df"]
        fbottom = data["fbottom"]
        dmtrial = data["dm"]
        print('DM',dm)

        weights = np.ones_like(intensity, dtype=float)
        weights[intensity == 0.] = 0.

        spec = spectra.Spectra(intensity, common_utils.freq_bottom_mhz,
            df, 0.0, dt, weights=weights, dm=dmtrial)
        if dm is not None:
            spec.dedisperse(dm)

        #################################################
        ###ADD MASK here!!
        ################################################
        #mask = np.argwhere(np.abs(np.nansum(spec.intensity, axis=1)) \
        #    > 2)
        #spec.intensity[mask,...] = np.nan
        #mask = np.argwhere(np.abs(np.nanvar(spec.intensity, axis=1)) \
        #    > 0.05)
        #spec.intensity[mask,...] = np.nan

        spec.subband(128)

        spec.intensity[spec.intensity == 0.] = np.nan

        # additional masking
        #mask = np.argwhere(np.nanstd(spec.intensity, axis=1) > 0.2)
        #spec.intensity[mask,...] = np.nan

        ts = np.nansum(spec.intensity, axis=0)
        pulse_peak_idx = np.argmax(ts)
        window = int(np.ceil(0.025 / dt))

        ###
        if pulse_peak_idx<window:
            pulse_peak_idx=window-1

        vmin = np.nanpercentile(
            spec.intensity[...,pulse_peak_idx-window+1:pulse_peak_idx+window],
            #100.-95.)
            #100.-99.73)
            2.5)
        vmax = np.nanpercentile(
            spec.intensity[...,pulse_peak_idx-window-40:pulse_peak_idx-window+1],
            #95.)
            #99.73)
            100-2.5)

        plot_times = (spec.times[pulse_peak_idx-window+1:pulse_peak_idx+window] \
            - spec.center_times[pulse_peak_idx]) * 1000.0  # ms
        plot_times = np.append(plot_times, plot_times[-1] + spec.dt)
        ax[0].plot(plot_times,
            np.append(ts[pulse_peak_idx-window+1:pulse_peak_idx+window],
                ts[pulse_peak_idx-window+1:pulse_peak_idx+window][-1]),
                lw=1, drawstyle="steps-post", color="tab:blue")
        ax[1].imshow(
            spec.intensity[...,pulse_peak_idx-window+1:pulse_peak_idx+window],
            origin="lower", aspect="auto", interpolation="nearest",
            extent=(plot_times.min(), plot_times.max(), fbottom, fbottom+400.),
            vmin=vmin, vmax=vmax, cmap="viridis")
        ax[1].set_xlabel("Time [ms]", fontsize=9)

        xpos = (plot_times.max() - plot_times.min()) * 0.03 + plot_times.min()
        ylim = ax[0].get_ylim()
        ypos = (ylim[1] - ylim[0]) * 0.93 + ylim[0]

        ax[0].set_xlim(plot_times.min(), plot_times.max())
        ax[1].set_xlim(plot_times.min(), plot_times.max())
        ax[0].set_yticklabels([], visible=False)
        ax[0].set_yticks([])
        ax[0].set_xticklabels([], visible=False)
        ax[1].set_xticks([-20, 0, 20])
        ax[1].set_xticklabels([-20, 0, 20], fontsize=9)

        ax[1].set_yticks([400, 500, 600, 700, 800])
        ax[1].set_yticklabels([400, 500, 600, 700, 800], fontsize=9)

        ax[1].set_ylabel("Frequency [MHz]", fontsize=9)

if __name__ == "__main__":

    fig, ax = plt.subplots(2, figsize=(4.26, 3),
        gridspec_kw={"height_ratios": [1, 3],
                     "hspace": 0, "wspace": 0.1})
    plot_wfall(args.fname,ax,dm=args.dm)
    #plot_wfall()
    #ax[0].set_title("{}".format(tns_name, fontsize=9))
    if args.figname is None:
        figname=args.fname.replace('.npz','.pdf')
    plt.savefig(figname)
    print('waterfall save to:',figname)

Example command:

plot_intensity_wfalls.py --fname 145207799_wfall.npz --dm 87.81 --figname 145207799_wfall.pdf
If additional masks are needed, please modify the corresponding section in the code (marked in the code, starting from Line 42)

  • Generate figures containing multiple waterfall:
Script:plot_intensity_wfalls_multi.py

import matplotlib.pyplot as plt
import numpy as np

from frb_common import common_utils
from iautils import spectra


if __name__ == "__main__":
    fnames = ["69541452_wfall.npz", "101903385_wfall.npz", "145207799_wfall.npz"]
    tns_names = ["20200120E", "20200718A", "20201129A"]

    dm_goals = [87.76, 87.86, 87.81]
    outname = "wfall.pdf"

    fig, ax = plt.subplots(
        2,
        3,
        figsize=(4.26, 3),
        gridspec_kw={
            "height_ratios": [1, 3],
            "width_ratios": [1, 1, 1],
            "hspace": 0,
            "wspace": 0.1,
        },
    )

    for i, fname in enumerate(fnames):
        data = np.load(fname)

        intensity = data["intensity"]
        dt = data["dt"]
        df = data["df"]
        fbottom = data["fbottom"]
        dm = data["dm"]

        weights = np.ones_like(intensity, dtype=float)
        weights[intensity == 0.0] = 0.0

        spec = spectra.Spectra(
            intensity, common_utils.freq_bottom_mhz, df, 0.0, dt, weights=weights, dm=dm
        )

        spec.dedisperse(dm_goals[i])

        mask = np.argwhere(np.abs(np.nansum(spec.intensity, axis=1)) > 2)
        spec.intensity[mask, ...] = np.nan
        mask = np.argwhere(np.abs(np.nanvar(spec.intensity, axis=1)) > 0.05)
        spec.intensity[mask, ...] = np.nan

        spec.subband(128)

        spec.intensity[spec.intensity == 0.0] = np.nan

        # additional masking
        # mask = np.argwhere(np.nanstd(spec.intensity, axis=1) > 0.2)
        # spec.intensity[mask,...] = np.nan

        ts = np.nansum(spec.intensity, axis=0)
        pulse_peak_idx = np.argmax(ts)
        window = int(np.ceil(0.025 / dt))

        vmin = np.nanpercentile(
            spec.intensity[..., pulse_peak_idx - window + 1 : pulse_peak_idx + window],
            # 100.-95.)
            # 100.-99.73)
            2.5,
        )
        vmax = np.nanpercentile(
            spec.intensity[
                ..., pulse_peak_idx - window - 40 : pulse_peak_idx - window + 1
            ],
            # 95.)
            # 99.73)
            100 - 2.5,
        )

        plot_times = (
            spec.times[pulse_peak_idx - window + 1 : pulse_peak_idx + window]
            - spec.center_times[pulse_peak_idx]
        ) * 1000.0  # ms
        plot_times = np.append(plot_times, plot_times[-1] + spec.dt)
        ax[0, i].plot(
            plot_times,
            np.append(
                ts[pulse_peak_idx - window + 1 : pulse_peak_idx + window],
                ts[pulse_peak_idx - window + 1 : pulse_peak_idx + window][-1],
            ),
            lw=1,
            drawstyle="steps-post",
            color="tab:blue",
        )
        ax[1, i].imshow(
            spec.intensity[..., pulse_peak_idx - window + 1 : pulse_peak_idx + window],
            origin="lower",
            aspect="auto",
            interpolation="nearest",
            extent=(plot_times.min(), plot_times.max(), fbottom, fbottom + 400.0),
            vmin=vmin,
            vmax=vmax,
            cmap="viridis",
        )
        ax[1, i].set_xlabel("Time [ms]", fontsize=9)

        xpos = (plot_times.max() - plot_times.min()) * 0.03 + plot_times.min()
        ylim = ax[0, i].get_ylim()
        ypos = (ylim[1] - ylim[0]) * 0.93 + ylim[0]
        ax[0, i].set_title("{}".format(tns_names[i]), fontsize=9)

        ax[0, i].set_xlim(plot_times.min(), plot_times.max())
        ax[1, i].set_xlim(plot_times.min(), plot_times.max())
        ax[0, i].set_yticklabels([], visible=False)
        ax[0, i].set_yticks([])
        ax[0, i].set_xticklabels([], visible=False)
        if i > 0:
            plt.setp(ax[1, i].get_yticklabels(), visible=False)

        ax[1, i].set_xticks([-20, 0, 20])
        ax[1, i].set_xticklabels([-20, 0, 20], fontsize=9)

    ax[1, 0].set_yticks([400, 500, 600, 700, 800])
    ax[1, 0].set_yticklabels([400, 500, 600, 700, 800], fontsize=9)

    ax[1, 0].set_ylabel("Frequency [MHz]", fontsize=9)

    fig.suptitle("(a)", fontsize=9, y=1)

    plt.savefig(outname)

Modify line 10,11,14,15 for the waterfall file name, dm and output image name.