CANFAR: running the baseband localization pipeline
Running the baseband localization pipeline on events on CANFAR¶
For more details on the baseband localization pipeline itself, refer to this wiki page: https://github.com/CHIMEFRB/baseband-analysis/wiki/Everything-you-want-to-know-about-the-baseband-localization-pipeline
For information on getting set up on CANFAR, including getting accounts and setting up local certificates, see this wiki page: https://bao.chimenet.ca/wiki/index.php/CANFAR_onboarding
Remember that, at the moment, if baseband localization pipeline `.h5` files already exist (e.g., beamformed `meta_products/*.h5` files and merged `*.h5` files), the pipeline will not overwrite them. If you want your job to reflect the most up-to-date version of the run you submitted, delete the old data products (or move them into, e.g., `Trial_##` subfolders) before you submit new events to be processed!
The baseband pipeline at CANFAR runs with the workflow system. Assuming you have `chime-frb-api` installed (`pip install --user chime-frb-api`) wherever you're running this from, you should be able to use the workflow command line interface (CLI).
As of October 2023, here's how Kaitlyn runs the baseband pipeline at CANFAR. (Both steps are run from her local computer.)
- Spawn containers using the `skaha` module (also pip-installable wherever you're running from). Containers should now all use only the `chimefrb/baseband-headless:latest` image (no more need for `chimefrb/maestro-datatrail-client:latest`!)
- Using the CLI, from the command line, call `workflow pipelines deploy filename.yaml`.
The next two sections will elaborate on those two points.
Spawning containers for the baseband pipeline¶
from skaha.session import Session
import nest_asyncio
nest_asyncio.apply()
s = Session() # assuming you have a valid certificate at ~/.ssl/cadcproxy.pem
### spawning two containers for data staging and unstaging
payload1 = {
'name': 'datatrail', # name of the session
'image': 'images.canfar.net/chimefrb/baseband-headless:latest', # image you're pulling
'cores': 16, # data staging now supports multithreaded pulling
'ram': 2,
'kind': 'headless',
'cmd': 'workflow',
'args': 'run --site=canfar baseband-kshin-datatrail', # the last arg is the bucket name!!
'env': {'SITE': 'canfar'},
'replicas': 2
}
sid1 = s.create(**payload1)
### spawning 65 containers for all the baseband pipeline stuff (beamforming, merge, analysis)
### you're of course free to use your own chime tokens -- otherwise you'll be masquerading as `kshin` :-)
payload2 = {
'name': 'baseband',
'image': 'images.canfar.net/chimefrb/baseband-headless:latest',
'cores': 2,
'ram': 16,
'kind': 'headless',
'cmd': 'workflow',
'args': 'run --site=canfar basecat1-baseband',
'env': {'SITE': 'canfar',
'CHIME_FRB_ACCESS_TOKEN': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoia3NoaW4iLCJleHAiOjE2NzQ2NzIwNTAsImlzcyI6ImZyYi1tYXN0ZXIiLCJpYXQiOjE2NzQ2NzAyNTB9.SVi_M7bCD8EiqwWCLBgvWrGIqYQNikWqd0JSm_mlbNM',
'CHIME_FRB_REFRESH_TOKEN': 'a6fab3e271f6af7cfdfa21b512ac9c79788d937f9dd9d983',
# 'PYTHONPATH': '/arc/home/kshin/baseband-analysis/' # you can set this if you want to run your local branch on this headless session
},
'replicas': 65
}
sid2 = s.create(**payload2)
### baseband container for specifically the "refinement analysis" ("brightest") stage
payload3 = {
'name': 'baseband-brightest',
'image': 'images.canfar.net/chimefrb/baseband-headless:latest',
'cores': 8, # MCMC now supports multi-processing
'ram': 8,
'kind': 'headless',
'cmd': 'workflow',
'args': 'run --log-level=INFO --site=canfar baseband-kshin-brightest',
'env': {'SITE': 'canfar',
'CHIME_FRB_ACCESS_TOKEN': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoia3NoaW4iLCJleHAiOjE2NzQ2NzIwNTAsImlzcyI6ImZyYi1tYXN0ZXIiLCJpYXQiOjE2NzQ2NzAyNTB9.SVi_M7bCD8EiqwWCLBgvWrGIqYQNikWqd0JSm_mlbNM',
'CHIME_FRB_REFRESH_TOKEN': 'a6fab3e271f6af7cfdfa21b512ac9c79788d937f9dd9d983',
# 'PYTHONPATH': '/arc/home/kshin/baseband-analysis/' # you can set this if you want to run your local branch on this headless session
},
'replicas': 2
}
sid3 = s.create(**payload3)
You can call `s.fetch(status='Pending')` to see whether your sessions are pending or running. You can also call `s.info(sid)` to see what the status parameters of the session IDs are.
It's normal for sessions to be pending, especially if you're requesting something like 32 GB RAM instead of, say, 8 GB (which should be sufficient for most baseband events that aren't from the ~first year of the baseband capture system) or even 16 GB. I like to monitor the status of my sessions with a list comprehension, e.g.,
tmp = s.fetch()  # all of your current sessions
print(len([x['id'] for x in tmp if 'baseband-headless' in x['image'] and x['status']=='Running']))
print(len([x['id'] for x in tmp if 'baseband-headless' in x['image'] and x['status']!='Running'])) # e.g., pending, failed, terminating, succeeded
print(len([x['id'] for x in tmp if 'baseband-headless' in x['image'] and x['status']=='Pending']))
Occasionally, when there's a network connection timeout, the container will have a status of 'Succeeded' and then proceed to terminate itself.
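Those per-status counts can also be wrapped in a small helper. This is just a sketch; it only assumes the session dicts returned by `s.fetch()` carry the `'image'` and `'status'` keys used above:

```python
from collections import Counter

def status_counts(sessions, image_substring="baseband-headless"):
    """Tally session statuses for sessions whose image matches.

    `sessions` is the list of dicts returned by s.fetch().
    """
    return Counter(sess["status"] for sess in sessions
                   if image_substring in sess["image"])
```

For example, `status_counts(s.fetch())` returns something like `Counter({'Running': 65, 'Pending': 4})`.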
Once you're done processing with these sessions, you should free up resources for other users:
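For example, with the `skaha` session `s` from above (a sketch; `ids_to_destroy` is a hypothetical helper, and `Session.destroy` is assumed to accept a list of session IDs):

```python
# hypothetical helper: pick out the session IDs belonging to this run,
# matching the 'name' fields used in the payloads above
def ids_to_destroy(sessions, names=("datatrail", "baseband", "baseband-brightest")):
    return [info["id"] for info in sessions if info["name"] in names]

# then, with an authenticated skaha Session `s` (see above):
#   s.destroy(ids_to_destroy(s.fetch()))
```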
Example config file¶
Below is a `filename.yaml` file that runs the entire baseband pipeline on event 22154974 at CANFAR. More details on the workflow pipelines config syntax can be found here.
version: "1" # version of the workflow pipeline; as of 2023 May, there's only version 1
name: "kshin-baseband-processing" # name of your pipeline
event_number: &event-number 22154974 # for easy replacement later in the yaml file
event_str: &event-str "22154974"
defaults: # every object in the pipeline will have these parameters
user: "kshin" # to keep track of who's submitting what job on the workflow website
site: "canfar" # to run this at canfar
event: [*event-str]
pipeline:
stage_data:
stage: 1
work:
pipeline: "baseband-kshin-datatrail" # name of the bucket this object will be deposited into and run from
command:
- "datatrail"
- "pull"
- "chime.event.baseband.raw"
- *event-str
- "-vvv" # verbose
- "-f" # force
- "-c" # cores
- "16"
tags:
- "lowDM"
- "stage-data"
timeout: 10000
priority: 3 # this is the default. 1 is lowest priority, 5 is highest priority.
initialize:
stage: 2
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.initialize
parameters:
event_number: *event-number
site: canfar
tags:
- "lowDM"
- "initialize"
timeout: 600
refinement_tiedbeam:
stage: 3
matrix:
job_id:
range: [1,64]
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.tiedbeam
parameters:
event_number: *event-number
site: canfar
job_id: ${{ matrix.job_id }}
parameters: ["baseband_pipeline", "refinement", "job_size", "64", "files_per_group", "2"]
tags:
- "lowDM"
- "refinement"
- "tiedbeam"
retries: 3
timeout: 900
refinement_merge:
stage: 4
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.merge
parameters:
event_number: *event-number
site: canfar
parameters: ["baseband_pipeline", "refinement"]
tags:
- "lowDM"
- "refinement"
- "merge"
timeout: 600
refinement_analysis:
stage: 5
work:
pipeline: "baseband-kshin-brightest"
function: baseband_analysis.pipelines.cluster.cluster_cli.brightest
parameters:
event_number: *event-number
site: canfar
parameters: ["distributor_name", "baseband-refinement", "baseband_pipeline", "refinement", "num_parallel_processes", "8"] # num_parallel_processes speeds up the MCMC localization component of this stage
tags:
- "lowDM"
- "refinement"
- "analysis"
localization_tiedbeam:
stage: 6
matrix:
job_id:
range: [1,64]
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.tiedbeam
parameters:
event_number: *event-number
site: canfar
job_id: ${{ matrix.job_id }}
parameters: ["baseband_pipeline", "localization", "job_size", "64", "files_per_group", "2", "get_parameters_from_h5", "true"]
tags:
- "lowDM"
- "localization"
- "tiedbeam"
retries: 3
timeout: 900
localization_merge:
stage: 7
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.merge
parameters:
event_number: *event-number
site: canfar
parameters: ["baseband_pipeline", "localization"]
tags:
- "lowDM"
- "localization"
- "merge"
timeout: 600
localization_analysis:
stage: 8
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.localization
parameters:
event_number: *event-number
site: canfar
parameters: ["distributor_name", "baseband-localization"]
tags:
- "lowDM"
- "localization"
- "analysis"
timeout: 600
singlebeam_tiedbeam:
stage: 9
matrix:
job_id:
range: [1,64]
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.tiedbeam
parameters:
event_number: *event-number
site: canfar
job_id: ${{ matrix.job_id }}
parameters: ["baseband_pipeline", "singlebeam", "job_size", "64", "files_per_group", "2", "get_parameters_from_h5", "true"]
tags:
- "lowDM"
- "singlebeam"
- "tiedbeam"
retries: 3
timeout: 900
singlebeam_merge:
stage: 10
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.merge
parameters:
event_number: *event-number
site: canfar
parameters: ["baseband_pipeline", "singlebeam"]
tags:
- "lowDM"
- "singlebeam"
- "merge"
timeout: 600
singlebeam_analysis:
stage: 11
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.waterfall
parameters:
event_number: *event-number
site: canfar
tags:
- "lowDM"
- "singlebeam"
- "analysis"
- "waterfall"
timeout: 600
finalize:
stage: 12
work:
pipeline: "baseband-kshin"
function: baseband_analysis.pipelines.cluster.cluster_cli.finalize
parameters:
event_number: *event-number
site: canfar
tags:
- "lowDM"
- "finalize"
timeout: 600
unstage_data:
stage: 12
if: 'always' # should always run even if an earlier stage fails... in practice, doesn't seem to always be the case
work:
pipeline: "baseband-kshin-datatrail"
command:
- "datatrail"
- "clear"
- "chime.event.baseband.raw"
- *event-str
- "-vvv"
- "-f" # force
tags:
- "lowDM"
- "unstage-data"
priority: 5
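As an aside on the config above: the `matrix`/`range` stanza in the tied-beam stages fans a single stage out into many work objects, one per `job_id`. Here is a toy illustration of that expansion (not workflow's actual implementation; it assumes the range is inclusive on both ends, consistent with `job_size: 64`):

```python
def expand_matrix(job_id_range):
    """Toy expansion of `matrix: job_id: range: [1, 64]`: one work item
    per job_id, with both endpoints included."""
    lo, hi = job_id_range
    return [{"job_id": j} for j in range(lo, hi + 1)]
```

So `expand_matrix([1, 64])` yields 64 work items, and each one sees its own value via `${{ matrix.job_id }}`.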
You'll have to dig into the code (and, e.g., the respective configs) to know all the possible options to input for `parameters`.
Once you are happy with the config, it can be run with `workflow pipelines deploy filename.yaml`.
How the baseband pipeline on CANFAR works¶
- There's a data-stager step, which pulls the raw baseband data stored on the "cloud" at CANFAR to the `/arc/` directory (which has limited space) on CANFAR. This step can take ~20 min.
- Then the rest of the pipeline runs through the `initialize` ... `finalize` stages as described here.
- Then the data-cleaner step runs, which removes the raw baseband data that was staged in the first step and frees the space on `/arc/` back up. This is crucial!! Scratch space is limited :'(
Monitoring / inspecting events¶
You can monitor the status of your events at https://frb.chimenet.ca/workflow/buckets for whatever bucket you deposited your events in. You can also use the CLI:
workflow pipelines ps <pipelinename> # get the workflow job_IDs
workflow pipelines ps <pipelinename> <job_ID>
On the workflow website, you can filter the jobs by searching for `tag: <tag name>`, corresponding to the tags you gave the workflow objects in the config file above.
If your pipeline job returns plots (e.g., the brightest/localization/waterfall/finalize stages), you should be able to see those at https://frb.chimenet.ca/workflow/results for the right bucket (as long as you're logged into CANFAR in the browser too).
New with v1.2.0: For the baseband pipeline, logs are written for each stage and can be found in the `/arc/projects/chime_frb/data/chime/baseband/processed/YYYY/MM/DD/astro_EVENTID/logs/` directory.
You can also see whether intermediary data products are being successfully created in the `/arc/projects/chime_frb/data/chime/baseband/processed/...` directory.
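If you prefer to check from a Python session, here's a minimal sketch. Only the `.../processed/YYYY/MM/DD/astro_EVENTID` layout is taken from the paths above; the helper itself is illustrative:

```python
from pathlib import Path

def list_h5_products(event_dir):
    """List the .h5 data products under a processed-event directory, e.g.
    /arc/projects/chime_frb/data/chime/baseband/processed/YYYY/MM/DD/astro_EVENTID"""
    event_dir = Path(event_dir)
    return sorted(str(p.relative_to(event_dir)) for p in event_dir.rglob("*.h5"))
```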
Manually running individual stages on CANFAR¶
For the default version of the pipeline (i.e., the version of baseband-analysis that's built into the image), you can use the CLI. Here are some examples:
cluster-cli initialize --event-number 22154974 --site canfar
cluster-cli tiedbeam --event-number 22154974 --site canfar --job-id 1 -- baseband_pipeline refinement job_size 64 files_per_group 2
cluster-cli merge --event-number 22154974 --site canfar -- baseband_pipeline refinement
cluster-cli brightest --event-number 22154974 --site canfar num_parallel_processes 1 -- distributor_name baseband-refinement baseband_pipeline refinement
cluster-cli tiedbeam --event-number 22154974 --site canfar --job-id 1 -- baseband_pipeline localization job_size 64 files_per_group 2 get_parameters_from_h5 true
cluster-cli merge --event-number 22154974 --site canfar -- baseband_pipeline localization
cluster-cli localization --event-number 22154974 --site canfar -- distributor_name baseband-localization
cluster-cli tiedbeam --event-number 22154974 --site canfar --job-id 1 -- baseband_pipeline singlebeam job_size 64 files_per_group 2 get_parameters_from_h5 true
cluster-cli merge --event-number 22154974 --site canfar -- baseband_pipeline singlebeam
cluster-cli waterfall --event-number 22154974 --site canfar
cluster-cli finalize --event-number 22154974 --site canfar
If you want to test, say, your own version from a different branch in a CANFAR session, you can `git clone` that branch, go to that directory, and use the `click` module:
%env CHIME_FRB_ACCESS_TOKEN=your_own_access_token_without_quotes
%env CHIME_FRB_REFRESH_TOKEN=your_own_refresh_token_without_quotes
import baseband_analysis
## baseband_analysis.__file__ should give you something like '/arc/home/kshin/baseband-analysis/baseband_analysis/__init__.py'
from baseband_analysis.pipelines.cluster.cluster_cli import initialize, tiedbeam, merge, brightest, localization, waterfall, finalize
import click.testing
click.testing.CliRunner().invoke(initialize, ['--event-number', '30105124', '--site', 'canfar'], catch_exceptions=False)
click.testing.CliRunner().invoke(tiedbeam, ['--event-number', '30105124', '--site', 'canfar', '--job-id', '1', '--',
'baseband_pipeline', 'refinement', 'job_size', '64', 'files_per_group', '2'], catch_exceptions=False)
## etc etc
FAQ¶
Most CANFAR questions should be addressed in this living document: https://bao.chimenet.ca/wiki/index.php/CANFAR_onboarding#CANFAR_usage_quickstart
My CANFAR job appears to fail with "caput failed"¶
You probably just need to spawn a container with more RAM.
I can't spawn a skaha container (ERROR - 404 Client Error: Not Found for url)¶
This happens when the image (i.e., images.canfar.net/chimefrb/maestro-datatrail-client) is updated, but no one has tagged the updated/latest image as headless on images.canfar.net.
I'm spawning multiple skaha containers and seeing "WARNING Connection pool is full"¶
Don't worry about it; the containers should still launch anyway (and you can verify that by checking their status with `s.info(session_id)`).
My workflow job is failing with `chimefrb.workflow cannot unpack non-iterable NoneType object`¶
~As of May 2023, this is a known bug. It happens at the beamforming/merge stage if the data products already exist. For now, just delete/move those intermediary data products and rerun.~
Hopefully this is solved with `chimefrb/baseband-analysis:v1.0.1`.
I'm failing at the finalize stage but things look fine?¶
~It's probably because you (the account associated with your certificate) don't own every file in the parent processed-event directory (e.g., if someone else made another plot or subfolder full of plots in there). Don't worry about it; that's the last step of finalize, so we still get all the plots and the results written to frb-master.~
~As of May 2023, we're looking into fixing this for future releases.~
Hopefully this is solved with `chimefrb/baseband-analysis:v1.0.1`.
How do I query the resulting baseband localizations?¶
from chime_frb_api.backends import frb_master
master = frb_master.FRBMaster()
event = master.events.get_event(event_id)  # event_id, e.g., 22154974
event['measured_parameters']
The `version` field coincides with the GitHub hash of the pipeline version.
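If you only want the baseband entries, you can filter `measured_parameters` yourself. This is a hypothetical sketch; the `{'pipeline': {'name': ...}}` shape of each entry is an assumption about the frb-master schema, so check a real event first:

```python
def baseband_measurements(measured_parameters, pipeline_name="baseband"):
    """Hypothetical filter: keep measured_parameters entries whose
    pipeline name matches (entry shape assumed, not guaranteed)."""
    return [p for p in measured_parameters
            if p.get("pipeline", {}).get("name") == pipeline_name]
```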