Skip to content

Registration API

Event Registration

datatrail_admin.registration.event_registration

Pipeline to register CHIME/FRB event callback data.

EventRegistrar

Python
EventRegistrar(raw_intensity_base_path: str = '/data/chime/intensity/raw', raw_baseband_base_path: str = '/data/chime/baseband/raw', root_path: str = '/', test_mode: bool = False, site: Optional[str] = SITE, verbose: bool = False)

Bases: object

Class for data product registration.

Eventregistrar registers CHIME/FRB's intensity and baseband data.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def __init__(
    self,
    raw_intensity_base_path: str = "/data/chime/intensity/raw",
    raw_baseband_base_path: str = "/data/chime/baseband/raw",
    root_path: str = "/",
    test_mode: bool = False,
    site: Optional[str] = SITE,
    verbose: bool = False,
):
    """Eventregistrar registers CHIME/FRB's intensity and baseband data."""
    self.site = site
    self.raw_intensity_base_path = raw_intensity_base_path
    self.raw_baseband_base_path = raw_baseband_base_path
    self.root_path = root_path
    self.test_mode = test_mode
    if verbose:
        log.setLevel(10)
    log.info(f"Test mode: {self.test_mode}")
    log.info(f"Site: {self.site}")
    log.info(f"Raw intensity base path: {self.raw_intensity_base_path}")
    log.info(f"Raw baseband base path: {self.raw_baseband_base_path}")

    self.backend = chime_frb_api.frb_master.FRBMaster()
    self.results = Results()
    self.registry = CollectorRegistry()
    self.time_last_active = Gauge(
        "datatrail_event_registration_last_run",
        "Last active time.",
        registry=self.registry,  # noqa
    )
    self.time_per_event = Gauge(
        "datatrail_event_registration_time_per_event",
        "Time per event.",
        registry=self.registry,  # noqa
    )
add_unregistered_event
Python
add_unregistered_event(event_number: int, data_type: str, date_captured: str, action_picker_dataset: str, reason: str)

Add a new event to the unregistered events collection for tracking.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def add_unregistered_event(
    self,
    event_number: int,
    data_type: str,
    date_captured: str,
    action_picker_dataset: str,
    reason: str,
):
    """Add a new event to the unregistered events collection for tracking."""
    payload = {
        "event_number": event_number,
        "data_type": data_type,
        "date_captured": date_captured,
        "action_picker_dataset": action_picker_dataset,
        "reason": reason,
    }
    res = utilities.result_view(
        self.results,
        "datatrail-unregistered-events",
        query={"event": {"$in": [event_number]}},
        projection={},
        limit=1,
    )
    if len(res) == 0:
        w = Work(
            pipeline="datatrail-unregistered-events",
            user="datatrail",
            site="chime",
            event=[event_number],
            results=payload,
            retries=1,
        )
        utilities.deposit(w)
        w = utilities.withdraw("datatrail-unregistered-events", event=[event_number])
        w.status = "success"
        utilities.update(w)
        utilities.result_deposit(self.results, [w.payload])
    else:
        res[0]["results"] = payload
        utilities.result_update(self.results, [res[0]])
delete_unregistered_event
Python
delete_unregistered_event(event_number: int, data_type: str)

Delete unregistered event from the tracking database.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def delete_unregistered_event(self, event_number: int, data_type: str):
    """Delete unregistered event from the tracking database."""
    res = utilities.result_view(
        self.results,
        "datatrail-unregistered-events",
        query={"event": {"$in": [event_number]}},
        projection={},
        limit=1,
    )
    if res:
        utilities.result_delete_ids(
            self.results, "datatrail-unregistered-events", [res[0]["id"]]
        )
dump_payloads
Python
dump_payloads(date: datetime.datetime, intensity: Dict[int, Any], baseband: Dict[int, Any]) -> bool

Dump event file registration payloads to local JSON file.

Parameters

date : datetime.datetime Date of the event data. intensity : Dict[int, Any] Intensity event file registration payloads, mapped to event number. baseband : Dict[int, Any] Baseband event file registration payloads, mapped to event number.

Returns

bool True, if file write was successful.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def dump_payloads(
    self,
    date: datetime.datetime,
    intensity: Dict[int, Any],
    baseband: Dict[int, Any],
) -> bool:
    """Dump event file registration payloads to local JSON file.

    Parameters
    ----------
    date : datetime.datetime
        Date of the event data.
    intensity : Dict[int, Any]
        Intensity event file registration payloads, mapped to event number.
    baseband : Dict[int, Any]
        Baseband event file registration payloads, mapped to event number.

    Returns
    -------
    bool
        True, if file write was successful.
    """
    payloads = [intensity, baseband]
    file_types = ["intensity", "baseband"]
    base_path = "./"
    if self.site == "chime":
        base_path = "/data/chime/intensity/processed/datatrail_registration"
    for payload, file_type in zip(payloads, file_types):
        fname = os.path.join(
            base_path, f"{date.date()}_{file_type}_file_registration_payloads.json"
        )
        with open(fname, "w") as f:
            f.write(json.dumps(payload))
            os.chmod(fname, 0o777)
            log.warning(f"Wrote {file_type} payloads to {f.name}")
    return True
get_event_directories
Python
get_event_directories(dt: datetime.datetime) -> Tuple[str, str]

Get directory structure for intensity and baseband events for a given day.

Parameters

dt : datetime.datetime Date from which to construct the directory structure. Assumes event data are stored in /YYYY/MM/DD/.

Returns

Tuple[str, str] Full paths to intensity and baseband event data, respectively.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def get_event_directories(self, dt: datetime.datetime) -> Tuple[str, str]:
    """Get directory structure for intensity and baseband events for a given day.

    Parameters
    ----------
    dt : datetime.datetime
        Date from which to construct the directory structure.
        Assumes event data are stored in /YYYY/MM/DD/.

    Returns
    -------
    Tuple[str, str]
        Full paths to intensity and baseband event data, respectively.
    """
    log.info(f"Creating intensity and baseband data paths for {dt.date()}")
    intensity = os.path.join(
        self.raw_intensity_base_path,
        "{}".format(str(dt.year).zfill(4)),
        "{}".format(str(dt.month).zfill(2)),
        "{}".format(str(dt.day).zfill(2)),
    )
    baseband = os.path.join(
        self.raw_baseband_base_path,
        "{}".format(str(dt.year).zfill(4)),
        "{}".format(str(dt.month).zfill(2)),
        "{}".format(str(dt.day).zfill(2)),
    )
    return intensity, baseband
get_event_files
Python
get_event_files(paths: Dict[int, str]) -> Dict[int, List[str]]

Get a list of all event files for all events.

Parameters

paths : List[str] List of paths to event folders.

Returns

Dict[int, List[str]] Mapping of event number to all event files for that event.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def get_event_files(self, paths: Dict[int, str]) -> Dict[int, List[str]]:
    """Get a list of all event files for all events.

    Parameters
    ----------
    paths : List[str]
        List of paths to event folders.

    Returns
    -------
    Dict[int, List[str]]
        Mapping of event number to all event files for that event.
    """
    log.info("Getting event files")
    all_files: Dict[int, Any] = {}
    for event_number in paths:
        path = paths[event_number]
        all_files[event_number] = []
        if os.path.exists(path):
            sub_paths = os.listdir(path)
            try:
                sub_paths.remove("MD5SUMS")  # Ignore this file if it exists
            except ValueError:
                log.info(f"No MD5SUMS file is listed for {event_number}")
            for p in sub_paths:
                sp = os.path.join(path, p)
                if os.path.isdir(sp):
                    event_files = os.listdir(sp)
                    for ef in event_files:
                        file_name = os.path.join(sp, ef)
                        all_files[event_number].append(file_name)
                else:
                    all_files[event_number].append(sp)
    log.info(f"Got files for {len(all_files)} events in {len(paths)} paths")
    return all_files
get_event_paths
Python
get_event_paths(path: str) -> Dict[int, str]

Get all event folder paths.

Parameters

path : str Full path to the event folder.

Returns

Dict[int, str] Paths to event folders, mapped to event number.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def get_event_paths(self, path: str) -> Dict[int, str]:
    """Get all event folder paths.

    Parameters
    ----------
    path : str
        Full path to the event folder.

    Returns
    -------
    Dict[int, str]
        Paths to event folders, mapped to event number.
    """
    log.info(f"Getting paths to event folders in {path}")
    paths: Dict[int, Any] = {}
    if os.path.exists(path):
        items = os.listdir(path)
        try:
            # Assume events are saved in /astro_{event_number}
            for i in items:
                event_number = int(i.split("_")[1])
                paths[event_number] = f"{path}/astro_{event_number}"
        except IndexError:
            log.error(f"Failed to extract folders with name 'astro_' in {path}")
            return paths
        except ValueError:
            log.error(f"Extracted non-event number folder name from {path}")
            return paths
    else:
        log.info(f"Path {path} does not exist!!!")
    log.info(f"Got {len(paths)} event folder paths in {path}")
    return paths
get_md5sum
Python
get_md5sum(file: str)

Try to read the MD5SUM for a file from the MD5SUMS file.

Parameters

file : str Event file full path.

Returns

str MD5SUM value if it is found, else None.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def get_md5sum(self, file: str):
    """Try to read the MD5SUM for a file from the MD5SUMS file.

    Parameters
    ----------
    file : str
        Event file full path.

    Returns
    -------
    str
        MD5SUM value if it is found, else None.
    """
    log.info(f"Getting MD5SUM value for {file}")
    filename = file.split("/")[-1]
    path = self.root_path + file.replace(file.split("/")[-1], "") + "MD5SUMS"
    if not os.path.exists(path):
        path = os.path.join(self.root_path, *path.split("/")[:-2], "MD5SUMS")
    try:
        with open(path) as f:
            for line in f:
                if filename in line:
                    md5sum = line.split(" = ")[1]
                    return md5sum.replace("\n", "")  # Delete newline chrs
        log.info(f"MD5SUM value not found for {file}")
        return None
    except FileNotFoundError:
        log.info(f"MD5SUM file not found for {file}")
        return None
get_registration_payloads
Python
get_registration_payloads(files: Dict[int, List[str]]) -> Dict[int, List[Dict[str, Any]]]

Get a list of all file payloads ready for registration.

Parameters

files : Dict[int, List[str]] List of all file full paths for all events, mapped to event number.

Returns

Dict[int, List[Dict[str, Any]]] List of payloads complete with all required registration fields, mapped to event number.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def get_registration_payloads(
    self, files: Dict[int, List[str]]
) -> Dict[int, List[Dict[str, Any]]]:
    """Get a list of all file payloads ready for registration.

    Parameters
    ----------
    files : Dict[int, List[str]]
        List of all file full paths for all events,
        mapped to event number.

    Returns
    -------
    Dict[int, List[Dict[str, Any]]]
        List of payloads complete with all required registration fields,
        mapped to event number.
    """
    log.info(f"Getting registration data for {len(files)} files")
    payloads: Dict[int, Any] = {}
    mounts = list(set(MOUNTS.values()))
    mounts.sort(key=lambda s: len(s), reverse=True)
    for event_number in files:
        payloads[event_number] = []
        for file in files[event_number]:
            for m in mounts:
                if file.startswith(m):
                    filename = file.lstrip(m)
                    break
                else:
                    filename = file
            if self.validate_filename(filename):
                payloads[event_number].append(
                    {
                        "name": filename,
                        "path": file,
                        "md5sum": self.get_md5sum(file),
                        "date_created": pytz.utc.localize(
                            datetime.datetime.utcnow()
                        ).isoformat(),
                        "storage_element_captured_at": self.site,
                    }
                )
        if payloads[event_number] == []:
            del payloads[event_number]
    return payloads
query_action_picker_dataset
Python
query_action_picker_dataset(event_numbers: List[int], data_type: str)

Query the L4 action picker database to get the dataset name.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def query_action_picker_dataset(self, event_numbers: List[int], data_type: str):
    """Query the L4 action picker database to get the dataset name."""
    assert data_type in ["intensity", "baseband"]
    r = requests.post(
        "https://frb.chimenet.ca/chimefrb/astro_events/get_event_dataset/",
        data={"event_nos": event_numbers, "data_type": data_type},
    )
    actions = r.json()
    final_actions = {}
    for a in event_numbers:
        if str(a) in actions:
            final_actions[a] = actions[str(a)]
    return final_actions
query_last_completed_date
Python
query_last_completed_date()

Get the last completed date from the tracking db.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def query_last_completed_date(self):
    """Get the last completed date from the tracking db."""
    t = utilities.result_view(
        self.results,
        "datatrail-last-completed-date",
        query={},
        projection={"_id": 0},
        limit=1,
    )
    self.last_completed_date = None
    if t is not None:
        t = t[0]["results"]["last_completed_date"]
        self.last_completed_date = datetime.datetime.strptime(t, "%Y-%m-%d")
query_tsar_verification
Python
query_tsar_verification(event_number: int) -> str

Fetch tsar verification from frb-master's verifications.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def query_tsar_verification(self, event_number: int) -> str:
    """Fetch tsar verification from frb-master's verifications."""
    verification = self.backend.API.get(
        f"/v1/verification/get-verification/{event_number}"
    )
    if verification is None:
        raise Exception(
            f"Verification for event {event_number} not found in FRB Master! Check if this was a duplicate event of another."  # noqa
        )
    user_verifications = verification.get("user_verification")
    ratings = []
    for uv in user_verifications:
        if uv["id"] == "l4_pipeline":
            continue
        if (
            uv["classification"] == "KNOWN SOURCE"
            and uv["comments"] in "known pulsar"
        ):
            ratings.append("classified.PULSAR")
        elif uv["classification"] == "FAINT":
            ratings.append("classified.NOISE")
        elif uv["classification"] == "RFI":
            ratings.append("classified.RFI")
        else:
            # Even if one person thinks that this is an FRB,
            # save it for safety (less agreessive action).
            return "classified.FRB"
    if "classified.PULSAR" in ratings:
        return "classified.PULSAR"
    elif "classified.NOISE" in ratings:
        return "classified.NOISE"
    return "classified.RFI"
query_unregistered_events
Python
query_unregistered_events()

Query previously unregiistered events from the tracking database.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def query_unregistered_events(self):
    """Query previously unregiistered events from the tracking database."""
    res = utilities.result_view(
        self.results,
        "datatrail-unregistered-events",
        query={},
        projection={"_id": 0},
        limit=1_000_000_000,
    )
    self.unregistered_events = [r["results"] for r in res]
register_date
Python
register_date(date: datetime.datetime, dry_run: bool = True)

Register events for the given date.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def register_date(self, date: datetime.datetime, dry_run: bool = True):
    """Register events for the given date."""
    log.info(f"Registering events from {date.date()}")

    dirs = self.get_event_directories(date)

    intensity_event_paths = self.get_event_paths(dirs[0])
    baseband_event_paths = self.get_event_paths(dirs[1])

    intensity_event_files = self.get_event_files(intensity_event_paths)
    baseband_event_files = self.get_event_files(baseband_event_paths)

    # Create the file payloads for the event.
    intensity_file_payloads = self.get_registration_payloads(intensity_event_files)
    baseband_file_payloads = self.get_registration_payloads(baseband_event_files)

    # Query the datasets that were suggested in L4 actions.
    intensity_action_dataset = self.query_action_picker_dataset(
        list(intensity_file_payloads.keys()), "intensity"
    )
    baseband_action_dataset = self.query_action_picker_dataset(
        list(baseband_file_payloads.keys()), "baseband"
    )

    # Now register the payloads
    self.register_payloads(
        date,
        intensity_file_payloads,
        baseband_file_payloads,
        intensity_action_dataset,
        baseband_action_dataset,
        dry_run,
    )
    # Update the last successful date
    if not dry_run:
        self.update_last_successful_date(date)
register_event
Python
register_event(event_number: int, data_type: str, files: List[Dict[str, Any]], attach_to_dataset: str) -> Tuple[bool, str]

Register event into the datatrail database.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def register_event(
    self,
    event_number: int,
    data_type: str,
    files: List[Dict[str, Any]],
    attach_to_dataset: str,
) -> Tuple[bool, str]:
    """Register event into the datatrail database."""
    scope = f"{self.site}.event.{data_type}.raw"
    if attach_to_dataset == "realtime-pipeline":
        try:
            attach_to_dataset = self.query_tsar_verification(event_number)
        except Exception as err:
            return False, str(err)
    else:
        dataset = query.dataset_get(scope, attach_to_dataset)
        if type(dataset) != dict:
            return (
                False,
                f"dataset {attach_to_dataset}, scope {scope} could not be queried. Check datatrail database!!!!",  # noqa
            )
    status = commit.file_register(
        dataset_name=str(event_number),
        dataset_scope=scope,
        storage_name=self.site,
        storage_element_captured_at=self.site,
        files=files,
        attach_to_dataset=attach_to_dataset,
        test_mode=self.test_mode,
    )
    if status[0] is not True:
        return False, str(status[0])
    if len(status) > 1 and status[1] is not True:
        return False, str(status[1])
    return True, ""
register_payloads
Python
register_payloads(date, intensity_file_payloads, baseband_file_payloads, intensity_action_dataset, baseband_action_dataset, dry_run)

Helper function to register payloads for a gien list of events.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def register_payloads(
    self,
    date,
    intensity_file_payloads,
    baseband_file_payloads,
    intensity_action_dataset,
    baseband_action_dataset,
    dry_run,
):
    """Helper function to register payloads for a gien list of events."""
    if dry_run:
        log.info(f"Dry run -- intensity payloads: {intensity_file_payloads}")
        log.info(f"Dry run -- baseband payloads: {baseband_file_payloads}")
        self.dump_payloads(date, intensity_file_payloads, baseband_file_payloads)
    else:
        for event_number in intensity_file_payloads:
            t = time.time()
            data_type = "intensity"
            if (
                event_number not in intensity_action_dataset
                or intensity_action_dataset[event_number] == "unregistered"
            ):
                reason = f"No actions were found for event {event_number} in the Action Picker: L4 database."  # noqa
                self.add_unregistered_event(
                    event_number,
                    data_type,
                    date.strftime("%Y-%m-%d"),
                    "missing",
                    reason,
                )
                continue
            success, reason = self.register_event(
                event_number,
                data_type,
                intensity_file_payloads[event_number],
                intensity_action_dataset[event_number],
            )
            if success is False:
                self.add_unregistered_event(
                    event_number,
                    data_type,
                    date.strftime("%Y-%m-%d"),
                    intensity_action_dataset[event_number],
                    reason,
                )
            else:
                self.delete_unregistered_event(event_number, data_type)
            self.time_per_event.set(time.time() - t)
            if self.test_mode is False:
                push_to_gateway(
                    "frb-vsop.chime:9091",
                    job="datatrail_event_registration",
                    registry=self.registry,  # noqa
                )
        for event_number in baseband_file_payloads:
            t = time.time()
            data_type = "baseband"
            if event_number not in baseband_action_dataset:
                reason = f"No actions were found for event {event_number} in the Action Picker: L4 database."  # noqa
                self.add_unregistered_event(
                    event_number,
                    data_type,
                    date.strftime("%Y-%m-%d"),
                    "missing",
                    reason,
                )
                continue
            success, reason = self.register_event(
                event_number,
                data_type,
                baseband_file_payloads[event_number],
                baseband_action_dataset[event_number],
            )
            if success is False:
                self.add_unregistered_event(
                    event_number,
                    data_type,
                    date.strftime("%Y-%m-%d"),
                    baseband_action_dataset[event_number],
                    reason,
                )
            else:
                self.delete_unregistered_event(event_number, data_type)
            self.time_per_event.set(time.time() - t)
            if self.test_mode is False:
                push_to_gateway(
                    "frb-vsop.chime:9091",
                    job="datatrail_event_registration",
                    registry=self.registry,  # noqa
                )
register_unregistered_events
Python
register_unregistered_events(dry_run: bool = True)

Register backlog of unregistered into datatrail.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def register_unregistered_events(self, dry_run: bool = True):
    """Register backlog of unregistered into datatrail."""
    self.query_unregistered_events()
    for ue in self.unregistered_events:
        date = datetime.datetime.strptime(ue["date_captured"], "%Y-%m-%d")
        dirs = self.get_event_directories(date)

        if ue["data_type"] == "intensity":
            event_paths = self.get_event_paths(dirs[0])
        else:
            event_paths = self.get_event_paths(dirs[1])

        if int(ue["event_number"]) not in event_paths:
            log.warning(
                f'data for {ue["event_number"]}: {ue["data_type"]} is no longer on the archiver.'  # noqa
            )
            self.delete_unregistered_event(int(ue["event_number"]), ue["data_type"])
            return

        event_path = {ue["event_number"]: event_paths[int(ue["event_number"])]}
        event_files = self.get_event_files(event_path)
        file_payloads = self.get_registration_payloads(event_files)
        action_payloads = self.query_action_picker_dataset(
            [ue["event_number"]], ue["data_type"]
        )
        if ue["data_type"] == "intensity":
            self.register_payloads(
                date, file_payloads, [], action_payloads, [], dry_run
            )
        else:
            self.register_payloads(
                date, [], file_payloads, [], action_payloads, dry_run
            )
run
Python
run(dry_run: bool = True, run_until: Optional[datetime.datetime] = None, register_single_date: Optional[datetime.datetime] = None)

Run the registration system.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def run(
    self,
    dry_run: bool = True,
    run_until: Optional[datetime.datetime] = None,
    register_single_date: Optional[datetime.datetime] = None,
):
    """Run the registration system."""
    if register_single_date is not None:
        log.info(f"Registering events for date: {register_single_date}")
        self.register_date(register_single_date, dry_run)
        log.info(f"Registered events for date: {register_single_date}")
        return

    # First register_unregistered_events
    self.register_unregistered_events(dry_run=dry_run)
    # Register new events
    if run_until is None:
        run_until = datetime.datetime.utcnow()
    log.info(f"Registering events until date: {run_until}")
    self.query_last_completed_date()
    if self.last_completed_date is None:
        log.error("Uh oh!!!")
        log.error("You first need to create a result for last completed date.")
        log.error("Exiting...")
        return
    date = self.last_completed_date + datetime.timedelta(days=1)
    if run_until < date:
        log.info("You have already caught up")
        log.info("Exiting...")
        return
    while run_until - date >= datetime.timedelta(days=2):
        self.register_date(date, dry_run)
        date = date + datetime.timedelta(days=1)
        self.time_last_active.set(time.time())
        if self.test_mode is False:
            push_to_gateway(
                "frb-vsop.chime:9091",
                job="datatrail_event_registration",
                registry=self.registry,  # noqa
            )
    log.info("Registered events up to two days ago.")
update_last_successful_date
Python
update_last_successful_date(t: datetime.datetime)

Update the last completed date after registering all events.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def update_last_successful_date(self, t: datetime.datetime):
    """Update the last completed date after registering all events."""
    payload = {"last_completed_date": t.strftime("%Y-%m-%d")}
    w = Work(
        pipeline="datatrail-last-completed-date",
        user="datatrail",
        site="chime",
        results=payload,
        retries=1,
    )

    old_ts = utilities.result_view(
        self.results,
        "datatrail-last-completed-date",
        query={},
        projection={"_id": 0},
        limit=1,
    )
    if not old_ts:
        utilities.deposit(w)
        w = utilities.withdraw("datatrail-last-completed-date")
        w.status = "success"
        utilities.update(w)
        utilities.result_deposit(self.results, [w.payload])
    else:
        old_ts[0]["results"] = w.payload["results"]
        utilities.result_update(self.results, [old_ts[0]])
validate_filename
Python
validate_filename(filename: str) -> bool

Validate that the file name is an intensity or baseband file name.

Parameters

filename : str File name to check.

Returns

bool True only if the file name is recognized.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/registration/event_registration.py
Python
def validate_filename(self, filename: str) -> bool:
    """Validate that the file name is an intensity or baseband file name.

    Parameters
    ----------
    filename : str
        File name to check.

    Returns
    -------
    bool
        True only if the file name is recognized.
    """
    if "astro" in filename or "baseband" in filename:
        return True
    log.warning(f"Failed to validate file named {filename}")
    return False