
CANFAR: running the baseband localization pipeline


For more details on the baseband localization pipeline itself, refer to this wiki page: https://github.com/CHIMEFRB/baseband-analysis/wiki/Everything-you-want-to-know-about-the-baseband-localization-pipeline

For information on getting set up on CANFAR, including getting accounts and setting up local certificates, see this wiki page: https://bao.chimenet.ca/wiki/index.php/CANFAR_onboarding

Remember that, at the moment, if baseband localization pipeline .h5 files already exist (e.g., beamformed meta_products/*.h5 files and merged *.h5 files), the pipeline will not overwrite them. If you want your job to reflect the most up-to-date version of the run you submitted, delete the old data products (or move them into e.g. "Trial_##" subfolders) before you submit new events to be processed, for example as sketched below.
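A minimal sketch of moving old products aside, assuming the processed-event directory layout described under "Monitoring / inspecting events" below (the YYYY/MM/DD date and EVENTID are placeholders you must fill in):

Python
from pathlib import Path
import shutil

# Placeholder path: substitute the real date and event ID for your event.
event_dir = Path("/arc/projects/chime_frb/data/chime/baseband/processed/YYYY/MM/DD/astro_EVENTID")
trial_dir = event_dir / "Trial_01"
trial_dir.mkdir(exist_ok=True)

# Move the merged *.h5 files and the beamformed meta_products/*.h5 files aside.
old_products = list(event_dir.glob("*.h5")) + list((event_dir / "meta_products").glob("*.h5"))
for h5 in old_products:
    shutil.move(str(h5), str(trial_dir / h5.name))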

The baseband pipeline at CANFAR runs with the workflow system. Assuming you have chime-frb-api installed (pip install --user chime-frb-api) wherever you're running this from, you should be able to use the workflow command-line interface (CLI).

As of October 2023, here's how Kaitlyn runs the baseband pipeline at CANFAR (both steps are run from her local computer):

  1. Spawn containers using the skaha module (also pip-installable wherever you're working). Containers should now all use only the chimefrb/baseband-headless:latest image (there is no longer any need for chimefrb/maestro-datatrail-client:latest!)
  2. From the command line, call workflow pipelines deploy filename.yaml using the workflow CLI.

The next two sections will elaborate on those two points.

Spawning containers for the baseband pipeline

Text Only
from skaha.session import Session
import nest_asyncio
nest_asyncio.apply()

s = Session() # assuming you have a valid certificate at ~/.ssl/cadcproxy.pem

### spawning two containers for data staging and unstaging
payload1 = {
    'name': 'datatrail', # name of the session
    'image': 'images.canfar.net/chimefrb/baseband-headless:latest', # image you're pulling
    'cores': 16, # data staging now supports multithreaded pulling
    'ram': 2,
    'kind': 'headless',
    'cmd': 'workflow',
    'args': 'run --site=canfar baseband-kshin-datatrail', # the last args is the bucket name!!!!
    'env': {'SITE': 'canfar'},
    'replicas': 2
}
sid1 = s.create(**payload1)

### spawning 65 containers for all the baseband pipeline stuff (beamforming, merge, analysis)
### you're of course free to use your own chime tokens -- otherwise you'll be masquerading as `kshin` :-)
payload2 = {
    'name': 'baseband',
    'image': 'images.canfar.net/chimefrb/baseband-headless:latest',
    'cores': 2,
    'ram': 16,
    'kind': 'headless',
    'cmd': 'workflow',
    'args': 'run --site=canfar basecat1-baseband',
    'env': {'SITE': 'canfar',
            'CHIME_FRB_ACCESS_TOKEN': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoia3NoaW4iLCJleHAiOjE2NzQ2NzIwNTAsImlzcyI6ImZyYi1tYXN0ZXIiLCJpYXQiOjE2NzQ2NzAyNTB9.SVi_M7bCD8EiqwWCLBgvWrGIqYQNikWqd0JSm_mlbNM',
            'CHIME_FRB_REFRESH_TOKEN': 'a6fab3e271f6af7cfdfa21b512ac9c79788d937f9dd9d983',
            # 'PYTHONPATH': '/arc/home/kshin/baseband-analysis/' # you can set this if you want to run your local branch on this headless session
           },
    'replicas': 65
}
sid2 = s.create(**payload2)

### baseband container for specifically the "refinement analysis" ("brightest") stage
payload3 = {
    'name': 'baseband-brightest',
    'image': 'images.canfar.net/chimefrb/baseband-headless:latest',
    'cores': 8, # MCMC now supports multi-processing
    'ram': 8,
    'kind': 'headless',
    'cmd': 'workflow',
    'args': 'run --log-level=INFO --site=canfar baseband-kshin-brightest',
    'env': {'SITE': 'canfar',
            'CHIME_FRB_ACCESS_TOKEN': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoia3NoaW4iLCJleHAiOjE2NzQ2NzIwNTAsImlzcyI6ImZyYi1tYXN0ZXIiLCJpYXQiOjE2NzQ2NzAyNTB9.SVi_M7bCD8EiqwWCLBgvWrGIqYQNikWqd0JSm_mlbNM',
            'CHIME_FRB_REFRESH_TOKEN': 'a6fab3e271f6af7cfdfa21b512ac9c79788d937f9dd9d983',
            # 'PYTHONPATH': '/arc/home/kshin/baseband-analysis/' # you can set this if you want to run your local branch on this headless session
           },
    'replicas': 2
}
sid3 = s.create(**payload3)

You can call s.fetch(status='Pending') to see which of your sessions are still pending rather than running. You can also call s.info(sid) to see the status parameters of a given session ID.
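For example, a quick sketch using those two calls (with sid2 as created above):

Python
# Sketch: list any pending sessions, then inspect one session by its ID.
pending = s.fetch(status='Pending')
print(len(pending), 'sessions still pending')
print(s.info(sid2))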

It's normal for sessions to be pending, especially if you're requesting something like 32 GB of RAM instead of, say, 8 GB (which should be sufficient for most baseband events that aren't from the ~first year of the baseband capture system) or even 16 GB. I like to monitor the status of my sessions with a list comprehension, e.g.:

Text Only
tmp = s.fetch() # all of your current sessions
print(len([x['id'] for x in tmp if 'baseband-headless' in x['image'] and x['status']=='Running']))
print(len([x['id'] for x in tmp if 'baseband-headless' in x['image'] and x['status']!='Running'])) # e.g. pending, failed, terminating, succeeded
print(len([x['id'] for x in tmp if 'baseband-headless' in x['image'] and x['status']=='Pending']))

Occasionally, after a network connection timeout, a container will report a status of 'Succeeded' and then proceed to terminate itself.

Once you're done processing with these sessions, you should free up resources for other users:

Text Only
s.destroy(sid1)
s.destroy(sid2)
s.destroy(sid3)
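
If you've lost track of the individual session IDs, here's a sketch for tearing down every remaining baseband-headless session at once (using the same fetch fields as the monitoring snippet above):

Python
# Destroy every one of your sessions still running the baseband-headless image.
for sess in s.fetch():
    if 'baseband-headless' in sess['image']:
        s.destroy(sess['id'])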

Example config file

Below is a filename.yaml file that runs the entire baseband pipeline on event 22154974 at CANFAR. More details on the workflow pipelines config syntax can be found in the workflow documentation.

Text Only
version: "1" # version of the workflow pipeline; as of 2023 May, there's only version 1
name: "kshin-baseband-processing" # name of your pipeline
event_number: &event-number 22154974 # for easy replacement later in the yaml file
event_str: &event-str "22154974"
defaults: # every object in the pipeline will have these parameters
  user: "kshin" # to keep track of who's submitting what job on the workflow website
  site: "canfar" # to run this at canfar
  event: [*event-str]
pipeline:
  stage_data:
    stage: 1
    work:
      pipeline: "baseband-kshin-datatrail" # name of the bucket this object will be deposited into and run from
      command:
        - "datatrail"
        - "pull"
        - "chime.event.baseband.raw"
        - *event-str
        - "-vvv" # verbose
        - "-f" # force
        - "-c" # cores
        - "16"
      tags:
        - "lowDM"
        - "stage-data"
      timeout: 10000
      priority: 3 # this is the default. 1 is lowest priority, 5 is highest priority.
  initialize:
    stage: 2
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.initialize
      parameters:
        event_number: *event-number
        site: canfar
      tags:
        - "lowDM"
        - "initialize"
      timeout: 600
  refinement_tiedbeam:
    stage: 3
    matrix:
      job_id:
        range: [1,64]
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.tiedbeam
      parameters:
        event_number: *event-number
        site: canfar
        job_id: ${{ matrix.job_id }}
        parameters: ["baseband_pipeline", "refinement", "job_size", "64", "files_per_group", "2"]
      tags:
        - "lowDM"
        - "refinement"
        - "tiedbeam"
      retries: 3
      timeout: 900
  refinement_merge:
    stage: 4
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.merge
      parameters:
        event_number: *event-number
        site: canfar
        parameters: ["baseband_pipeline", "refinement"]
      tags:
        - "lowDM"
        - "refinement"
        - "merge"
      timeout: 600
  refinement_analysis:
    stage: 5
    work:
      pipeline: "baseband-kshin-brightest"
      function: baseband_analysis.pipelines.cluster.cluster_cli.brightest
      parameters:
        event_number: *event-number
        site: canfar
        parameters: ["distributor_name", "baseband-refinement", "baseband_pipeline", "refinement", "num_parallel_processes", "8"] # num_parallel_processes speeds up the MCMC localization component of this stage
      tags:
        - "lowDM"
        - "refinement"
        - "analysis"
  localization_tiedbeam:
    stage: 6
    matrix:
      job_id:
        range: [1,64]
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.tiedbeam
      parameters:
        event_number: *event-number
        site: canfar
        job_id: ${{ matrix.job_id }}
        parameters: ["baseband_pipeline", "localization", "job_size", "64", "files_per_group", "2", "get_parameters_from_h5", "true"]
      tags:
        - "lowDM"
        - "localization"
        - "tiedbeam"
      retries: 3
      timeout: 900
  localization_merge:
    stage: 7
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.merge
      parameters:
        event_number: *event-number
        site: canfar
        parameters: ["baseband_pipeline", "localization"]
      tags:
        - "lowDM"
        - "localization"
        - "merge"
      timeout: 600
  localization_analysis:
    stage: 8
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.localization
      parameters:
        event_number: *event-number
        site: canfar
        parameters: ["distributor_name", "baseband-localization"]
      tags:
        - "lowDM"
        - "localization"
        - "analysis"
      timeout: 600
  singlebeam_tiedbeam:
    stage: 9
    matrix:
      job_id:
        range: [1,64]
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.tiedbeam
      parameters:
        event_number: *event-number
        site: canfar
        job_id: ${{ matrix.job_id }}
        parameters: ["baseband_pipeline", "singlebeam", "job_size", "64", "files_per_group", "2", "get_parameters_from_h5", "true"]
      tags:
        - "lowDM"
        - "singlebeam"
        - "tiedbeam"
      retries: 3
      timeout: 900
  singlebeam_merge:
    stage: 10
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.merge
      parameters:
        event_number: *event-number
        site: canfar
        parameters: ["baseband_pipeline", "singlebeam"]
      tags:
        - "lowDM"
        - "singlebeam"
        - "merge"
      timeout: 600
  singlebeam_analysis:
    stage: 11
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.waterfall
      parameters:
        event_number: *event-number
        site: canfar
      tags:
        - "lowDM"
        - "singlebeam"
        - "analysis"
        - "waterfall"
      timeout: 600
  finalize:
    stage: 12
    work:
      pipeline: "baseband-kshin"
      function: baseband_analysis.pipelines.cluster.cluster_cli.finalize
      parameters:
        event_number: *event-number
        site: canfar
      tags:
        - "lowDM"
        - "finalize"
      timeout: 600
  unstage_data:
    stage: 12
    if: 'always' # should always run even if an earlier stage fails... in practice, doesn't seem to always be the case
    work:
      pipeline: "baseband-kshin-datatrail"
      command:
        - "datatrail"
        - "clear"
        - "chime.event.baseband.raw"
        - *event-str
        - "-vvv"
        - "-f" # force
      tags:
        - "lowDM"
        - "unstage-data"
      priority: 5

You'll have to dig into the code (e.g., the respective pipeline configs) to know all the possible options you can pass in parameters.

Once you are happy with the config, it can be run with workflow pipelines deploy filename.yaml.

How the baseband pipeline on CANFAR works

  1. There's a data-stager step, which pulls the raw baseband data stored on the "cloud" at CANFAR into the /arc/ directory (which has limited space) on CANFAR. This step can take ~20 min.
  2. The rest of the pipeline then runs through the initialize through finalize stages, as in the example config above.
  3. Finally, the data-cleaner step runs, which removes the raw baseband data that was staged in the first step and frees the space on /arc/ back up. This is crucial!! Scratch space is limited :'(

Monitoring / inspecting events

You can monitor the status of your events at https://frb.chimenet.ca/workflow/buckets for whichever bucket you deposited your events into. You can also use the CLI:

Text Only
workflow pipelines ps <pipelinename> # get the workflow job_IDs
workflow pipelines ps <pipelinename> <job_ID>

On the workflow website, you can filter the jobs by searching for tag: <tag name>, corresponding to the tags you gave the workflow objects in the config file above. If your pipeline job returns plots (e.g., the brightest/localization/waterfall/finalize stages), you should be able to see those at https://frb.chimenet.ca/workflow/results for the right bucket (as long as you're logged into CANFAR in the browser too).

New with v1.2.0: For the baseband pipeline, logs are written for each stage and can be found in the /arc/projects/chime_frb/data/chime/baseband/processed/YYYY/MM/DD/astro_EVENTID/logs/ directory.

You can also check whether intermediary data products are being created successfully in the /arc/projects/chime_frb/data/chime/baseband/processed/... directory.
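
For example, a small sketch for eyeballing the products and logs for one event (placeholder date and event ID, as before):

Python
from pathlib import Path

# Placeholder path: substitute the real date and event ID for your event.
event_dir = Path("/arc/projects/chime_frb/data/chime/baseband/processed/YYYY/MM/DD/astro_EVENTID")

# Intermediary/merged data products as they appear...
for h5 in sorted(event_dir.rglob("*.h5")):
    print(h5)

# ...and the per-stage logs (v1.2.0+).
for log in sorted((event_dir / "logs").glob("*")):
    print(log)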

Manually running individual stages on CANFAR

For the default version of the pipeline, i.e., the version of baseband-analysis that's built into the image, you can use the CLI. Here are some examples:

Text Only
cluster-cli initialize --event-number 22154974 --site canfar
cluster-cli tiedbeam --event-number 22154974 --site canfar --job-id 1 -- baseband_pipeline refinement job_size 64 files_per_group 2
cluster-cli merge --event-number 22154974 --site canfar -- baseband_pipeline refinement
cluster-cli brightest --event-number 22154974 --site canfar -- distributor_name baseband-refinement baseband_pipeline refinement num_parallel_processes 1
cluster-cli tiedbeam --event-number 22154974 --site canfar --job-id 1 -- baseband_pipeline localization job_size 64 files_per_group 2 get_parameters_from_h5 true
cluster-cli merge --event-number 22154974 --site canfar -- baseband_pipeline localization
cluster-cli localization --event-number 22154974 --site canfar -- distributor_name baseband-localization
cluster-cli tiedbeam --event-number 22154974 --site canfar --job-id 1 -- baseband_pipeline singlebeam job_size 64 files_per_group 2 get_parameters_from_h5 true
cluster-cli merge --event-number 22154974 --site canfar -- baseband_pipeline singlebeam
cluster-cli waterfall --event-number 22154974 --site canfar
cluster-cli finalize --event-number 22154974 --site canfar

If you want to test, say, your own version from a different branch in a CANFAR session, you can git clone that branch, go to that directory, and use click's testing utilities:

Python
%env CHIME_FRB_ACCESS_TOKEN=your_own_access_token_without_quotes
%env CHIME_FRB_REFRESH_TOKEN=your_own_refresh_token_without_quotes

import baseband_analysis
## baseband_analysis.__file__ should give you something like '/arc/home/kshin/baseband-analysis/baseband_analysis/__init__.py'

from baseband_analysis.pipelines.cluster.cluster_cli import initialize, tiedbeam, merge, brightest, localization, waterfall, finalize
import click.testing

click.testing.CliRunner().invoke(initialize, ['--event-number', '30105124', '--site', 'canfar'], catch_exceptions=False)
click.testing.CliRunner().invoke(tiedbeam, ['--event-number', '30105124', '--site', 'canfar', '--job-id', '1', '--',
     'baseband_pipeline', 'refinement', 'job_size', '64', 'files_per_group', '2'], catch_exceptions=False)

## etc etc

FAQ

Most CANFAR questions should be addressed in this living document: https://bao.chimenet.ca/wiki/index.php/CANFAR_onboarding#CANFAR_usage_quickstart

My CANFAR job appears to fail with "caput failed"

You probably just need to spawn a container with more RAM.
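
For example, re-spawning the baseband sessions with a larger allocation (a sketch, reusing payload2 from the spawning section above):

Python
# Bump the memory request and re-create the sessions.
payload2['ram'] = 32  # e.g., 32 GB instead of 16 GB for early-era baseband events
sid2 = s.create(**payload2)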

I can't spawn a skaha container (ERROR - 404 Client Error: Not Found for url)

This happens when an image (e.g., images.canfar.net/chimefrb/maestro-datatrail-client) has been updated, but no one has tagged the updated/latest image as headless on images.canfar.net.

I'm spawning multiple skaha containers and seeing "WARNING Connection pool is full"

Don't worry about it; the containers should still launch anyway (and you can verify that by checking their status with s.info(session_id)).

My workflow job is failing with chimefrb.workflow cannot unpack non-iterable NoneType object

~As of 2023 May that's a known bug. This happens at the beamforming/merge stage if the data products already exist. For now, just delete/move those intermediary data products and rerun.~

Hopefully this is solved with chimefrb/baseband-analysis:v1.0.1.

I'm failing at the finalize stage but things look fine?

~It's probably because the account associated with your certificate doesn't own every file in the parent processed event directory (e.g., if someone else made another plot or a subfolder full of plots in there). Don't worry about it: that's the last step of finalize, so we still get all the plots and the results written to frb-master.~

~As of 2023 May we're looking into fixing this for future releases.~

Hopefully this is solved with chimefrb/baseband-analysis:v1.0.1.

How do I query the resulting baseband localizations?

Python
from chime_frb_api.backends import frb_master

master = frb_master.FRBMaster()
event_id = 22154974  # your event number here
event = master.events.get_event(event_id)
print(event['measured_parameters'])

The version field coincides with the GitHub hash of the pipeline version that produced the measurement.
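
If you want to pick out the baseband entries programmatically, here's a heavily hedged sketch; the field names ('pipeline', etc.) are assumptions, so inspect the actual measured_parameters structure for your event first:

Python
# Field names here are assumptions -- inspect event['measured_parameters'] first.
for mp in event['measured_parameters']:
    if 'baseband' in str(mp.get('pipeline', '')).lower():
        print(mp)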