Tutorial¶
CHIME/FRB Cluster¶
This tutorial provides information on how to:
- Submit a job to the CHIME/FRB cluster
- Get the status of a job
- Continuously monitor a job
- Get the logs of a job
- Kill a job
For the entirety of this tutorial, it is assumed that the user has the latest version of chime-frb-api
installed and valid CHIME/FRB credentials active.
Submit a job¶
Submitting a job to the CHIME/FRB Analysis Cluster is done through the following method:
master.swarm.spawn_job(
    image_name="chimefrb/hello-world",
    command=["python"],
    arguments=["hello_world.py", "--debug"],
    job_name="hello-world",
)
where,
- image_name
refers to the container image, e.g. chimefrb/maestro
- command
is the action to be executed, e.g. ["python"]
- arguments
are the user-provided options, e.g. ["test.py", "--c", "config.yml"]
- job_name
is a unique name for the job, e.g. test-job-1
As an example,
master.swarm.spawn_job(
    image_name="alpine", command=["ls"], arguments=["/data/chime"], job_name="test"
)
In addition to these required parameters, the method also accepts the following optional parameters:
master.swarm.spawn_job(
    image_name="chimefrb/hello-world",
    command=[],
    arguments=[],
    job_name="hello-world",
    mount_archiver=True,
    swarm_network=True,
    job_mem_limit=4294967296,
    job_mem_reservation=268435456,
    job_cpu_limit=1,
    job_cpu_reservation=1,
)
where,
- mount_archiver: bool (mounts the CHIME/FRB archiver; the data folder is mounted at /data/chime/)
- job_mem_limit: int = 4294967296 (memory limit in bytes; 4294967296 bytes = 4 GiB)
- job_mem_reservation: int = 268435456 (memory reservation in bytes; 268435456 bytes = 256 MiB)
- job_cpu_limit: float = 1 (at most 1 CPU core is used)
- job_cpu_reservation: float = 1 (1 CPU core is reserved)
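The memory values above are plain byte counts in powers of two. As a convenience, a small helper (not part of chime-frb-api) can convert human-readable units into the integers expected by job_mem_limit and job_mem_reservation:

```python
GIB = 1024 ** 3  # 1 GiB in bytes
MIB = 1024 ** 2  # 1 MiB in bytes

def to_bytes(amount: float, unit: str) -> int:
    """Convert an amount in 'GiB' or 'MiB' to the integer byte count
    expected by job_mem_limit / job_mem_reservation."""
    units = {"GiB": GIB, "MiB": MIB}
    return int(amount * units[unit])

print(to_bytes(4, "GiB"))    # 4294967296, the default job_mem_limit
print(to_bytes(256, "MiB"))  # 268435456, the default job_mem_reservation
```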
For baseband analysis jobs, you can also execute the following method; for more information, contact the baseband analysis team.
master.swarm.spawn_baseband_job(
    event_number=568586754, task_name="beamform", arguments=[], job_id=1
)
Job Status¶
Use the following method to get the status of any job on the cluster,
where
- the input job_name
is the string that was given when the job was created.
A job can have one of the following statuses,
* NEW (The job was initialized.)
* PENDING (Resources for the job were allocated.)
* ASSIGNED (Docker assigned the job to nodes.)
* ACCEPTED (The job was accepted by a worker node.)
* PREPARING (Docker is preparing the job.)
* STARTING (Docker is starting the job.)
* RUNNING (The job is executing.)
* COMPLETE (The job exited without an error code.)
* FAILED (The job exited with an error code.)
* SHUTDOWN (Docker requested the job to shut down.)
* REJECTED (The worker node rejected the job.)
* ORPHANED (The node was down for too long.)
* REMOVE (The job is not terminal but the associated service was removed.)
There is also a method to get the names of all currently running jobs on the analysis cluster. These two methods can be used together to track the status of individual jobs.
Running Jobs¶
This method monitors job[s] on the CHIME/FRB Analysis Cluster until they are either COMPLETE, FAILED, or SHUTDOWN.
Monitor Jobs¶
When a job is running, one can use this method
to continuously monitor the job[s] on the CHIME/FRB Analysis Cluster.
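Monitoring boils down to polling the job status until one of the terminal states above is reached. Below is a minimal sketch of that loop in plain Python, with get_status standing in for the actual chime-frb-api status call:

```python
import time

# Terminal states, per the status list above.
TERMINAL = {"COMPLETE", "FAILED", "SHUTDOWN"}

def monitor(get_status, job_name, interval=0.0):
    """Poll get_status(job_name) until the job reaches a terminal state.
    get_status is a stand-in for the chime-frb-api status call."""
    while True:
        status = get_status(job_name)
        if status in TERMINAL:
            return status
        time.sleep(interval)

# Usage with a fake status source that finishes on the third poll:
fake = iter(["PENDING", "RUNNING", "COMPLETE"])
print(monitor(lambda name: next(fake), "test"))  # COMPLETE
```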
Job Logs¶
This method returns logs from a CHIME/FRB job. For example, the log of the submitted job "test" gives the listed folders and timestamps:
{'test': "b'2021-04-21T17:05:17.880273760Z baseband\\n'b'2021-04-21T17:05:17.880348870Z daily_gain_solutions\\n'b'2021-04-21T17:05:17.880357530Z intensity\\n'"}
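The returned log value is a single string of concatenated b'...' chunks. A small helper (not part of chime-frb-api) can split it into (timestamp, message) pairs:

```python
import re

def parse_job_logs(raw: str):
    """Split the concatenated b'...' log string returned for a job
    into (timestamp, message) pairs."""
    return re.findall(r"b'([0-9T:.\-]+Z) (.*?)\\n'", raw)

# The log string from the "test" job above:
raw = ("b'2021-04-21T17:05:17.880273760Z baseband\\n'"
       "b'2021-04-21T17:05:17.880348870Z daily_gain_solutions\\n'"
       "b'2021-04-21T17:05:17.880357530Z intensity\\n'")
for ts, msg in parse_job_logs(raw):
    print(ts, msg)
```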
Kill Job¶
- To forcibly remove a job with ANY status, with an exact match to job_name:
- To remove FAILED jobs with a regex match to job_name:
- To remove COMPLETED jobs with a regex match to job_name:
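The difference between the exact-match and regex-match removal modes can be illustrated in plain Python (the job names below are made up):

```python
import re

jobs = ["hello-world", "hello-world-1", "hello-world-2", "test"]

# Exact match removes only the job whose name is identical.
exact = [j for j in jobs if j == "hello-world"]

# A regex match catches every job name matching the pattern.
matched = [j for j in jobs if re.match(r"hello-world", j)]

print(exact)    # ['hello-world']
print(matched)  # ['hello-world', 'hello-world-1', 'hello-world-2']
```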
More examples¶
How do you query information about FRBs (including repeaters) from the CHIME/FRB database?¶
The event ID, data path, and signal-to-noise ratio (S/N) of bursts associated with a particular repeater can be queried from chime-frb-api using the following Python code (credit: Paul Scholz):
from chime_frb_api import frb_master
def get_event_datapath_and_snr(event):
    for par in event["measured_parameters"]:
        if par["pipeline"]["name"] == "realtime":
            event_date = par["datetime"].split(" ")[0].split("-")
            event_snr = par["snr"]
            data_path = (
                "/data/frb-archiver/"
                + event_date[0]
                + "/"
                + event_date[1]
                + "/"
                + event_date[2]
                + "/astro_"
                + str(event["id"])
                + "/"
                + "intensity/processed/"
            )
    return data_path, event_snr
master = frb_master.FRBMaster()  # base_url="https://frb.chimenet.ca/frb-master" is no longer needed
repeater_name = "<TNS name of FRB>"
repeater_source = master.sources.get_source(repeater_name)
candidates = master.candidates.get_all_candidates()
event_ids = []
for candidate in candidates:
    if candidate["id"] in repeater_source["candidate_id"]:
        event_ids += candidate["events"]

for event_id in event_ids:
    event = master.events.get_event(event_id)
    data_path, snr = get_event_datapath_and_snr(event)
    print(event_id, data_path, snr)
The Python code above returns the event_id, data_path, and snr of each burst in the CHIME/FRB database associated with a particular TNS name. Other properties can be returned analogously by querying the desired parameters from the event object.
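The data_path built by get_event_datapath_and_snr follows a fixed archiver layout. Here is a standalone sketch of the same construction, assuming (as the split in the code above implies) that the realtime pipeline datetime string starts with YYYY-MM-DD:

```python
def archiver_path(event_datetime: str, event_id: int) -> str:
    """Rebuild the intensity data path from the realtime pipeline
    datetime string and event id (datetime format is an assumption)."""
    year, month, day = event_datetime.split(" ")[0].split("-")
    return (f"/data/frb-archiver/{year}/{month}/{day}/"
            f"astro_{event_id}/intensity/processed/")

print(archiver_path("2020-01-20 12:34:56.000 UTC+00:00", 69541452))
# /data/frb-archiver/2020/01/20/astro_69541452/intensity/processed/
```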
How do you get access to the raw (msgpack) and processed CHIME/FRB data?¶
- Data archiving locations on site:
- Raw intensity data (secured using read-only everywhere):
/data/chime/intensity/raw
Event dumps: /YYYY/MM/DD/astro_<event_no>/<beam_no>
Intensity streams: /acq_data/
- Processed intensity data:
Processed event data: /YYYY/MM/DD/astro_<event_no>/<beam_no>
Reduced calibration products, other pipelines...
- Raw baseband data (secured using read-only everywhere):
- Processed baseband data:
- Scratch space for users doing other analysis:
- The data may also be available at CANFAR:
Additional details and information can be found at: https://github.com/CHIMEFRB/frb-ops/issues/126
How do you get events that have baseband data?¶
import chime_frb_api, requests
master = chime_frb_api.frb_master.FRBMaster()
master.API.authorize()
auth = {"authorization": master.API.access_token}
r = requests.get(
    "https://frb.chimenet.ca/frb-master/v1/verification/get-verifications/NEW SOURCE",
    headers=auth,
)
events = r.json()
events_with_bb = []
for event in events:
    if event["data_status"]["baseband_data"]:
        events_with_bb.append(event["event_id"])
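The filtering logic above can be exercised without network access using mock event records in the shape the loop expects (the records below are made up):

```python
# Mock verification records; the data_status/baseband_data shape
# mirrors what the loop above reads from the API response.
events = [
    {"event_id": 1, "data_status": {"baseband_data": True}},
    {"event_id": 2, "data_status": {"baseband_data": False}},
    {"event_id": 3, "data_status": {"baseband_data": True}},
]

events_with_bb = [e["event_id"] for e in events if e["data_status"]["baseband_data"]]
print(events_with_bb)  # [1, 3]
```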
How do you get events classified by tsar as FRBs/RFI/REPEATERS/KNOWN?¶
Replace the "XXX" in the following script with one of the classifications below (note that XXX appears in a URL, which cannot contain spaces; spaces are replaced with %20):
- KNOWN%20CANDIDATE : to get repeaters
- FAINT : to get faint sources
- KNOWN%20SOURCE : to get Galactic sources (pulsars)
- New%20CANDIDATE : to get FRBs
- RFI : to get RFI
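The %20 substitutions are ordinary URL encoding of spaces; Python's standard library produces them directly, so the classification can be written with plain spaces:

```python
from urllib.parse import quote

# quote() percent-encodes spaces, yielding the URL-safe form above.
for label in ["KNOWN CANDIDATE", "KNOWN SOURCE", "New CANDIDATE"]:
    print(quote(label))
```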
import chime_frb_api, requests
master = chime_frb_api.frb_master.FRBMaster()
master.API.authorize()
auth = {"authorization": master.API.access_token}
r = requests.get(
    "https://frb.chimenet.ca/frb-master/v1/verification/get-verifications/XXX",
    headers=auth,
)
# "XXX" is the user classification, found in
# event["user_verification"][-1]["classification"]
events = r.json()
events_selected = []
for event in events:
    events_selected.append(event["event_id"])
How to produce publication-ready waterfalls¶
- Request waterfall data:
Script: get_wfall.py
import click
import numpy as np
from iautils import cascade, spectra_utils
@click.command()
@click.option("--eventid", type=click.INT)
@click.option("--dm", type=click.FLOAT)
@click.option("--nsub", type=click.INT)
def wfall_maker(eventid, dm, nsub):
    event_max = cascade.make_cascade_from_eventid(eventid, beams="max")
    event_max.max_beam.intensity = spectra_utils.scale_chime_data(
        event_max.max_beam.intensity
    )
    event_max.dm = dm
    event_max.max_beam.subband(nsub)
    fname = "{}_wfall.npz".format(eventid)
    print("save to:", fname)
    np.savez(
        fname,
        intensity=event_max.max_beam.intensity,
        dt=event_max.max_beam.dt,
        df=event_max.max_beam.df,
        dm=event_max.max_beam.dm,
        fbottom=event_max.max_beam.fbottom,
    )

if __name__ == "__main__":
    wfall_maker()
Example command (values are illustrative):
python get_wfall.py --eventid 69541452 --dm 87.76 --nsub 128
Here nsub is the number of frequency channels (subbands), which can also be specified later in the plotting script. By default the waterfall is saved as EVENTID_wfall.npz.
- Generate a single waterfall:
Script: plot_intensity_wfalls.py
import matplotlib.pyplot as plt
import numpy as np
from frb_common import common_utils
from iautils import spectra
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--fname", dest="fname", required=True, metavar="FILE",
                    help="the path and name of the npz file containing the waterfall, acquired from get_wfall.py")
parser.add_argument("--dm", dest="dm", type=float,
                    help="the targeted DM, default set to the value in the npz file")
parser.add_argument("--figname", dest="figname", metavar="FILE",
                    help="the path and name of the figure file")
args = parser.parse_args()
def plot_wfall(fname, ax, dm=None):
    """
    fname: the npz file containing the waterfall
    ax: 2-element axes array; 0 for the time profile, 1 for the waterfall
    dm: the targeted DM; default is None, in which case the DM stored in
        the waterfall file is used.
    """
    data = np.load(fname)
    intensity = data["intensity"]
    dt = data["dt"]
    df = data["df"]
    fbottom = data["fbottom"]
    dmtrial = data["dm"]
    print("DM", dm)
    weights = np.ones_like(intensity, dtype=float)
    weights[intensity == 0.0] = 0.0
    spec = spectra.Spectra(intensity, common_utils.freq_bottom_mhz,
                           df, 0.0, dt, weights=weights, dm=dmtrial)
    if dm is not None:
        spec.dedisperse(dm)
    #################################################
    ### ADD MASK here!!
    #################################################
    # mask = np.argwhere(np.abs(np.nansum(spec.intensity, axis=1)) > 2)
    # spec.intensity[mask, ...] = np.nan
    # mask = np.argwhere(np.abs(np.nanvar(spec.intensity, axis=1)) > 0.05)
    # spec.intensity[mask, ...] = np.nan
    spec.subband(128)
    spec.intensity[spec.intensity == 0.0] = np.nan
    # additional masking
    # mask = np.argwhere(np.nanstd(spec.intensity, axis=1) > 0.2)
    # spec.intensity[mask, ...] = np.nan
    ts = np.nansum(spec.intensity, axis=0)
    pulse_peak_idx = np.argmax(ts)
    window = int(np.ceil(0.025 / dt))
    if pulse_peak_idx < window:
        pulse_peak_idx = window - 1
    vmin = np.nanpercentile(
        spec.intensity[..., pulse_peak_idx - window + 1:pulse_peak_idx + window],
        # 100. - 95.
        # 100. - 99.73
        2.5)
    vmax = np.nanpercentile(
        spec.intensity[..., pulse_peak_idx - window - 40:pulse_peak_idx - window + 1],
        # 95.
        # 99.73
        100 - 2.5)
    plot_times = (spec.times[pulse_peak_idx - window + 1:pulse_peak_idx + window]
                  - spec.center_times[pulse_peak_idx]) * 1000.0  # ms
    plot_times = np.append(plot_times, plot_times[-1] + spec.dt)
    ax[0].plot(plot_times,
               np.append(ts[pulse_peak_idx - window + 1:pulse_peak_idx + window],
                         ts[pulse_peak_idx - window + 1:pulse_peak_idx + window][-1]),
               lw=1, drawstyle="steps-post", color="tab:blue")
    ax[1].imshow(
        spec.intensity[..., pulse_peak_idx - window + 1:pulse_peak_idx + window],
        origin="lower", aspect="auto", interpolation="nearest",
        extent=(plot_times.min(), plot_times.max(), fbottom, fbottom + 400.0),
        vmin=vmin, vmax=vmax, cmap="viridis")
    ax[1].set_xlabel("Time [ms]", fontsize=9)
    xpos = (plot_times.max() - plot_times.min()) * 0.03 + plot_times.min()
    ylim = ax[0].get_ylim()
    ypos = (ylim[1] - ylim[0]) * 0.93 + ylim[0]
    ax[0].set_xlim(plot_times.min(), plot_times.max())
    ax[1].set_xlim(plot_times.min(), plot_times.max())
    ax[0].set_yticklabels([], visible=False)
    ax[0].set_yticks([])
    ax[0].set_xticklabels([], visible=False)
    ax[1].set_xticks([-20, 0, 20])
    ax[1].set_xticklabels([-20, 0, 20], fontsize=9)
    ax[1].set_yticks([400, 500, 600, 700, 800])
    ax[1].set_yticklabels([400, 500, 600, 700, 800], fontsize=9)
    ax[1].set_ylabel("Frequency [MHz]", fontsize=9)

if __name__ == "__main__":
    fig, ax = plt.subplots(2, figsize=(4.26, 3),
                           gridspec_kw={"height_ratios": [1, 3],
                                        "hspace": 0, "wspace": 0.1})
    plot_wfall(args.fname, ax, dm=args.dm)
    # ax[0].set_title("{}".format(tns_name), fontsize=9)
    if args.figname is None:
        figname = args.fname.replace(".npz", ".pdf")
    else:
        figname = args.figname
    plt.savefig(figname)
    print("waterfall saved to:", figname)
Example command (values are illustrative):
python plot_intensity_wfalls.py --fname 69541452_wfall.npz --dm 87.76
If additional masks are needed, modify the corresponding section in the code (marked with "ADD MASK here!!").
- Generate figures containing multiple waterfalls:
Script: plot_intensity_wfalls_multi.py
import matplotlib.pyplot as plt
import numpy as np
from frb_common import common_utils
from iautils import spectra
if __name__ == "__main__":
    fnames = ["69541452_wfall.npz", "101903385_wfall.npz", "145207799_wfall.npz"]
    tns_names = ["20200120E", "20200718A", "20201129A"]
    dm_goals = [87.76, 87.86, 87.81]
    outname = "wfall.pdf"
    fig, ax = plt.subplots(
        2,
        3,
        figsize=(4.26, 3),
        gridspec_kw={
            "height_ratios": [1, 3],
            "width_ratios": [1, 1, 1],
            "hspace": 0,
            "wspace": 0.1,
        },
    )
    for i, fname in enumerate(fnames):
        data = np.load(fname)
        intensity = data["intensity"]
        dt = data["dt"]
        df = data["df"]
        fbottom = data["fbottom"]
        dm = data["dm"]
        weights = np.ones_like(intensity, dtype=float)
        weights[intensity == 0.0] = 0.0
        spec = spectra.Spectra(
            intensity, common_utils.freq_bottom_mhz, df, 0.0, dt, weights=weights, dm=dm
        )
        spec.dedisperse(dm_goals[i])
        mask = np.argwhere(np.abs(np.nansum(spec.intensity, axis=1)) > 2)
        spec.intensity[mask, ...] = np.nan
        mask = np.argwhere(np.abs(np.nanvar(spec.intensity, axis=1)) > 0.05)
        spec.intensity[mask, ...] = np.nan
        spec.subband(128)
        spec.intensity[spec.intensity == 0.0] = np.nan
        # additional masking
        # mask = np.argwhere(np.nanstd(spec.intensity, axis=1) > 0.2)
        # spec.intensity[mask, ...] = np.nan
        ts = np.nansum(spec.intensity, axis=0)
        pulse_peak_idx = np.argmax(ts)
        window = int(np.ceil(0.025 / dt))
        vmin = np.nanpercentile(
            spec.intensity[..., pulse_peak_idx - window + 1 : pulse_peak_idx + window],
            # 100. - 95.
            # 100. - 99.73
            2.5,
        )
        vmax = np.nanpercentile(
            spec.intensity[
                ..., pulse_peak_idx - window - 40 : pulse_peak_idx - window + 1
            ],
            # 95.
            # 99.73
            100 - 2.5,
        )
        plot_times = (
            spec.times[pulse_peak_idx - window + 1 : pulse_peak_idx + window]
            - spec.center_times[pulse_peak_idx]
        ) * 1000.0  # ms
        plot_times = np.append(plot_times, plot_times[-1] + spec.dt)
        ax[0, i].plot(
            plot_times,
            np.append(
                ts[pulse_peak_idx - window + 1 : pulse_peak_idx + window],
                ts[pulse_peak_idx - window + 1 : pulse_peak_idx + window][-1],
            ),
            lw=1,
            drawstyle="steps-post",
            color="tab:blue",
        )
        ax[1, i].imshow(
            spec.intensity[..., pulse_peak_idx - window + 1 : pulse_peak_idx + window],
            origin="lower",
            aspect="auto",
            interpolation="nearest",
            extent=(plot_times.min(), plot_times.max(), fbottom, fbottom + 400.0),
            vmin=vmin,
            vmax=vmax,
            cmap="viridis",
        )
        ax[1, i].set_xlabel("Time [ms]", fontsize=9)
        xpos = (plot_times.max() - plot_times.min()) * 0.03 + plot_times.min()
        ylim = ax[0, i].get_ylim()
        ypos = (ylim[1] - ylim[0]) * 0.93 + ylim[0]
        ax[0, i].set_title("{}".format(tns_names[i]), fontsize=9)
        ax[0, i].set_xlim(plot_times.min(), plot_times.max())
        ax[1, i].set_xlim(plot_times.min(), plot_times.max())
        ax[0, i].set_yticklabels([], visible=False)
        ax[0, i].set_yticks([])
        ax[0, i].set_xticklabels([], visible=False)
        if i > 0:
            plt.setp(ax[1, i].get_yticklabels(), visible=False)
        ax[1, i].set_xticks([-20, 0, 20])
        ax[1, i].set_xticklabels([-20, 0, 20], fontsize=9)
    ax[1, 0].set_yticks([400, 500, 600, 700, 800])
    ax[1, 0].set_yticklabels([400, 500, 600, 700, 800], fontsize=9)
    ax[1, 0].set_ylabel("Frequency [MHz]", fontsize=9)
    fig.suptitle("(a)", fontsize=9, y=1)
    plt.savefig(outname)
Modify fnames, tns_names, dm_goals, and outname at the top of the __main__ block to set the waterfall file names, TNS names, target DMs, and the output image name.