Source code for PyMAIA_scripts.nndet_compute_metric_results

#!/usr/bin/env python

import json
import pickle
from argparse import ArgumentParser, RawTextHelpFormatter
from pathlib import Path
from textwrap import dedent
from typing import Dict, List, Sequence

import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

from PyMAIA.utils.log_utils import get_logger, add_verbosity_options_to_argparser, log_lvl_from_verbosity_args

DESC = dedent(
    """
    Script to perform Object Detection (COCO and FROC Metrics) and Segmentation (Dice score) evaluation on nnDetection experiments.
    
    By specifying  the ``class-file`` and ``classes`` parameters, the class-wise analysis of the metrics is performed.
    
    An Excel Spreadsheet and several PNG plots (representing FROC curves and Histogram Analysis), are produced as output.
    """  # noqa: E501
)
EPILOG = dedent(
    """
    Example call:
    ::
        {filename} --config-file /PATH/TO/CONFIG_FILE.json --output-dir /OUTPUT/PATH
        {filename} --config-file /PATH/TO/CONFIG_FILE.json --output-dir /OUTPUT/PATH --n-fold 0
    """.format(  # noqa: E501
        filename=Path(__file__).stem
    )
)


[docs] def iou_filter(image_dict: Dict[int, Dict[str, np.ndarray]], iou_idx: List[int], filter_keys: Sequence[str] = ('dtMatches', 'gtMatches', 'dtIgnore')): """ This functions can be used to filter specific IoU values from the results to make sure that the correct IoUs are passed to metric Parameters ---------- image_dict : dict dictionary containin :param:`filter_keys` which contains IoUs in the first dimension iou_idx : List[int] indices of IoU values to filter from keys filter_keys : tuple, optional keys to filter, by default ('dtMatches', 'gtMatches', 'dtIgnore') Returns ------- dict filtered dictionary """ iou_idx = list(iou_idx) filtered = {} for cls_key, cls_item in image_dict.items(): filtered[cls_key] = {key: item[iou_idx] if key in filter_keys else item for key, item in cls_item.items()} return filtered
[docs] def get_unique_iou_thresholds(metric): """ Compute unique set of iou thresholds """ iou_thresholds = [_i for _i in metric.get_iou_thresholds()] iou_thresholds = list(set(iou_thresholds)) iou_thresholds.sort() return iou_thresholds
[docs] def get_indices_of_iou(metric): """ Find indices of iou thresholds for each metric """ return [get_unique_iou_thresholds(metric).index(th) for th in metric.get_iou_thresholds()]
[docs] def get_arg_parser(): pars = ArgumentParser(description=DESC, epilog=EPILOG, formatter_class=RawTextHelpFormatter) pars.add_argument( "--config-file", type=str, required=True, help="File path for the configuration dictionary, used to retrieve experiments variables.", ) pars.add_argument( "--run-fold", type=str, default="-1", required=False, help="Index indicating which fold to run. Default: ``-1``. If set to ``-1``, runs the metric evaluation on the " "consolidated predictions.", ) pars.add_argument( "--output-dir", type=str, required=True, help="Folder path where to save the Excel spreadsheet and the PNG plots containing the evaluation metrics.", ) pars.add_argument( "--class-file", type=str, required=False, help="Optional JSON file, containing a dictionary where each Subject ID is stored with the corresponding " "Subject Class." "If not specified, all the subjects will be analysed.", ) pars.add_argument( "--classes", type=str, nargs="+", required=False, help="Optional list of Subject Classes, to be considered for the class-wise result analysis.", ) pars.add_argument( "--n-folds", type=str, default="1", required=False, help="Number of Folds used to aggregate subjects and create Object Detection metrics statistic.", ) pars.add_argument( "--model", type=str, default="RetinaUNetV001_D3V001_3d", required=False, help="nnDetection Model used for sweeping and consolidation. Default: ``RetinaUNetV001_D3V001_3d``", ) add_verbosity_options_to_argparser(pars) return pars
[docs] def main(): from nndet.evaluator.detection.coco import COCOMetric from nndet.evaluator.detection.froc import FROCMetric from nndet.evaluator.detection.hist import PredictionHistogram parser = get_arg_parser() arguments, unknown_arguments = parser.parse_known_args() args = vars(arguments) logger = get_logger( # NOQA: F841 name=Path(__file__).name, level=log_lvl_from_verbosity_args(args), ) config_file = args["config_file"] with open(config_file) as json_file: data = json.load(json_file) output_dir = args["output_dir"] Path(output_dir).mkdir(parents=True, exist_ok=True) patients_classes_dict = None if args["class_file"] is not None and args["classes"] is not None: class_file = args["class_file"] patient_classes = args["classes"] with open(class_file, "rb") as file: patients_classes_dict = json.load(file) else: patient_classes = None if args["run_fold"] != "-1": results_folder = Path(data["results_folder"]).joinpath("Task{}_{}".format(data["Task_ID"], data["Task_Name"]), args["model"], "fold{}".format(args["run_fold"]), "val_results" ) else: results_folder = Path(data["results_folder"]).joinpath( "Task{}_{}".format(data["Task_ID"], data["Task_Name"]), args["model"], "consolidated", "val_results" ) boxes_metrics_file = str(Path(results_folder).joinpath("results_boxes_per_case.pkl")) boxes_ids_file = str(Path(results_folder).joinpath("results_boxes_per_case_IDs.json")) seg_file = str(Path(results_folder).joinpath("results_seg_per_case.json")) with open(seg_file, "rb") as file: patients_seg_dict = json.load(file) df_seg = [] for patient in patients_seg_dict: if patients_classes_dict is not None: df_seg.append({"Subject": patient, "Metric": "Dice", "Score": patients_seg_dict[patient], "Class": patients_classes_dict[patient]}) else: df_seg.append({"Subject": patient, "Metric": "Dice", "Score": patients_seg_dict[patient]}) df_seg = pd.DataFrame.from_records(df_seg) with open(boxes_metrics_file, "rb") as f: boxes_metrics = pickle.load(f) with open(boxes_ids_file, "rb") as file: boxes_ids = json.load(file) iou_thresholds = np.arange(0.1, 1.0, 0.1) iou_range = (0.1, 0.5, 0.05) per_class = True verbose = False classes = [label for label in data["label_dict"]] coco = COCOMetric(classes, iou_list=iou_thresholds, iou_range=iou_range, max_detection=(100,), per_class=per_class, verbose=verbose, ) writer = pd.ExcelWriter( Path(output_dir).joinpath("{}.xlsx".format("Task{}_{}".format(data["Task_ID"], data["Task_Name"]))), engine='openpyxl') if patient_classes is None: froc = FROCMetric(classes, iou_thresholds=iou_thresholds, fpi_thresholds=(1 / 8, 1 / 4, 1 / 2, 1, 2, 4, 8), per_class=per_class, verbose=verbose, save_dir=Path(output_dir) ) histo = PredictionHistogram(classes=classes, save_dir=Path(output_dir), iou_thresholds=(0.1, 0.5), ) Path(output_dir).mkdir(parents=True, exist_ok=True) filtered_boxes_metrics = [] if int(args["n_folds"]) == 1: df = [] filtered_boxes_metrics = boxes_metrics iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(froc)) for boxes_metric in filtered_boxes_metrics] scores, curves = froc.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": 0}) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(coco)) for boxes_metric in filtered_boxes_metrics] scores, _ = coco.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": 0}) else: kf = KFold(n_splits=int(args["n_folds"]), random_state=1234567, shuffle=True) df = [] for i, (_, indexes) in enumerate(kf.split(list(range(len(boxes_metrics))))): filtered_boxes_metrics = [] for idx in indexes: filtered_boxes_metrics.append(boxes_metrics[idx]) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(froc)) for boxes_metric in filtered_boxes_metrics] scores, curves = froc.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": i}) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(coco)) for boxes_metric in filtered_boxes_metrics] scores, _ = coco.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": i}) df = pd.DataFrame.from_records(df) df.to_excel(writer, sheet_name="Object Detection") iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(histo)) for boxes_metric in filtered_boxes_metrics] _, _ = histo.compute(iou_filtered_results) else: for class_name in patient_classes: if int(args["n_folds"]) == 1: froc = FROCMetric(classes, iou_thresholds=iou_thresholds, fpi_thresholds=(1 / 8, 1 / 4, 1 / 2, 1, 2, 4, 8), per_class=per_class, verbose=verbose, save_dir=Path(output_dir).joinpath(class_name) ) histo = PredictionHistogram(classes=classes, save_dir=Path(output_dir).joinpath(class_name), iou_thresholds=(0.1, 0.5), ) Path(output_dir).joinpath(class_name).mkdir(parents=True, exist_ok=True) filtered_boxes_metrics = [] for boxes_metric, id in zip(boxes_metrics, boxes_ids): if patients_classes_dict[id] == class_name: filtered_boxes_metrics.append(boxes_metric) df = [] fold_filtered_boxes_metrics = filtered_boxes_metrics iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(froc)) for boxes_metric in fold_filtered_boxes_metrics] scores, curves = froc.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": 0}) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(coco)) for boxes_metric in fold_filtered_boxes_metrics] scores, _ = coco.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": 0}) else: kf = KFold(n_splits=int(args["n_folds"]), random_state=1234567, shuffle=True) froc = FROCMetric(classes, iou_thresholds=iou_thresholds, fpi_thresholds=(1 / 8, 1 / 4, 1 / 2, 1, 2, 4, 8), per_class=per_class, verbose=verbose, save_dir=Path(output_dir).joinpath(class_name) ) histo = PredictionHistogram(classes=classes, save_dir=Path(output_dir).joinpath(class_name), iou_thresholds=(0.1, 0.5), ) Path(output_dir).joinpath(class_name).mkdir(parents=True, exist_ok=True) filtered_boxes_metrics = [] for boxes_metric, id in zip(boxes_metrics, boxes_ids): if patients_classes_dict[id] == class_name: filtered_boxes_metrics.append(boxes_metric) df = [] for i, (_, indexes) in enumerate(kf.split(list(range(len(filtered_boxes_metrics))))): fold_filtered_boxes_metrics = [] for idx in indexes: fold_filtered_boxes_metrics.append(filtered_boxes_metrics[idx]) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(froc)) for boxes_metric in fold_filtered_boxes_metrics] scores, curves = froc.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": i}) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(coco)) for boxes_metric in fold_filtered_boxes_metrics] scores, _ = coco.compute(iou_filtered_results) for score in scores: df.append({"Metric": score, "Score": scores[score], "Group": i}) df = pd.DataFrame.from_records(df) df.to_excel(writer, sheet_name="Object Detection-{}".format(class_name)) iou_filtered_results = [iou_filter(boxes_metric, iou_idx=get_indices_of_iou(histo)) for boxes_metric in fold_filtered_boxes_metrics] _, _ = histo.compute(iou_filtered_results) df_seg.to_excel(writer, sheet_name="Segmentation") writer.close()
if __name__ == "__main__": main()