
These instructions apply only to running the pipeline on site. Technically, the buckets system is deprecated and this pipeline should be integrated with workflow instead (as running the pipeline on CANFAR currently is), but Kaitlyn is spread too thin and won't be working towards this for the foreseeable future. If you want to make the baseband pipeline workflow-compatible on site, send her and/or the project office a message!

Running the baseband localization pipeline on an event

You just need to add an event number to the baseband-pipeline bucket. This can be done from anywhere you have your FRB Master access and refresh tokens (e.g., by opening IPython on frb-analysis):

Python
In [1]: import chime_frb_api

In [2]: buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")

In [3]: work = {"event_number": 195718715}

In [4]: buck.deposit('baseband-pipeline', work=work, priority="low")
Out[4]: [True]

The default priority for an event is low, but it can also be medium or high. High-priority events are analyzed first, as soon as the event that is currently running has finished.

When you submit an event as above, it will go through the baseband pipeline in these stages, with the beamforming stages taking the bulk of the time:

  1. initialize
  2. beamforming: refinement
  3. merge: refinement
  4. analysis: refinement
  5. beamforming: localization
  6. merge: localization
  7. analysis: localization
  8. beamforming: singlebeam
  9. merge: singlebeam
  10. analysis: waterfall
  11. polarization
  12. finalize
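The stage order above can be written down as data, which is handy later when you need to know which stages still follow the one you just killed. This is a minimal sketch: the STAGES list is copied from the list above, while the step-to-bucket mapping and the remaining_stages helper are assumptions for illustration (the bucket names themselves are the 'baseband-...' buckets mentioned elsewhere on this page).

```python
# Stage order copied from the numbered list above, as (step, pipeline) pairs.
STAGES = [
    ("initialize", None),
    ("beamforming", "refinement"),
    ("merge", "refinement"),
    ("analysis", "refinement"),
    ("beamforming", "localization"),
    ("merge", "localization"),
    ("analysis", "localization"),
    ("beamforming", "singlebeam"),
    ("merge", "singlebeam"),
    ("analysis", "waterfall"),
    ("polarization", None),
    ("finalize", None),
]

# Assumed mapping from step name to the bucket that serves it.
STEP_TO_BUCKET = {
    "initialize": "baseband-initialize",
    "beamforming": "baseband-beamform",
    "merge": "baseband-merge",
    "analysis": "baseband-analysis",
    "polarization": "baseband-polarization",
    "finalize": "baseband-finalize",
}


def remaining_stages(step, pipeline=None):
    """Return the stages that run after (step, pipeline), in order."""
    index = STAGES.index((step, pipeline))
    return STAGES[index + 1:]


if __name__ == "__main__":
    for step, pipeline in remaining_stages("beamforming", "refinement"):
        print(STEP_TO_BUCKET[step], pipeline)
```

For example, remaining_stages("beamforming", "refinement") starts with ("merge", "refinement") and ends with ("finalize", None).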

Modifying parameters

You can also add parameters to the work object; a complete list can be found by running python baseband_analysis/pipelines/cluster/manage_pipeline.py start_tiedbeam -print_arguments, which should return:

Text Only
DIRECT ARGUMENTS - change the value of the single argument and have top priority
files_pattern: glob string for input files
files_per_group: number of files to be processed simultaneously for each job
job_size: number of jobs to run
cores: number of cores to use for each job
cal_file: gain file
ra: central RA (deg)
dec: central Dec (deg)
nx: number of beams along the x coordinate
ny: number of beams along the y coordinate
delta_x: space between beams along the x coordinate (deg)
delta_y: space between beams along the y coordinate (deg)
remove_raw_baseband: remove baseband data from single receivers from the beamformed file
inputs_file: path of the pickle file containing inputs on the receivers
toa: time of arrival of the burst in unix time
dm: DM of the burst (pc/cc)
output_root: directory to save the beamformed files (must end with /)
output_filename:  name of the beamformed file
remove_beamform_baseband: remove beamformed baseband data from the beamformed file
power_downsample: downsample factor to calculate the power in each beam
dm_error: uncertainty on the DM (pc/cc)
width: width of the burst (s)
event_date: date of the event, in ISOT
telescope_rotation: correct the rotation of the telescope (deg)
beam_pattern: a keyword for the shape of the pattern of beams, e.g. "grid", "refinement"
apply_dedispersion: run coherent dedispersion on the data

INDIRECT ARGUMENTS - set multiple parameters if not directly defined
baseband_pipeline: set default values for each pipeline. To know names and values, check baseband_analysis.pipeline.config.PIPELINE_ATTRS
get_parameters_from_h5: import some parameters from another beamformed file
parameters_from_pickle: path to a pickle file containing some parameters to load

Here's a quick example of what a modified work object could look like:

Python
work = {
    "event_number": 195718715,
    # Specifies what is actually run: all/refinement/localization/singlebeam/polarization.
    # Default is "all".
    "run_stage": "singlebeam",
    "parameters": {
        "ra": 156.78452056,
        "dec": 24.04382827,
        "apply_dedispersion": False,
        "output_root": "/data/user-data/<user>/",
        # Should only be used to set the default parameters for a specific stage.
        # If all the stages will be run, this shouldn't be specified.
        "baseband_pipeline": "singlebeam",
    },
}
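Since a typo in a parameter name is easy to miss, it can help to check a work object against the argument list printed above before depositing it. The sketch below is not part of the pipeline; parameter_problems is a hypothetical helper, and the only rules it knows are the argument names listed above and the note that output_root must end with "/".

```python
# Argument names copied from the -print_arguments output above.
DIRECT_ARGUMENTS = {
    "files_pattern", "files_per_group", "job_size", "cores", "cal_file",
    "ra", "dec", "nx", "ny", "delta_x", "delta_y", "remove_raw_baseband",
    "inputs_file", "toa", "dm", "output_root", "output_filename",
    "remove_beamform_baseband", "power_downsample", "dm_error", "width",
    "event_date", "telescope_rotation", "beam_pattern", "apply_dedispersion",
}
INDIRECT_ARGUMENTS = {
    "baseband_pipeline", "get_parameters_from_h5", "parameters_from_pickle",
}
KNOWN_ARGUMENTS = DIRECT_ARGUMENTS | INDIRECT_ARGUMENTS


def parameter_problems(work):
    """Return a list of human-readable problems found in work['parameters']."""
    problems = []
    parameters = work.get("parameters") or {}
    # Flag names that do not appear in the printed argument list.
    for name in sorted(set(parameters) - KNOWN_ARGUMENTS):
        problems.append(f"unknown parameter: {name}")
    # The argument list says output_root must end with '/'.
    output_root = parameters.get("output_root")
    if output_root is not None and not str(output_root).endswith("/"):
        problems.append("output_root must end with /")
    return problems


if __name__ == "__main__":
    work = {
        "event_number": 195718715,
        "run_stage": "singlebeam",
        "parameters": {"ra": 156.78452056, "dec": 24.04382827},
    }
    print(parameter_problems(work))
```

An empty list means nothing obviously wrong was found; it does not guarantee that the values themselves make sense.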

Try to always submit the work payload directly to the baseband-pipeline bucket and just specify the stage, e.g., "run_stage": "polarization". This avoids issues where the same event is submitted into the buckets with different priorities, the events then conflict, and the buckets crash. Do this:

Python
 # always submit work to the baseband-pipeline bucket
buck.deposit('baseband-pipeline', work)

Do not do this

Python
 # never submit work to other baseband-{pipeline} buckets. let the pipeline manager handle that
buck.deposit('baseband-polarization', work)
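One way to make the rule above hard to break is to wrap the deposit call so that the bucket name is fixed. This is only a sketch: submit_event is a hypothetical helper, and it assumes the bucket object has the deposit(name, work=..., priority=...) call shown earlier on this page. The allowed run_stage and priority values are the ones listed above.

```python
PIPELINE_BUCKET = "baseband-pipeline"
PRIORITIES = ("low", "medium", "high")
RUN_STAGES = ("all", "refinement", "localization", "singlebeam", "polarization")


def submit_event(bucket, event_number, run_stage="all", priority="low", parameters=None):
    """Deposit an event into the baseband-pipeline bucket and nowhere else."""
    if priority not in PRIORITIES:
        raise ValueError(f"priority must be one of {PRIORITIES}, got {priority!r}")
    if run_stage not in RUN_STAGES:
        raise ValueError(f"run_stage must be one of {RUN_STAGES}, got {run_stage!r}")
    work = {"event_number": int(event_number)}
    # "all" is the default, so only record the stage when it is restricted.
    if run_stage != "all":
        work["run_stage"] = run_stage
    if parameters:
        work["parameters"] = dict(parameters)
    return bucket.deposit(PIPELINE_BUCKET, work=work, priority=priority)


# Usage, with buck created as in the examples above:
#   submit_event(buck, 152308061, run_stage="polarization")
```

The stage is selected through run_stage in the payload, so there is never a reason to pass a different bucket name.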

Monitoring an event submitted to the baseband pipeline

Baseband localization is a time-consuming process; from beginning to end, an event will typically take ~2-3 hours to fully process, though it could take up to ~4. This page shows which event (and which stage of that event) is running, as well as how many events are waiting in the queue: https://frb.chimenet.ca/frb-web/baseband-pipeline

Monitoring the buckets payload / queue

Python
In [1]: import chime_frb_api

In [2]: buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")

In [3]: buck.get_status() # lists all the options
Out[3]:
['header-localization',
 'intensity-acquisition',
 'intensity-calibration',
 'intensity-fitburst',
 'intensity-dm-pipeline',
 'intensity-mcmc',
 'intensity-ml',
 'intensity-ml-classify',
 'intensity-localization',
 'intensity-refinement',
 'baseband',
 'baseband-acquisition',
 'baseband-refinement',
 'baseband-localization',
 'baseband-singlebeam',
 'calibrate-intensity',
 'calculate-fluence',
 'policy-evaluator-file-replica',
 'policy-evaluator-file',
 'baseband-finalize',
 'baseband-analysis',
 'baseband-merge',
 'baseband-pipeline',
 'baseband-beamform',
 'baseband-initialize',
 'canfar-baseband-initialize',
 'canfar-baseband-beamform',
 'baseband-polarization']

# put in the name of the pipeline you want; it'll give you what's in the bucket
# could be 'baseband-polarization' for example
In [4]: buck.get_status('baseband-pipeline')
Out[4]:
{'name': 'baseband-pipeline',
 'distributors_state': {'high': {'name': 'high',
   'status': {},
   'stopped': False,
   'items': []},
  'medium': {'name': 'medium',
   'status': {'{"event_number": 168011768}': {'status': 2,
     'deposited_at': 1644359258,
     'withdrawn_at': 1644359292,
     'withdrawn_by': 'baseband-pipeline0.02108977264628875-1',
     'expiry': 14400},
    '{"event_number": 71628854}': {'status': 1, 'deposited_at': 1644359321}},
   'stopped': False,
   'items': ['{"event_number": 168011768}', '{"event_number": 71628854}']},
  'low': {'name': 'low',
   'status': {'{"event_number": 152308061, "run_stage": "polarization"}': {'status': 1,
     'deposited_at': 1644361518}},
   'stopped': False,
   'items': ['{"event_number": 152308061, "run_stage": "polarization"}']}}}

There are no high priority events, two medium priority events, and one low priority event in the queue. Event 168011768 is currently running; once it has gone through the entire pipeline, event 71628854 will run the entire pipeline, and then event 152308061 will run only the polarization stage. The 'deposited_at' and 'withdrawn_at' values are unix timestamps. Typically you'd only see a 'withdrawn_at' key once the event has been taken off the queue (e.g., it is processing or has concluded).
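The nested dictionary is tedious to read by eye, so here is a minimal sketch that flattens it into one row per event. summarize_queue is a hypothetical helper, not part of chime_frb_api; it assumes the structure shown in the output above, and it assumes that priorities are served high, then medium, then low, in the order the entries appear.

```python
import json
from datetime import datetime, timezone

PRIORITY_ORDER = ("high", "medium", "low")


def summarize_queue(status):
    """Flatten a get_status('<bucket>') dictionary into a list of rows."""
    rows = []
    for priority in PRIORITY_ORDER:
        distributor = status["distributors_state"][priority]
        for payload, info in distributor["status"].items():
            # Each key is the work payload serialized as a JSON string.
            work = json.loads(payload)
            rows.append({
                "priority": priority,
                "event_number": work["event_number"],
                "run_stage": work.get("run_stage", "all"),
                "status": info["status"],
                # Unix timestamp converted to an aware UTC datetime.
                "deposited_at": datetime.fromtimestamp(
                    info["deposited_at"], tz=timezone.utc
                ),
            })
    return rows


# Usage, with buck created as in the examples above:
#   for row in summarize_queue(buck.get_status('baseband-pipeline')):
#       print(row)
```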

Removing an event from the baseband pipeline payload

This is simply done by using buck.conclude('<name of the bucket>', 'string of the work payload'). You should do these steps after the steps in "Monitoring the buckets payload" above.
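In the examples on this page, the string passed to buck.conclude is exactly the key shown in the get_status output, so it is safer to copy it programmatically than to retype it. A minimal sketch, assuming the get_status structure shown above; find_payloads is a hypothetical helper.

```python
import json


def find_payloads(status, event_number):
    """Return the exact payload strings in a bucket that belong to an event."""
    matches = []
    for distributor in status["distributors_state"].values():
        for payload in distributor["status"]:
            # Keys are JSON strings; parse them only to read the event number.
            if json.loads(payload).get("event_number") == event_number:
                matches.append(payload)
    return matches


# Usage, with buck created as in the examples above:
#   status = buck.get_status('baseband-pipeline')
#   for payload in find_payloads(status, 168011768):
#       buck.conclude('baseband-pipeline', payload)
```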

Say I restarted the baseband pipeline while event 168011768 was running. We may as well kill the event in the bucket so that the next events in the queue can run in the updated pipeline.

Python
In [5]: buck.conclude('baseband-pipeline', '{"event_number": 168011768}')
Out[5]: True

But you might still see it running on the baseband pipeline webpage. Look at the pipeline column to find out which 'baseband-{pipeline}' bucket the event is currently in. (It will be one of 'baseband-finalize', 'baseband-analysis', 'baseband-beamform', 'baseband-initialize', 'baseband-polarization', 'baseband-merge'.)

Python
In [6]: buck.get_status('baseband-analysis')
Out[6]:
{'name': 'baseband-analysis',
 'distributors_state': {'high': {'name': 'high',
   'status': {},
   'stopped': False,
   'items': []},
  'medium': {'name': 'medium', 'status': {}, 'stopped': False, 'items': []},
  'low': {'name': 'low',
   'status': {'{"event_number": 168011768, "baseband_pipeline": "refinement"}': {'status': 2,
     'deposited_at': 1644362359,
     'withdrawn_at': 1644362360,
     'withdrawn_by': 'baseband-analysis0.5648397512555403-1',
     'expiry': 7200}},
   'stopped': False,
   'items': ['{"event_number": 168011768, "baseband_pipeline": "refinement"}']}}}

In [7]: buck.conclude('baseband-analysis', '{"event_number": 168011768, "baseband_pipeline": "refinement"}')
Out[7]: True

In [8]: buck.get_status('baseband-analysis')
Out[8]:
{'name': 'baseband-analysis',
 'distributors_state': {'high': {'name': 'high',
   'status': {},
   'stopped': False,
   'items': []},
  'medium': {'name': 'medium', 'status': {}, 'stopped': False, 'items': []},
  'low': {'name': 'low',
   'status': {'{"event_number": 168011768, "baseband_pipeline": "refinement"}': {'status': 3,
     'deposited_at': 1644362359,
     'withdrawn_at': 1644362360,
     'withdrawn_by': 'baseband-analysis0.5648397512555403-1',
     'expiry': 7200,
     'concluded_at': 1644364158}},
   'stopped': False,
   'items': []}}}

The 'status' key will help you identify whether or not an event has successfully concluded:

  • 'status': 1 → the event is in queue
  • 'status': 2 → the event is processing
  • 'status': 3 → the event has concluded
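The status codes above can be turned into labels so that a bucket can be checked at a glance after calling conclude. This is a sketch under the assumption that get_status returns the structure shown in the outputs above; STATUS_LABELS, payload_states and is_concluded are hypothetical names.

```python
# Meaning of the 'status' values, as listed above.
STATUS_LABELS = {1: "in queue", 2: "processing", 3: "concluded"}


def payload_states(status):
    """Map each payload string in a bucket to a readable state label."""
    states = {}
    for distributor in status["distributors_state"].values():
        for payload, info in distributor["status"].items():
            code = info["status"]
            states[payload] = STATUS_LABELS.get(code, f"unknown ({code})")
    return states


def is_concluded(status, payload):
    """True only if the payload is present in the bucket and has status 3."""
    return payload_states(status).get(payload) == "concluded"


# Usage, with buck created as in the examples above:
#   payload = '{"event_number": 168011768, "baseband_pipeline": "refinement"}'
#   is_concluded(buck.get_status('baseband-analysis'), payload)
```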

If the concluding doesn't behave as expected, then let's go nuclear!

Python
In [9]: buck.delete_bucket('baseband-polarization') # gets rid of everything in the bucket, and we'd have to resubmit any events currently in the middle of processing
Out[9]: True

In [10]: buck.create_bucket('baseband-polarization') # recreates the bucket
Out[10]: True

I also do that if buck.get_status('baseband-pipeline') returns a lot of stale or inconsistent entries:

Python
for buckname in ['baseband-finalize','baseband-analysis','baseband-merge','baseband-pipeline','baseband-beamform','baseband-initialize']:
    buck.delete_bucket(buckname)
    buck.create_bucket(buckname)

Monitoring logs

There are two types of logs — service logs and event logs.

Service logs

Check these first if an event is taking much longer to process than you expected, and/or if it appears to be restarting itself.

  • Go on the analysis cluster page: https://frb.chimenet.ca/frb-web/cluster
  • Then search for "baseband" in the search bar
  • Click on Logs for the service you want to monitor (they're automatically refreshed every ten seconds or so)

Event logs

The logs may take a while to load. If there's an error, it's likely in the viewer code or in the logs themselves (i.e., the filenames are wrong and that's why you can't view them) — here's how you can figure out what's going on:

  • Right click on the browser and select inspect
  • Go to console and look for errors

For example, there might be an error in the console like GET [link] 500 (Internal Server Error), where the link looks something like "https://frb.chimenet.ca/frb-master/v1/events/query-file?filename=/data/chime/baseband/processed/2021/11/05/astro_195718715/logs/baseband-195718715-finalize.log". You can then check whether that file exists in the logs directory given in the link. The 500 error is probably because the file doesn't exist, so there may be a mismatch between the name of the log file that exists and the name the viewer is trying to load. When in doubt, check that directory.
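Pulling the path out of such a link by hand is error-prone, so here is a small sketch that does it with the standard library. log_path_from_url is a hypothetical helper; it only assumes that the link carries the path in a filename query parameter, as in the example above.

```python
from pathlib import Path
from urllib.parse import parse_qs, urlparse


def log_path_from_url(url):
    """Extract the log file path from a query-file link."""
    query = parse_qs(urlparse(url).query)
    filenames = query.get("filename")
    if not filenames:
        raise ValueError("no filename parameter found in the link")
    return Path(filenames[0])


if __name__ == "__main__":
    url = (
        "https://frb.chimenet.ca/frb-master/v1/events/query-file?filename="
        "/data/chime/baseband/processed/2021/11/05/astro_195718715/logs/"
        "baseband-195718715-finalize.log"
    )
    path = log_path_from_url(url)
    print("viewer wants:", path.name)
    print("exists:", path.exists())
    # List the logs that actually exist, to spot a filename mismatch.
    if path.parent.is_dir():
        for existing in sorted(path.parent.glob("*.log")):
            print("found:", existing.name)
```

Run it on a machine that can see /data/chime/baseband/processed (e.g., frb-analysis); anywhere else the existence check will simply report False.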

Making sure an event is written to FRB Master

Again, this can be run anywhere you have your FRB Master access and refresh tokens.

Python
In [1]: from chime_frb_api.backends import frb_master

In [2]: master = frb_master.FRBMaster()

In [3]: event = master.events.get_event(195718715) # of course, can replace with any <event_number>

In [4]: event['measured_parameters']
Out[4]: # output here.

The most recent run should be the last element in the event['measured_parameters'] list, and you should see within it 'pipeline': {'name': 'baseband', 'status': 'COMPLETE', ...} as well as the correct datetime.
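The same check can be scripted. The sketch below works on the event['measured_parameters'] list and relies only on the 'pipeline' name and status keys mentioned above; baseband_run_complete is a hypothetical helper, and you should still check the datetime by eye.

```python
def baseband_run_complete(measured_parameters):
    """Check that the last entry is a baseband run with status COMPLETE."""
    if not measured_parameters:
        return False
    # The most recent run is expected to be the last element of the list.
    pipeline = measured_parameters[-1].get("pipeline", {})
    return pipeline.get("name") == "baseband" and pipeline.get("status") == "COMPLETE"


# Usage, with master created as in the example above:
#   event = master.events.get_event(195718715)
#   baseband_run_complete(event['measured_parameters'])
```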

(Re)starting the baseband pipeline

These are (hopefully temporary) instructions as of Feb 21, 2023, right now mainly a guide for Kaitlyn.

Ideally, no events are running when you restart the pipeline -- you can check that here: https://frb.chimenet.ca/frb-web/baseband-pipeline. (See the subsection below for what to do if you want to restart the pipeline while an event is running.)

Assuming the code is properly merged into the main branch and all tests pass, but the updated code isn't yet reflected in the baseband-analysis:latest image, the image needs to be rebuilt. To rebuild the image, go to the baseband-analysis repo and run these lines:

Text Only
export DOCKER_BUILDKIT=1
docker build -f Dockerfile --target analysis --tag chimefrb/baseband-analysis:latest --ssh github_ssh_key=~/.ssh/id_rsa .
docker push chimefrb/baseband-analysis:latest

Then go to frbadmin@frb-vsop:~/frb-devops/stacks/pipelines. You can run these lines:

Text Only
docker service ls | grep baseband
docker service rm baseband_analysis
docker service rm baseband_beamform
docker service rm baseband_finalize
docker service rm baseband_initialize
docker service rm baseband_merge
docker service rm baseband_polarization
docker service rm baseband_pipeline_manager
docker stack deploy -c baseband.yml baseband

To keep an eye on whether the services started up successfully:

Text Only
docker stack ps baseband --no-trunc

and you can also look at live logs for individual services too, e.g.,

Text Only
docker service logs baseband_pipeline_manager --follow
docker service logs baseband_initialize --follow

Alternatively, you could go to the cluster page, search for the 'baseband' services, and monitor them. Immediately after restarting the baseband pipeline, the services should appear as "Preparing" (shaded in yellow). After a couple of minutes, they will be "Running" (green), and that's when you know the baseband pipeline has successfully restarted!

Restarting the baseband pipeline while an event is processing

Beta: the following script can be used to conclude all jobs running in any baseband container

Python
import chime_frb_api
bucket_api = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")

buckets = bucket_api.get_status()
for bucket in buckets:
    if bucket.startswith('baseband'):
        status = bucket_api.get_status(bucket)
        for priority in ['high', 'medium', 'low']:
            for item in status['distributors_state'][priority]['status'].keys():
                print(f"Concluding bucket {bucket}, item {item}")
                bucket_api.conclude(bucket, item)

If this fails, the following instructions explain how to conclude jobs manually.

If there are lots of events in the queue and it's better to restart the pipeline immediately, there's some whack-a-mole to be done.

  1. Restart the pipeline through the frb-ops interface
  2. Monitor the cluster page to make sure all the baseband services are up and running
  3. Following the basic instructions in "Removing an event from the baseband pipeline payload" above, start using the buckets interface.
  4. Do buck.get_status('baseband-pipeline') to check on the status of the event that was running when you restarted the pipeline. It should probably show "status: 2" for "currently running" (the bucket memory is persistent, so it's better to manually make sure everything is consistent)
  5. buck.conclude('baseband-pipeline', '{"event_number": [event_number]}')
  6. Check the baseband pipeline webpage, and you'll likely see that blue circle processing for some pipeline stage (initialize, beamform, merge, analysis, polarization, finalize). Do buck.get_status('baseband-[pipeline stage]') to get the dictionary payload for that event.
  7. Use buck.conclude to kill the hanging jobs. Below is an example for killing all 32 parallel jobs for the beamforming part of the refinement stage using a for-loop:
Python
In [25]: for num in range(32):
    ...:     buck.conclude('baseband-beamform', '{"event_number": 72713970, "baseband_pipeline": "refinement", "job_id": '+str(num)+', "parameters": null}')
  8. Remember when I said "whack-a-mole"? If you kill the event during an earlier stage, the next stage will automatically "spawn" but still hang. Refer to the list of stages under "Running the baseband localization pipeline on an event" for the order of processing -- if you kill the job while it's in its beamforming: refinement stage (as in the example above), you'll have to whack-a-mole away all the subsequent stages.
  9. Eventually you will have "whack-a-mole"-d your way through until the end of the finalize stage. At this point, buck.get_status('baseband-pipeline') should show only one event with a "status: 2" and that should match the number of the event that the baseband pipeline webpage claims is currently processing. Hopefully all went well, and you can breathe a sigh of relief now!
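The string concatenation used for the 32 beamforming jobs can be replaced with json.dumps, which with Python's default separators produces the same text as the payload strings shown above. This is a sketch: beamform_payloads and conclude_all are hypothetical helpers, the key order and the null parameters are copied from the example above, and if the bucket shows a different payload string you should copy it from buck.get_status instead.

```python
import json


def beamform_payloads(event_number, baseband_pipeline, n_jobs=32):
    """Build one payload string per parallel beamforming job."""
    return [
        json.dumps({
            # Key order matters: the string must match the bucket's key exactly.
            "event_number": event_number,
            "baseband_pipeline": baseband_pipeline,
            "job_id": job_id,
            "parameters": None,  # serialized as null
        })
        for job_id in range(n_jobs)
    ]


def conclude_all(bucket, bucket_name, payloads):
    """Conclude every payload and report the result of each call."""
    return {payload: bucket.conclude(bucket_name, payload) for payload in payloads}


# Usage, with buck created as in the examples above:
#   conclude_all(buck, 'baseband-beamform', beamform_payloads(72713970, "refinement"))
```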

Integrated tests

Tests are automatically run upon opening a PR to main (and also upon pushing to main, even though that's probably redundant by now). First, pre-commit checks are run. Only once those pass are the two sets of tests run. The simpler set of tests is run as GitHub Actions on a GitHub container and takes maybe ~5 min total. (I think there's a finicky temperature test that sometimes randomly fails in the GHA tests.)

More elaborate pytests are marked with @pytest.mark.slow; all the tests that use ~large datasets are marked as slow. These tests are run on a container hosted at DRAO (there's an idle, always-listening runner that spawns the testing container every time the tests are run through GitHub CI). The files live on frb-baseband:/data/archive/chime/baseband/testing and are mounted on the testing containers simply as /data/chime/baseband/processed/testing/.

As of ~Mar 15 2023 the tests take ~40 min to completely run.

Further technical details

If you're frbadmin and you want to inspect the baseband services,

Text Only
[frbadmin@frb-vsop ~]$ docker service ls | grep baseband
ypepapebrtv4   baseband_analysis                      replicated   1/1        chimefrb/baseband-localization:cluster
ggxy9laxbx0p   baseband_beamform                      replicated   32/32      chimefrb/baseband-localization:cluster
cpfgedzo7eed   baseband_finalize                      replicated   1/1        chimefrb/baseband-localization:cluster
ddyjwizkaayq   baseband_initialize                    replicated   1/1        chimefrb/baseband-localization:cluster
pnwbypxqfg0f   baseband_merge                         replicated   1/1        chimefrb/baseband-localization:cluster
3a4qyveqjn1b   baseband_pipeline_manager              replicated   1/1        chimefrb/baseband-localization:cluster

If restarting the baseband services from the ops page (https://frb.chimenet.ca/ops/operations) doesn't work, you can manually deploy the services with docker stack deploy -c baseband.yaml baseband docker/baseband_cluster_docker_compose.yml. Then you should open an issue, because the button should work (lol)

For more technical details on the baseband pipeline, refer to this link: https://github.com/CHIMEFRB/baseband-analysis/blob/cluster_upgrades/docs/cluster.md

For changing aspects of the docker stack itself (i.e., allocated resources/cores/RAM/etc), go here and then submit a PR as necessary. (Or if you want to play it risky, just change the file directly on frb-vsop I guess...)

Quick tips

Finding an event directory

Each event processed by the baseband pipeline has a unique directory based on its event number and date, following the pattern /data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>. An easy way to find the directory for a given event number is to run the following command on frb-analysis: find /data/chime/baseband/processed -type d -iname astro_<event_no>
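If you are already in Python, the same lookup can be done with pathlib. This is a minimal sketch: find_event_dirs and expected_event_dir are hypothetical helpers that rely only on the directory pattern above. Unlike find with -iname, the glob is case-sensitive on Linux, and whether the date in the path is a UTC or local date is not stated here, so treat expected_event_dir as a guess to be confirmed with find_event_dirs.

```python
from datetime import date
from pathlib import Path

PROCESSED_ROOT = Path("/data/chime/baseband/processed")


def find_event_dirs(event_number, root=PROCESSED_ROOT):
    """Search YYYY/MM/DD subdirectories for astro_<event_number>."""
    pattern = f"*/*/*/astro_{event_number}"
    return sorted(p for p in Path(root).glob(pattern) if p.is_dir())


def expected_event_dir(event_number, iso_date, root=PROCESSED_ROOT):
    """Build the directory path from an ISO date string such as '2021-11-05'."""
    day = date.fromisoformat(iso_date)
    return (
        Path(root)
        / f"{day.year:04d}"
        / f"{day.month:02d}"
        / f"{day.day:02d}"
        / f"astro_{event_number}"
    )


if __name__ == "__main__":
    print(expected_event_dir(195718715, "2021-11-05"))
    print(find_event_dirs(195718715))
```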

Generated files

The baseband pipeline generates three main .h5 files, one per stage (e.g., refinement_.h5, then localization.h5, then singlebeam.h5), as well as plots from the associated pipeline stage, found in the plots/ subdirectory. Each main .h5 file is created by merging sub-files created by the pipeline stage, which are usually found in the meta_products/ subdirectory. To save time, if a .h5 sub-file already exists, a new one is not generated. (If all the sub-files in the meta_products/ subdirectory already exist, the pipeline might take ~an hour to fully run.)

Of course, if we're testing new versions of the pipeline, the existence of old files could be problematic. That's why it's sometimes nice to move all the old files to a new subdirectory, e.g., Trial#_YYMMDD/. (If you don't have the permissions, a sneaky trick is to move all the files while in a docker container. Or just ping someone like Kaitlyn on Slack to move the files for you!)

When running the pipeline on an event, logs are also generated and can be found in the logs subdirectory. If an event fails, it is useful to grep 'Error' *.log in the subdirectory to see when and how the pipeline failed.
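A Python equivalent of the grep above can be convenient inside a notebook, since it also gives you the file name and line number of each hit. This is only a sketch; find_errors is a hypothetical helper and, like grep 'Error', it matches case-sensitively.

```python
from pathlib import Path


def find_errors(log_dir, needle="Error"):
    """Return (file name, line number, line) for every log line containing needle."""
    hits = []
    for log_file in sorted(Path(log_dir).glob("*.log")):
        # errors="replace" keeps the scan going if a log has odd bytes in it.
        with log_file.open(errors="replace") as handle:
            for line_number, line in enumerate(handle, start=1):
                if needle in line:
                    hits.append((log_file.name, line_number, line.rstrip("\n")))
    return hits


# Usage, from a machine that can see the event directory:
#   for name, number, line in find_errors(
#       "/data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>/logs"
#   ):
#       print(f"{name}:{number}: {line}")
```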

Manually obtaining localizations

Currently (as of Jan 2022) it is common for the baseband pipeline to successfully generate all the .h5 files and then fail at the finalize stage, leading to no plots being produced in the plots directory or localizations being written to L4. In such a case, you can open a Jupyter notebook (e.g., https://github.com/CHIMEFRB/baseband-analysis/wiki/Usage-of-the-Docker-system#using-jupyter-notebook) and then run the following to obtain a localization:

Python
from baseband_analysis.dev.localization import fit_location

fit_location(
   '/data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>/localization_<event_no>.h5',
   diagnostic_plots=True
)

The diagnostic plots will appear in the cell and provide visual confirmation of how reliable the resulting baseband localization may be. The default output is RA, DEC, RA err (1 sigma), DEC err (1 sigma), rotation angle. You can also add return_full=True to get information such as SNR (second-to-last output) and DM (last output).

Localizations + errors should also be in the last line of /data/chime/baseband/processed/YYYY/MM/DD/astro_<event_no>/logs/baseband-<event_no>-localization_.log.

Clearing the shown events on frb-web/baseband-pipeline

Beta: If the baseband pipeline monitoring page shows far too many events (ideally it should only show a minimal number, so a daemon is probably hanging), you can wait for there to be no events processing and then run the following code

Python
import chime_frb_api
buck = chime_frb_api.bucket.Bucket(base_url="https://frb.chimenet.ca/maestro/buckets")
buck_names = ['baseband-finalize', 'baseband-analysis', 'baseband-beamform', 'baseband-initialize', 'baseband-polarization', 'baseband-merge']
for buckname in buck_names:
    print(buckname)
    buck.delete_bucket(buckname)
    buck.create_bucket(buckname)

This should "conclude"/clear everything.

Bugs? Issues?

Lastly, if you run into any bugs or issues, you can check the troubleshooting FAQ page here: https://github.com/CHIMEFRB/baseband-analysis/wiki/Troubleshooting-the-baseband-pipeline

We are actively working on debugging the pipeline and you can comment on any relevant issues, or open new ones here: https://github.com/CHIMEFRB/pipelines_wg/projects/2