These instructions only apply to running the pipeline on site. Technically, the buckets system is deprecated and this pipeline should be integrated with workflow instead (as running the pipeline on CANFAR currently is), but Kaitlyn is spread too thin and won't be working on this for the foreseeable future. If you want to make the baseband pipeline workflow-compatible on site, send her and/or the project office a message!
Running the baseband localization pipeline on an event¶
You just need to add an event number to the baseband-pipeline bucket, which can be done from anywhere you have your FRB Master access and refresh tokens (e.g., opening up IPython on frb-analysis):
In [1]: import chime_frb_api
In [2]: buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")
In [3]: work = {"event_number": 195718715}
In [4]: buck.deposit('baseband-pipeline', work=work, priority="low")
Out[4]: [True]
The default priority for an event is low, but it can also be medium or high. Events with high priority will be analyzed first, once the current event finishes.
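For example, to submit the same event at high priority (same deposit call as above; only the priority changes):
In [5]: buck.deposit('baseband-pipeline', work=work, priority="high")
Out[5]: [True]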
When you submit an event as above, it will go through the baseband pipeline in these stages, with the beamforming stages taking the bulk of the time:
initialize
beamforming: refinement
merge: refinement
analysis: refinement
beamforming: localization
merge: localization
analysis: localization
beamforming: singlebeam
merge: singlebeam
analysis: waterfall
polarization
finalize
Modifying parameters¶
You can also add parameters to the work object; a complete list can be found by running python baseband_analysis/pipelines/cluster/manage_pipeline.py start_tiedbeam -print_arguments, which should return:
DIRECT ARGUMENTS - change the value of the single argument and have top priority
files_pattern: glob string for input files
files_per_group: number of files to be processed simultaneously for each job
job_size: number of jobs to run
cores: number of cores to use for each job
cal_file: gain file
ra: central RA (deg)
dec: central Dec (deg)
nx: number of beams along the x coordinate
ny: number of beams along the y coordinate
delta_x: space between beams along the x coordinate (deg)
delta_y: space between beams along the y coordinate (deg)
remove_raw_baseband: remove baseband data from single receivers from the beamformed file
inputs_file: path of the pickle file containing inputs on the receivers
toa: time of arrival of the burst in unix time
dm: DM of the burst (pc/cc)
output_root: directory to save the beamformed files (must end with /)
output_filename: name of the beamformed file
remove_beamform_baseband: remove beamformed baseband data from the beamformed file
power_downsample: downsample factor to calculate the power in each beam
dm_error: uncertainty on the DM (pc/cc)
width: width of the burst (s)
event_date: date of the event, in ISOT
telescope_rotation: correct the rotation of the telescope (deg)
beam_pattern: a keyword for the shape of the pattern of beams, e.g. "grid", "refinement"
apply_dedispersion: run coherent dedispersion on the data
INDIRECT ARGUMENTS - set multiple parameters if not directly defined
baseband_pipeline: set default values for each pipeline. To know names and values, check baseband_analysis.pipeline.config.PIPELINE_ATTRS
get_parameters_from_h5: import some parameters from another beamformed file
parameters_from_pickle: path to a pickle file containing some parameters to load
Here's a quick example of what a modified work object could look like:
work = {
    "event_number": 195718715,
    # can be all/refinement/localization/singlebeam/polarization; default is "all".
    # specifies what is actually run
    "run_stage": "singlebeam",
    "parameters": {
        "ra": 156.78452056,
        "dec": 24.04382827,
        "apply_dedispersion": False,
        "output_root": "/data/user-data/<user>/",
        # should only be used to set the default parameters for a specific stage;
        # don't specify this if all the stages will be run
        "baseband_pipeline": "singlebeam",
    },
}
Try to always submit the work payload directly to the baseband-pipeline bucket and just specify the stage, e.g., "run_stage": "polarization". This avoids issues where the same event is submitted with different priorities into the buckets, the events conflict, and the buckets crash:
Do this:
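# submit to the manager bucket, specifying the stage in the payload
# (this mirrors the deposit call shown at the top of this page)
work = {"event_number": 195718715, "run_stage": "polarization"}
buck.deposit('baseband-pipeline', work=work)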
Do not do this:
# never submit work to other baseband-{pipeline} buckets. let the pipeline manager handle that
buck.deposit('baseband-polarization', work)
Monitoring an event submitted to the baseband pipeline¶
Baseband localization is a time-consuming process; from beginning to end, an event typically takes ~2-3 hours to fully process, though it can take up to ~4. This page shows which event (and which stage of that event) is running, as well as how many events are queued: https://frb.chimenet.ca/frb-web/baseband-pipeline
Monitoring the buckets payload / queue¶
In [1]: import chime_frb_api
In [2]: buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")
In [3]: buck.get_status() # lists all the options
Out[3]:
['header-localization',
'intensity-acquisition',
'intensity-calibration',
'intensity-fitburst',
'intensity-dm-pipeline',
'intensity-mcmc',
'intensity-ml',
'intensity-ml-classify',
'intensity-localization',
'intensity-refinement',
'baseband',
'baseband-acquisition',
'baseband-refinement',
'baseband-localization',
'baseband-singlebeam',
'calibrate-intensity',
'calculate-fluence',
'policy-evaluator-file-replica',
'policy-evaluator-file',
'baseband-finalize',
'baseband-analysis',
'baseband-merge',
'baseband-pipeline',
'baseband-beamform',
'baseband-initialize',
'canfar-baseband-initialize',
'canfar-baseband-beamform',
'baseband-polarization']
# put in the name of the pipeline you want; it'll give you what's in the bucket
# could be 'baseband-polarization' for example
In [4]: buck.get_status('baseband-pipeline')
Out[4]:
{'name': 'baseband-pipeline',
'distributors_state': {'high': {'name': 'high',
'status': {},
'stopped': False,
'items': []},
'medium': {'name': 'medium',
'status': {'{"event_number": 168011768}': {'status': 2,
'deposited_at': 1644359258,
'withdrawn_at': 1644359292,
'withdrawn_by': 'baseband-pipeline0.02108977264628875-1',
'expiry': 14400},
'{"event_number": 71628854}': {'status': 1, 'deposited_at': 1644359321}},
'stopped': False,
'items': ['{"event_number": 168011768}', '{"event_number": 71628854}']},
'low': {'name': 'low',
'status': {'{"event_number": 152308061, "run_stage": "polarization"}': {'status': 1,
'deposited_at': 1644361518}},
'stopped': False,
'items': ['{"event_number": 152308061, "run_stage": "polarization"}']}}}
Here there are no high priority events, two medium priority events, and one low priority event in the queue. Currently event 168011768 is running; after it finishes the entire pipeline, event 71628854 will run through the entire pipeline, and then event 152308061 will run only the polarization stage.
The 'deposited_at' and 'withdrawn_at' values are unix timestamps. Typically you'd only see a 'withdrawn_at' key if the event is no longer in the queue (e.g., it is processing or has concluded).
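If you want those in human-readable form, a quick standard-library conversion (using the 'deposited_at' value from the example above):
In [5]: from datetime import datetime, timezone
In [6]: datetime.fromtimestamp(1644359258, tz=timezone.utc)
Out[6]: datetime.datetime(2022, 2, 8, 22, 27, 38, tzinfo=datetime.timezone.utc)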
Removing an event from the baseband pipeline payload¶
This is simply done with buck.conclude('<name of the bucket>', '<string of the work payload>'). Do this after following the steps in "Monitoring the buckets payload / queue" above.
Say I restarted the baseband pipeline while event 168011768 was running. We may as well kill the event in the bucket so that the next events in the queue can run in the updated pipeline. You might still see it running on the baseband pipeline webpage; look at the pipeline column to see which 'baseband-{pipeline}' bucket to access (it will be one of 'baseband-finalize', 'baseband-analysis', 'baseband-beamform', 'baseband-initialize', 'baseband-polarization', 'baseband-merge').
In [6]: buck.get_status('baseband-analysis')
Out[6]:
{'name': 'baseband-analysis',
'distributors_state': {'high': {'name': 'high',
'status': {},
'stopped': False,
'items': []},
'medium': {'name': 'medium', 'status': {}, 'stopped': False, 'items': []},
'low': {'name': 'low',
'status': {'{"event_number": 168011768, "baseband_pipeline": "refinement"}': {'status': 2,
'deposited_at': 1644362359,
'withdrawn_at': 1644362360,
'withdrawn_by': 'baseband-analysis0.5648397512555403-1',
'expiry': 7200}},
'stopped': False,
'items': ['{"event_number": 168011768, "baseband_pipeline": "refinement"}']}}}
In [7]: buck.conclude('baseband-analysis', '{"event_number": 168011768, "baseband_pipeline": "refinement"}')
Out[7]: True
In [8]: buck.get_status('baseband-analysis')
Out[8]:
{'name': 'baseband-analysis',
'distributors_state': {'high': {'name': 'high',
'status': {},
'stopped': False,
'items': []},
'medium': {'name': 'medium', 'status': {}, 'stopped': False, 'items': []},
'low': {'name': 'low',
'status': {'{"event_number": 168011768, "baseband_pipeline": "refinement"}': {'status': 3,
'deposited_at': 1644362359,
'withdrawn_at': 1644362360,
'withdrawn_by': 'baseband-analysis0.5648397512555403-1',
'expiry': 7200,
'concluded_at': 1644364158}},
'stopped': False,
'items': []}}}
The 'status' key will help you identify whether or not an event has successfully concluded:
'status': 1 → the event is in the queue
'status': 2 → the event is processing
'status': 3 → the event has concluded
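If you find yourself checking queues often, here's a minimal sketch that prints every item in a bucket with a human-readable status, assuming the payload structure returned by get_status() matches the example above:
import chime_frb_api

buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")
STATUS_NAMES = {1: "queued", 2: "processing", 3: "concluded"}

# walk the high/medium/low distributors and print each item's status
state = buck.get_status('baseband-pipeline')['distributors_state']
for priority, distributor in state.items():
    for item, info in distributor['status'].items():
        print(priority, STATUS_NAMES.get(info['status'], info['status']), item)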
If the concluding doesn't behave as expected, then let's go nuclear!
In [9]: buck.delete_bucket('baseband-polarization') # gets rid of everything in the bucket; we'd have to resubmit any events currently mid-processing
Out[9]: True
In [10]: buck.create_bucket('baseband-polarization') # recreates the bucket
Out[10]: True
I also do that if buck.get_status('baseband-pipeline') returns a lot of crap:
for buckname in ['baseband-finalize', 'baseband-analysis', 'baseband-merge', 'baseband-pipeline', 'baseband-beamform', 'baseband-initialize']:
    buck.delete_bucket(buckname)
    buck.create_bucket(buckname)
Monitoring logs¶
There are two types of logs — service logs and event logs.
Service logs¶
Check these first if an event is taking much longer to process than you expected, and/or if it appears to be restarting itself.
- Go on the analysis cluster page: https://frb.chimenet.ca/frb-web/cluster
- Then search for "baseband" in the search bar
- Click on Logs for the service you want to monitor (they're automatically refreshed every ten seconds or so)
Event logs¶
- Go to the baseband pipeline page: https://frb.chimenet.ca/frb-web/baseband-pipeline
- Click on "show logs" for the stage of the event you're running
The logs may take a while to load. If there's an error, it's likely in the viewer code or in the logs themselves (i.e., the filenames are wrong and that's why you can't view them) — here's how you can figure out what's going on:
- Right click on the browser and select inspect
- Go to console and look for errors
For example, there might be an error in the console like GET [link] 500 (Internal Server Error), where the link looks something like "https://frb.chimenet.ca/frb-master/v1/events/query-file?filename=/data/chime/baseband/processed/2021/11/05/astro_195718715/logs/baseband-195718715-finalize.log". Then you can check whether that file exists at that path (the 500 error is probably because it doesn't, so there may be a mismatch between the name of the log file that exists and what the viewer is trying to load). When in doubt, check the path directory.
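For instance, a quick way to check from Python whether the file the viewer is requesting actually exists, and what log files are present (path taken from the example error above):
from pathlib import Path

requested = Path(
    "/data/chime/baseband/processed/2021/11/05/astro_195718715"
    "/logs/baseband-195718715-finalize.log"
)
print(requested.exists())
# list the log files that actually exist, to spot a filename mismatch
print(sorted(p.name for p in requested.parent.glob("*.log")))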
Making sure an event is written to FRB Master¶
Again, this can be run anywhere you have your FRB Master access and refresh tokens.
In [1]: from chime_frb_api.backends import frb_master
In [2]: master = frb_master.FRBMaster()
In [3]: event = master.events.get_event(195718715) # of course, can replace with any <event_number>
In [4]: event['measured_parameters']
Out[4]: # output here.
The most recent run should be the last element of the event['measured_parameters'] list, and within it you should see 'pipeline': {'name': 'baseband', 'status': 'COMPLETE', ...} as well as the correct datetime.
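A quick check along these lines (a sketch; the exact keys within each measured_parameters entry can vary between runs):
In [5]: latest = event['measured_parameters'][-1]
In [6]: latest['pipeline']['name'], latest['pipeline']['status']
Out[6]: ('baseband', 'COMPLETE')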
(Re)starting the baseband pipeline¶
These are (hopefully temporary) instructions as of Feb 21, 2023; right now they're mainly a guide for Kaitlyn.
Ideally, before restarting the pipeline, there shouldn't be any events currently running -- you can check that here: https://frb.chimenet.ca/frb-web/baseband-pipeline. (See the subsection below for what to do if you need to restart the pipeline while an event is running.)
Assuming the code is properly merged into the main branch and all tests pass, but the updated code isn't yet reflected in the baseband-analysis:latest image, we should rebuild the image. To do so, go to the baseband-analysis repo and run these lines:
export DOCKER_BUILDKIT=1
docker build -f Dockerfile --target analysis --tag chimefrb/baseband-analysis:latest --ssh github_ssh_key=~/.ssh/id_rsa .
docker push chimefrb/baseband-analysis:latest
Then go to frbadmin@frb-vsop:~/frb-devops/stacks/pipelines and run these lines:
docker service ls | grep baseband
docker service rm baseband_analysis
docker service rm baseband_beamform
docker service rm baseband_finalize
docker service rm baseband_initialize
docker service rm baseband_merge
docker service rm baseband_polarization
docker service rm baseband_pipeline_manager
docker stack deploy -c baseband.yml baseband
To keep an eye on whether the services started up successfully, you can rerun docker service ls | grep baseband, and you can also follow the live logs for individual services, e.g.,
docker service logs baseband_pipeline_manager --follow
docker service logs baseband_initialize --follow
Alternatively, you could go to the cluster page, search for the 'baseband' services, and monitor them there. Immediately after restarting the baseband pipeline (whether via the commands above or the restart button on the ops page), the services should appear as "Preparing" (shaded in yellow). After a couple of minutes, they will be "Running" (green), and that's when you know the baseband pipeline has successfully restarted!
Restarting the baseband pipeline while an event is processing¶
Beta: the following script can be used to conclude all jobs running in any baseband container:
import chime_frb_api

bucket_api = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")
buckets = bucket_api.get_status()
for bucket in buckets:
    if bucket.startswith('baseband'):
        status = bucket_api.get_status(bucket)
        for priority in ['high', 'medium', 'low']:
            for item in status['distributors_state'][priority]['status'].keys():
                print(f"Concluding bucket {bucket}, item {item}")
                bucket_api.conclude(bucket, item)
If this fails, the following instructions explain how to conclude jobs manually.
If there are lots of events in the queue and it's better to restart the pipeline immediately, there's some whack-a-mole to be done.
- Restart the pipeline through the frb-ops interface
- Monitor the cluster page to make sure all the baseband services are up and running
- As per the basic instructions for removing events from the payload above, start using the buckets interface.
- Do buck.get_status('baseband-pipeline') to check on the status of the event that was running when you restarted the pipeline. It will probably still show "status": 2 for "currently running" (the bucket memory is persistent, so it's better to manually make sure everything is consistent). Conclude it with buck.conclude('baseband-pipeline', '{"event_number": [event_number]}').
- Check the baseband pipeline webpage, and you'll likely see the blue "processing" circle for some pipeline stage (initialize, beamform, merge, analysis, polarization, finalize). Do buck.get_status('baseband-[pipeline stage]') to get the dictionary payload for that event.
- Use buck.conclude to kill the hanging jobs. Below is an example of killing all 32 parallel jobs for the beamforming part of the refinement stage with a for loop:
In [24]: import numpy as np
In [25]: for num in np.arange(32):
    ...:     buck.conclude('baseband-beamform', '{"event_number": 72713970, "baseband_pipeline": "refinement", "job_id": '+str(num)+', "parameters": null}')
- Remember when I said "whack-a-mole"? If you kill the event during an earlier stage, the next stage will automatically "spawn" but still hang. Refer to the list of stages above for the order of processing -- if you kill the job while it's in its beamforming: refinement stage (as in the example above), you'll have to whack-a-mole away all the subsequent stages.
- Eventually you will have whack-a-mole'd your way through to the end of the finalize stage. At this point, buck.get_status('baseband-pipeline') should show only one event with "status": 2, and it should match the event that the baseband pipeline webpage claims is currently processing. Hopefully all went well, and you can breathe a sigh of relief now!
Integrated tests¶
Tests are automatically run upon opening a PR to main (and also upon pushing to main, even though that's probably redundant by now). First, pre-commit checks are run. Only once those pass are the two sets of tests run. The simpler set runs as GitHub Actions on a GitHub container and takes maybe ~5 min total. (I think there's a finicky temperature test that sometimes randomly fails in the GHA tests.)
More elaborate pytests are marked with @pytest.mark.slow; all the tests that use large datasets are marked as slow. These tests are run on a container hosted at DRAO (there's an idle, always-listening runner that spawns the testing container every time the tests are run through GitHub CI). The files live on frb-baseband:/data/archive/chime/baseband/testing and are mounted on the testing containers simply as /data/chime/baseband/processed/testing/.
As of ~Mar 15 2023 the tests take ~40 min to completely run.
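For reference, a slow-marked test is just an ordinary pytest carrying the decorator; something like this (hypothetical test name):
import pytest

@pytest.mark.slow
def test_refinement_merge_large_dataset():
    # a slow test would load a large dataset mounted at
    # /data/chime/baseband/processed/testing/ on the test container
    ...
Locally you can deselect these with pytest -m "not slow", or run only them with pytest -m slow.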
Further technical details¶
If you're frbadmin and you want to inspect the baseband services:
[frbadmin@frb-vsop ~]$ docker service ls | grep baseband
ypepapebrtv4 baseband_analysis replicated 1/1 chimefrb/baseband-localization:cluster
ggxy9laxbx0p baseband_beamform replicated 32/32 chimefrb/baseband-localization:cluster
cpfgedzo7eed baseband_finalize replicated 1/1 chimefrb/baseband-localization:cluster
ddyjwizkaayq baseband_initialize replicated 1/1 chimefrb/baseband-localization:cluster
pnwbypxqfg0f baseband_merge replicated 1/1 chimefrb/baseband-localization:cluster
3a4qyveqjn1b baseband_pipeline_manager replicated 1/1 chimefrb/baseband-localization:cluster
If restarting the baseband services from the ops page (https://frb.chimenet.ca/ops/operations) doesn't work, you can manually deploy the services with docker stack deploy -c baseband.yaml baseband (the compose file is docker/baseband_cluster_docker_compose.yml). Then you should open an issue, because the button should work (lol)
For more technical details on the baseband pipeline, refer to this link: https://github.com/CHIMEFRB/baseband-analysis/blob/cluster_upgrades/docs/cluster.md
For changing aspects of the docker stack itself (i.e., allocated resources/cores/RAM/etc), go here and then submit a PR as necessary. (Or if you want to play it risky, just change the file directly on frb-vsop I guess...)
Quick tips¶
Finding an event directory¶
Each event processed by the baseband pipeline has a unique directory based on its event number and the date it was processed, following the pattern /data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>.
An easy way to find the directory associated with an event number is to run the following command on frb-analysis: find /data/chime/baseband/processed -type d -iname astro_<event_no>
Generated files¶
The baseband pipeline generates three main .h5 files for each event (one per beamforming stage: refinement, localization, and singlebeam), along with diagnostic plots in the corresponding per-stage subdirectory (e.g., refinement_plots/).
Each main .h5 file is created by merging sub-files created by the pipeline stage, which are usually found in the meta_products/ subdirectory. To save time, if a .h5 sub-file already exists, a new one is not generated. (If all the sub-files in the meta_products subdirectory already exist, the pipeline might take only ~an hour to fully run.)
Of course, if we're testing new versions of the pipeline, the existence of old files can be problematic. That's why it's sometimes nice to move all the old files to a new subdirectory, e.g., Trial#_YYMMDD/. (If you don't have the permissions, a sneaky trick is to move the files from within a docker container. Or just ping someone like Kaitlyn on Slack to move the files for you!)
When running the pipeline on an event, logs are also generated and can be found in the logs subdirectory. If an event fails, it is useful to run grep 'Error' *.log in that subdirectory to see when and how the pipeline failed.
Manually obtaining localizations¶
Currently (as of Jan 2022) it is common for the baseband pipeline to successfully generate all the .h5 files and then fail at the finalize stage, so that no plots are produced in the plots directory and no localizations are written to L4. In such a case, you can open a Jupyter notebook (e.g., https://github.com/CHIMEFRB/baseband-analysis/wiki/Usage-of-the-Docker-system#using-jupyter-notebook) and run the following to obtain a localization:
from baseband_analysis.dev.localization import fit_location
fit_location(
'/data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>/localization_<event_no>.h5',
diagnostic_plots=True
)
The diagnostic plots will appear in the cell and provide visual confirmation of how reliable the resulting baseband localization may be.
The default output is RA, DEC, RA err (1 sigma), DEC err (1 sigma), rotation angle.
You can also add return_full=True to get information such as the SNR (second-to-last output) and DM (last output).
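For example, unpacking the default outputs (a sketch based on the output order listed above; the exact signature may differ between baseband_analysis versions):
# default output order: RA, Dec, RA err (1 sigma), Dec err (1 sigma), rotation angle
ra, dec, ra_err, dec_err, rotation = fit_location(
    '/data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>/localization_<event_no>.h5',
    diagnostic_plots=False,
)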
Localizations and errors should also be in the last line of /data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>/logs/baseband-<event_no>-localization_.log.
Clearing the shown events on frb-web/baseband-pipeline¶
Beta: if the baseband pipeline monitoring page shows far too many events (ideally it should only show a minimal number, so a daemon is probably hanging), you can wait until no events are processing and then run the following code:
import chime_frb_api

buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")
buck_names = ['baseband-finalize', 'baseband-analysis', 'baseband-beamform', 'baseband-initialize', 'baseband-polarization', 'baseband-merge']
for buckname in buck_names:
    print(buckname)
    buck.delete_bucket(buckname)
    buck.create_bucket(buckname)
This should "conclude"/clear everything.
Bugs? Issues?¶
Lastly, if you run into any bugs or issues, you can check the troubleshooting FAQ page here: https://github.com/CHIMEFRB/baseband-analysis/wiki/Troubleshooting-the-baseband-pipeline
We are actively working on debugging the pipeline and you can comment on any relevant issues, or open new ones here: https://github.com/CHIMEFRB/pipelines_wg/projects/2