#!/usr/bin/env python3
"""
Compute semantic segmentation evaluation metrics

TODO:
    - [ ] RRMSE (relative root mean squared error): RMSE normalized by the
          root mean square value of the truth, so each residual is scaled
          against the magnitude of the actual values:
          rrmse = sqrt(sum((y - y_hat) ** 2) / sum(y ** 2))
          (a hedged sketch is in ``_rrmse_sketch`` at the bottom of this file)
    - [ ] Move to kwcoco proper
"""
import json
import kwarray
import kwcoco
import kwimage
import numpy as np
import os
import pandas as pd
import sklearn.metrics as skm
import ubelt as ub
import warnings
from kwcoco.coco_evaluator import CocoSingleResult
from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors
from kwcoco.metrics.confusion_measures import OneVersusRestMeasureCombiner
from kwcoco.metrics.confusion_vectors import OneVsRestConfusionVectors
from kwcoco.metrics.confusion_measures import MeasureCombiner
# from kwcoco.metrics.confusion_measures import PerClass_Measures
from kwcoco.metrics.confusion_measures import Measures
from typing import Dict
import scriptconfig as scfg
from shapely.ops import unary_union
from geowatch.utils import kwcoco_extensions
from geowatch.utils import process_context
from kwutil import util_progress
from kwutil import util_parallel
from geowatch import heuristics

try:
    from line_profiler import profile
except Exception:
    profile = ub.identity

# The colors I traditionally use for truth and predictions
# TRUE_GREEN = 'limegreen'
# PRED_BLUE = 'dodgerblue'

# If we have a recent kwimage we can use kitware colors, which look pretty
# good in these roles too.
TRUE_GREEN = 'kitware_green'
PRED_BLUE = 'kitware_blue'


class SegmentationEvalConfig(scfg.DataConfig):
    """
    Evaluation script for change/segmentation task
    """
    true_dataset = scfg.Value(None, help='path to the groundtruth dataset')
    pred_dataset = scfg.Value(None, help='path to the predicted dataset')
    eval_dpath = scfg.Value(None, help='directory to dump results')
    eval_fpath = scfg.Value(None, help='path to dump result summary')
    # options
    draw_curves = scfg.Value('auto', help='flag to draw curves or not')
    draw_heatmaps = scfg.Value('auto', help='flag to draw heatmaps or not')
    score_space = scfg.Value('auto', help=ub.paragraph(
        '''
        can score in image or video space. If auto, chooses video if there
        are any, otherwise image
        '''))
    resolution = scfg.Value(None, help='if specified, override the default resolution to score at')
    workers = scfg.Value('auto', help='number of parallel scoring workers')
    draw_workers = scfg.Value('auto', help='number of parallel drawing workers')
    viz_thresh = scfg.Value('auto', help='visualization threshold')
    balance_area = scfg.Value(False, isflag=True, help='upweight small instances, downweight large instances')


def main(cmdline=True, **kwargs):
    """
    Entry point

    todo: doctest and CLI structure
    todo: ProcessContext to track resource usage
    """
    full_config = SegmentationEvalConfig.cli(
        cmdline=cmdline, data=kwargs, strict=True)
    import rich
    rich.print('full_config = {}'.format(ub.urepr(full_config, nl=1)))
    full_config = ub.udict(full_config)
    true_coco = kwcoco.CocoDataset.coerce(full_config['true_dataset'])
    pred_coco = kwcoco.CocoDataset.coerce(full_config['pred_dataset'])
    eval_fpath = full_config['eval_fpath']
    eval_dpath = full_config['eval_dpath']
    config = full_config - {
        'true_dataset', 'pred_dataset', 'eval_dpath', 'eval_fpath'}
    evaluate_segmentations(true_coco, pred_coco, eval_dpath, eval_fpath, config)


@profile
def single_image_segmentation_metrics(pred_coco_img, true_coco_img,
                                      true_classes, true_dets, video1=None,
                                      thresh_bins=None, config=None):
    """
    Args:
        true_coco_img (kwcoco.CocoImage): detached true coco image

        pred_coco_img (kwcoco.CocoImage): detached predicted coco image

        thresh_bins (int): if specified, rounds scores into this many bins to
            make calculating metrics more efficient
    """
    if config is None:
        config = {}

    viz_thresh = config.get('viz_thresh', None)
    score_space = config.get('score_space', 'auto')
    resolution = config.get('resolution', None)
    balance_area = config.get('balance_area', False)

    if score_space == 'auto':
        pred_vidid = pred_coco_img.img.get('video_id', None)
        true_vidid = true_coco_img.img.get('video_id', None)
        if true_vidid is not None or pred_vidid is not None:
            score_space = 'video'
        else:
            score_space = 'image'

    true_gid = true_coco_img.img['id']
    pred_gid = pred_coco_img.img['id']

    if thresh_bins is not None:
        if isinstance(thresh_bins, int):
            left_bin_edges = np.linspace(0, 1, thresh_bins)
        else:
            left_bin_edges = thresh_bins
    else:
        left_bin_edges = None

    img1 = true_coco_img.img
    if score_space == 'image':
        dsize = np.array((img1['width'], img1['height']))
    elif score_space == 'video':
        dsize = np.array((video1['width'], video1['height']))
    else:
        raise KeyError(score_space)

    if resolution is None:
        scale = None
    else:
        try:
            scale = true_coco_img._scalefactor_for_resolution(
                resolution=resolution, space=score_space)
        except Exception as ex:
            print(f'warning: ex={ex}')
            scale = None

    if scale is not None:
        dsize = np.ceil(np.array(dsize) * np.array(scale)).astype(int)

    row = {
        'true_gid': true_gid,
        'pred_gid': pred_gid,
    }
    if video1 is not None:
        row['video'] = video1['name']

    shape = dsize[::-1]
    info = {
        'row': row,
        'shape': shape,
    }

    # TODO: parameterize these class categories
    # TODO: remove and generalize before porting to kwcoco
    ignore_classes = heuristics.IGNORE_CLASSNAMES
    background_classes = heuristics.BACKGROUND_CLASSES
    undistinguished_classes = heuristics.UNDISTINGUISHED_CLASSES
    context_classes = heuristics.CONTEXT_CLASSES
    negative_classes = heuristics.NEGATIVE_CLASSES

    # HACK! FIXME: There needs to be a clear definition of which classes are
    # scored and which are not.
    background_classes = background_classes | negative_classes

    """
    The above heuristics should roughly be:

        * ignore_classes - ignore, Unknown

        * background_classes - background, negative

        * undistinguished_classes - positive

        * context_classes - No Activity, Post Construction

    inferred:

        * class_scored_classes - Site Preparation, Active Construction

        * salient_scored_classes - positive, Site Preparation, Active Construction
    """

    # Determine what true/predicted categories are in common
    predicted_classes = []
    for stream in pred_coco_img.channels.streams():
        have = stream.intersection(true_classes)
        predicted_classes.extend(have.parsed)
    classes_of_interest = ub.oset(predicted_classes) - (
        negative_classes | background_classes | ignore_classes |
        undistinguished_classes)

    # Determine if saliency has been predicted
    salient_class = 'salient'
    has_saliency = salient_class in pred_coco_img.channels

    # Load ground truth annotations
    if score_space == 'video':
        warp_img_to_vid = kwimage.Affine.coerce(
            true_coco_img.img.get('warp_img_to_vid', {'type': 'affine'}))
        true_dets = true_dets.warp(warp_img_to_vid)

    if scale is not None:
        true_dets = true_dets.scale(scale)

    info['true_dets'] = true_dets

    true_cidxs = true_dets.data['class_idxs']
    true_ssegs = true_dets.data['segmentations']
    true_catnames = list(ub.take(true_dets.classes.idx_to_node, true_cidxs))

    # NOTE: The exact definition of how we build the "truth" segmentation
    # mask is up for debate. I think this is a reasonable definition, but it
    # needs to be reviewed. It also likely needs updating to become general
    # and remove the need for heuristics.
    # We might need to:
    # * add in a per-category weight canvas. This lets us say we can ignore
    #   class A when scoring class B. Is there an example where this is
    #   relevant?
    # Does negative get moved to the background or scored?
    # Currently I'm just moving it to the background.
    # How do we distinguish that?

    # TODO:
    # Use the "valid_polygon" to zero out evaluations in invalid regions.
    # Also use nan values in the predictions to do the same.
    # Combine these two measures.

    # Create a truth "panoptic segmentation" style mask for each task
    if has_saliency:
        # Truth for the saliency task
        true_saliency = np.zeros(shape, dtype=np.uint8)
        saliency_weights = np.ones(shape, dtype=np.float32)
        sseg_groups = {
            'ignore': [],
            'context': [],
            'foreground': [],
            'background': [],
        }
        for true_sseg, true_catname in zip(true_ssegs, true_catnames):
            if true_catname in background_classes:
                key = 'background'
            elif true_catname in ignore_classes:
                key = 'ignore'
            elif true_catname in context_classes:
                key = 'context'
            else:
                key = 'foreground'
            sseg_groups[key].append(true_sseg)

        if balance_area:
            if len(sseg_groups['foreground']):
                fg_poly = unary_union([p.to_shapely() for p in sseg_groups['foreground']])
                unit_sseg_share = fg_poly.area / len(sseg_groups['foreground'])
            else:
                unit_sseg_share = 1

        # Background ssegs stay background; there is nothing to do with them.

        # Ignore the "ignore" and Unknown classes in saliency. Context
        # classes (no-activity / post-construction) are currently scored as
        # regular background; uncomment the fill below to ignore them instead.
        for true_sseg in sseg_groups['ignore']:
            saliency_weights = true_sseg.fill(saliency_weights, value=0)
        # for true_sseg in sseg_groups['context']:
        #     saliency_weights = true_sseg.fill(saliency_weights, value=0)

        # Score positive, site prep, and active construction.
        for true_sseg in sseg_groups['foreground']:
            true_saliency = true_sseg.fill(true_saliency, value=1)
            if balance_area:
                # Fill in the weights to upweight smaller areas.
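                # Worked example (assuming two foreground polygons whose
                # union has area 100, so unit_sseg_share is 50): a polygon
                # of area 10 gets weight 50 / 10 = 5.0 and a polygon of
                # area 200 gets weight 50 / 200 = 0.25, so each instance
                # contributes a roughly equal total amount of weight.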
                instance_weight = unit_sseg_share / true_sseg.area
                saliency_weights = true_sseg.fill(saliency_weights, value=instance_weight)

        # saliency_weights = saliency_weights / saliency_weights.max()

    if classes_of_interest:
        # Truth for the class task
        catname_to_true: Dict[str, np.ndarray] = {
            catname: np.zeros(shape, dtype=np.float32)
            for catname in classes_of_interest
        }
        class_weights = np.ones(shape, dtype=np.float32)
        sseg_groups = {
            'background': [],
            'ignore': [],
            'undistinguished': [],
            'foreground': [],
        }
        for true_sseg, true_catname in zip(true_ssegs, true_catnames):
            if true_catname in background_classes:
                key = 'background'
            elif true_catname in ignore_classes:
                key = 'ignore'
            elif true_catname in undistinguished_classes:
                key = 'undistinguished'
            else:
                key = 'foreground'
            true_sseg.meta['true_catname'] = true_catname
            sseg_groups[key].append(true_sseg)

        if balance_area:
            if len(sseg_groups['foreground']):
                fg_poly = unary_union([p.to_shapely() for p in sseg_groups['foreground']])
                unit_sseg_share = fg_poly.area / len(sseg_groups['foreground'])
            else:
                unit_sseg_share = 1

        # Background ssegs stay background; there is nothing to do with them.

        # Zero out the weights of the ignore and undistinguished classes.
        for true_sseg in sseg_groups['ignore']:
            class_weights = true_sseg.fill(class_weights, value=0)
        for true_sseg in sseg_groups['undistinguished']:
            class_weights = true_sseg.fill(class_weights, value=0)

        # Score positive, site prep, and active construction.
        for true_sseg in sseg_groups['foreground']:
            true_catname = true_sseg.meta['true_catname']
            if balance_area:
                # Fill in the weights to upweight smaller areas.
                instance_weight = unit_sseg_share / true_sseg.area
                class_weights = true_sseg.fill(class_weights, value=instance_weight)
            catname_to_true[true_catname] = true_sseg.fill(
                catname_to_true[true_catname], value=1)

        # Hack:
        # normalize to 0-1, this downweights the background too much, but
        # I think fixes an upstream issue. Remove (or justify?) if possible.
        # class_weights = class_weights / class_weights.max()

    if classes_of_interest:
        # handle multiclass case
        pred_chan_of_interest = '|'.join(classes_of_interest)
        delayed_probs = pred_coco_img.imdelay(
            pred_chan_of_interest, space=score_space,
            resolution=resolution, nodata_method='float').as_xarray()
        # Do we need xarray anymore?
        class_probs = delayed_probs.finalize()

        invalid_mask = np.isnan(class_probs).all(axis=2)
        class_weights[invalid_mask] = 0

        catname_to_prob = {}
        cx_to_binvecs = {}
        for cx, cname in enumerate(classes_of_interest):
            is_true = catname_to_true[cname]
            score = class_probs.loc[:, :, cname].data.copy()
            invalid_mask = np.isnan(score)

            weights = class_weights.copy()
            weights[invalid_mask] = 0
            score[invalid_mask] = 0

            pred_score = score.ravel()
            if left_bin_edges is not None:
                # snap scores onto the discrete bin edges to reduce the
                # number of unique thresholds the metrics must consider
                rounded_idx = np.searchsorted(left_bin_edges, pred_score)
                pred_score = left_bin_edges[rounded_idx]

            catname_to_prob[cname] = score
            bin_data = {
                # is_true denotes if the true class of the item is the
                # category of interest.
                'is_true': is_true.ravel(),
                'pred_score': pred_score,
                'weight': weights.ravel(),
            }
            bin_data = kwarray.DataFrameArray(bin_data)
            bin_cfsn = BinaryConfusionVectors(bin_data, cx, classes_of_interest)
            # TODO: use me?
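            # (If re-enabled, the lines below would compute per-image,
            # per-class threshold-sweep measures here. The summary of a
            # BinaryConfusionVectors' measures contains keys like 'ap',
            # 'auc', and 'max_f1' -- the same keys the saliency branch
            # below extracts from its summary.)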
# bin_measures = bin_cfsn.measures() # bin_measures.summary() cx_to_binvecs[cname] = bin_cfsn ovr_cfns = OneVsRestConfusionVectors(cx_to_binvecs, classes_of_interest) class_measures = ovr_cfns.measures() row['mAP'] = class_measures['mAP'] row['mAUC'] = class_measures['mAUC'] info.update({ 'class_weights': class_weights, 'class_measures': class_measures, 'catname_to_true': catname_to_true, 'catname_to_prob': catname_to_prob, }) if has_saliency: # TODO: consolidate this with above class-specific code salient_delay = pred_coco_img.imdelay(salient_class, space=score_space, resolution=resolution, nodata_method='float') salient_prob = salient_delay.finalize(nodata_method='float')[..., 0] salient_prob_orig = salient_prob.copy() invalid_mask = np.isnan(salient_prob) salient_prob[invalid_mask] = 0 try: saliency_weights[invalid_mask] = 0 except Exception: print(f'invalid_mask.shape={invalid_mask.shape}') print(f'saliency_weights.shape={saliency_weights.shape}') raise pred_score = salient_prob.ravel() if left_bin_edges is not None: rounded_idx = np.searchsorted(left_bin_edges, pred_score) pred_score = left_bin_edges[rounded_idx] bin_cfns = BinaryConfusionVectors(kwarray.DataFrameArray({ 'is_true': true_saliency.ravel(), 'pred_score': pred_score, 'weight': saliency_weights.ravel().astype(np.float32), })) salient_measures = bin_cfns.measures() salient_summary = salient_measures.summary() salient_metrics = { 'salient_' + k: v for k, v in ub.dict_isect(salient_summary, { 'ap', 'auc', 'max_f1'}).items() } row.update(salient_metrics) info.update({ 'salient_measures': salient_measures, 'salient_prob': salient_prob_orig, 'true_saliency': true_saliency, }) if 1: maximized_info = salient_measures.maximized_thresholds() # This cherry-picks a threshold per image! if viz_thresh == 'auto': cherry_picked_thresh = maximized_info['f1']['thresh'] saliency_thresh = cherry_picked_thresh else: saliency_thresh = viz_thresh pred_saliency = salient_prob > saliency_thresh y_true = true_saliency.ravel() y_pred = pred_saliency.ravel() sample_weight = saliency_weights.ravel() mat = skm.confusion_matrix(y_true, y_pred, labels=np.array([0, 1]), sample_weight=sample_weight) info.update({ 'mat': mat, 'pred_saliency': pred_saliency, 'saliency_thresh': saliency_thresh, 'saliency_weights': saliency_weights, }) # TODO: look at the category ranking at each pixel by score. # Is there a generalization of a confusion matrix to a ranking tensor? # if 0: # # TODO: Reintroduce hard-polygon segmentation scoring? 
    #     # Score hard-threshold predicted annotations
    #     # SCORE PREDICTED ANNOTATIONS
    #     # Create a pred "panoptic segmentation" style mask
    #     pred_saliency = np.zeros(shape, dtype=np.uint8)
    #     pred_dets = pred_coco.annots(gid=gid2).detections
    #     for pred_sseg in pred_dets.data['segmentations']:
    #         pred_saliency = pred_sseg.fill(pred_saliency, value=1)

    return info


@ub.memoize
def _memo_legend(label_to_color):
    import kwplot
    legend_img = kwplot.make_legend_img(label_to_color)
    return legend_img


def draw_confusion_image(pred, target):
    canvas = np.zeros_like(pred)
    np.putmask(canvas, (target == 0) & (pred == 0), 0)  # true-neg
    np.putmask(canvas, (target == 1) & (pred == 1), 1)  # true-pos
    np.putmask(canvas, (target == 1) & (pred == 0), 2)  # false-neg
    np.putmask(canvas, (target == 0) & (pred == 1), 3)  # false-pos
    return canvas


@profile
def colorize_class_probs(probs, classes):
    """
    Args:
        probs: stack of per-class probability (or one-hot) channels,
            e.g. pred_cat_ohe
        classes: kwcoco.CategoryTree with color metadata, e.g. pred_classes
    """
    # Try to read colors from the classes CategoryTree
    cidx_to_color = []
    for cidx in range(len(probs)):
        node = classes[cidx]
        color = classes.graph.nodes[node].get('color', None)
        if color is not None:
            color = kwimage.Color(color).as01()
        cidx_to_color.append(color)

    # Fill in distinct default colors for any class without one
    import distinctipy
    have_colors = [c for c in cidx_to_color if c is not None]
    num_need = sum(c is None for c in cidx_to_color)
    if num_need:
        new_colors = distinctipy.get_colors(
            num_need, exclude_colors=have_colors, rng=569944)
        new_color_iter = iter(new_colors)
        cidx_to_color = [next(new_color_iter) if c is None else c
                         for c in cidx_to_color]

    canvas_dtype = np.float32

    # Each class gets its own color, and modulates the alpha
    h, w = probs.shape[-2:]
    layer_shape = (h, w, 4)
    background = np.zeros(layer_shape, dtype=canvas_dtype)
    background[..., 3] = 1.0
    layers = []
    for cidx, chan in enumerate(probs):
        color = cidx_to_color[cidx]
        layer = np.empty(layer_shape, dtype=canvas_dtype)
        layer[..., 3] = chan
        layer[..., 0:3] = color
        layers.append(layer)
    layers.append(background)
    colormask = kwimage.overlay_alpha_layers(
        layers, keepalpha=False, dtype=canvas_dtype)
    return colormask


@profile
def draw_truth_borders(true_dets, canvas, alpha=1.0, color=None):
    true_sseg = true_dets.data['segmentations']
    true_cidxs = true_dets.data['class_idxs']
    _classes = true_dets.data['classes']
    if color is None:
        _nodes = ub.take(_classes.idx_to_node, true_cidxs)
        _node_data = ub.take(_classes.graph.nodes, _nodes)
        _node_colors = [d['color'] for d in _node_data]
        color = _node_colors

    canvas = kwimage.ensure_float01(canvas)
    if alpha < 1.0:
        # Remove this condition when kwimage 0.8.3 is released; always take
        # the else branch.
        empty_canvas = np.zeros_like(canvas, shape=(canvas.shape[0:2] + (4,)))
        overlay_canvas = true_sseg.draw_on(
            empty_canvas, fill=False, border=True, color=color, alpha=1.0)
        overlay_canvas[..., 3] *= alpha
        canvas = kwimage.overlay_alpha_images(overlay_canvas, canvas)
    else:
        canvas = true_sseg.draw_on(canvas, fill=False, border=True,
                                   color=color, alpha=alpha)
    return canvas


@profile
def dump_chunked_confusion(full_classes, true_coco_imgs, chunk_info,
                           heatmap_dpath, title=None, config=None):
    """
    Draw a sequence of true/pred image predictions
    """
    # The config must be populated before its first use below.
    if config is None:
        config = {}

    color_labels = ['TN', 'TP', 'FN', 'FP']
    score_space = config.get('score_space', 'video')

    colors = list(ub.take(heuristics.CONFUSION_COLOR_SCHEME, color_labels))
    # colors = ['blue', 'green', 'yellow', 'red']
    # colors = ['black', 'white', 'yellow', 'red']
    color_lut = 
np.array([kwimage.Color(c).as255() for c in colors]) # full_classes: kwcoco.CategoryTree = true_coco.object_categories() if config is None: config = {} resolution = config.get('resolution', None) # Make a legend color01_lut = color_lut / 255.0 legend_images = [] if 'catname_to_prob' in chunk_info[0]: # Class Legend label_to_color = { node: kwimage.Color(data['color']).as01() for node, data in full_classes.graph.nodes.items()} label_to_color = ub.sorted_keys(label_to_color) legend_img_class = _memo_legend(label_to_color) legend_images.append(legend_img_class) if 'pred_saliency' in chunk_info[0]: # Confusion Legend label_to_color = ub.dzip(color_labels, color01_lut) legend_img_saliency_cfsn = _memo_legend(label_to_color) legend_img_saliency_cfsn = kwimage.ensure_uint255(legend_img_saliency_cfsn) legend_images.append(legend_img_saliency_cfsn) if len(legend_images): legend_img = kwimage.stack_images(legend_images, axis=0, pad=5) else: legend_img = None # Draw predictions on each frame parts = [] frame_nums = [] true_gids = [] unique_vidnames = set() for info, true_coco_img in zip(chunk_info, true_coco_imgs): row = info['row'] if row.get('video', ''): unique_vidnames.add(row['video']) # true_gid = row['true_gid'] # true_coco_img = true_coco.coco_image(true_gid) true_gid = true_coco_img.img['id'] true_img = true_coco_img.img frame_index = true_img.get('frame_index', None) if frame_index is not None: frame_nums.append(frame_index) true_gids.append(true_gid) # image_header_text = f'{frame_index} - gid = {true_gid}' header_lines = heuristics.build_image_header_text( img=true_img, name=None, _header_extra=None, ) # date_captured = true_img.get('date_captured', '') # frame_index = true_img.get('frame_index', None) # gid = true_img.get('id', None) # sensor_coarse = true_img.get('sensor_coarse', 'unknown') # _header_extra = None # header_line_infos = [ # [f'gid={gid}, frame={frame_index}', _header_extra], # [sensor_coarse, date_captured], # ] # header_lines = [] # for line_info in header_line_infos: # header_line = ' '.join([p for p in line_info if p]) # if header_line: # header_lines.append(header_line) image_header_text = '\n'.join(header_lines) imgw = info['shape'][1] # SC_smt_it_stm_p8_newanns_weighted_raw_v39_epoch=52-step=2269088 header = kwimage.draw_header_text( {'width': imgw}, # image=confusion_image, # image=None, text=image_header_text, color='red', stack=False) vert_parts = [ header, ] DRAW_WEIGHTS = 1 if 'catname_to_prob' in info: true_dets = info['true_dets'] true_dets.data['classes'] = full_classes pred_classes = kwcoco.CategoryTree.coerce(list(info['catname_to_prob'].keys())) true_classes = kwcoco.CategoryTree.coerce(list(info['catname_to_true'].keys())) # todo: ensure colors are robust and consistent for node in pred_classes.graph.nodes(): pred_classes.graph.nodes[node]['color'] = full_classes.graph.nodes[node]['color'] for node in true_classes.graph.nodes(): true_classes.graph.nodes[node]['color'] = full_classes.graph.nodes[node]['color'] # pred_classes = kwcoco.CategoryTree pred_cat_ohe = np.stack(list(info['catname_to_prob'].values())) true_cat_ohe = np.stack(list(info['catname_to_true'].values())) # class_pred_idx = pred_cat_ohe.argmax(axis=0) # class_true_idx = true_cat_ohe.argmax(axis=0) true_overlay = colorize_class_probs(true_cat_ohe, true_classes)[..., 0:3] # true_heatmap = kwimage.Heatmap(class_probs=true_cat_ohe, classes=true_classes) # true_overlay = true_heatmap.colorize('class_probs')[..., 0:3] true_overlay = draw_truth_borders(true_dets, true_overlay, alpha=1.0) 
true_overlay = kwimage.ensure_uint255(true_overlay) true_overlay = kwimage.draw_text_on_image( true_overlay, 'true class', org=(1, 1), valign='top', color=TRUE_GREEN, border=1) vert_parts.append(true_overlay) if DRAW_WEIGHTS: class_weights = info['class_weights'] if class_weights.max() > 1: weight_image = kwarray.normalize(class_weights, min_val=0) weight_title = 'weights (normed)' else: weight_image = class_weights weight_title = 'weights' weight_image = kwimage.ensure_uint255(weight_image) weight_image = kwimage.draw_text_on_image( weight_image, weight_title, org=(1, 1), valign='top', color='pink', border=1) vert_parts.append(weight_image) pred_overlay = colorize_class_probs(pred_cat_ohe, pred_classes)[..., 0:3] # pred_heatmap = kwimage.Heatmap(class_probs=pred_cat_ohe, classes=pred_classes) # pred_overlay = pred_heatmap.colorize('class_probs')[..., 0:3] pred_overlay = draw_truth_borders(true_dets, pred_overlay, alpha=0.05, color='white') # pred_overlay = draw_truth_borders(true_dets, pred_overlay, alpha=0.05) pred_overlay = kwimage.ensure_uint255(pred_overlay) pred_overlay = kwimage.draw_text_on_image( pred_overlay, 'pred class', org=(1, 1), valign='top', color=PRED_BLUE, border=1) vert_parts.append(pred_overlay) if 'pred_saliency' in info: pred_saliency = info['pred_saliency'].astype(np.uint8) true_saliency = info['true_saliency'] saliency_thresh = info['saliency_thresh'] confusion_idxs = draw_confusion_image(pred_saliency, true_saliency) confusion_image = color_lut[confusion_idxs] confusion_image = kwimage.ensure_uint255(confusion_image) confusion_image = kwimage.draw_text_on_image( confusion_image, f'confusion saliency: thresh={saliency_thresh:0.3f}', org=(1, 1), valign='top', color='white', border=1) vert_parts.append( confusion_image ) if DRAW_WEIGHTS: saliency_weights = info['saliency_weights'] if saliency_weights.max() > 1: weight_image = kwarray.normalize(saliency_weights, min_val=0) weight_title = 'weights (normed)' else: weight_image = saliency_weights weight_title = 'weights' weight_image = kwimage.ensure_uint255(weight_image) weight_image = kwimage.draw_text_on_image( weight_image, weight_title, org=(1, 1), valign='top', color='pink', border=1) vert_parts.append(weight_image) elif 'true_saliency' in info: true_saliency = info['true_saliency'] true_saliency = true_saliency.astype(np.float32) heatmap = kwimage.make_heatmask( true_saliency, with_alpha=0.5, cmap='plasma') # heatmap[invalid_mask] = 0 heatmap_int = kwimage.ensure_uint255(heatmap[..., 0:3]) heatmap_int = kwimage.draw_text_on_image( heatmap_int, 'true saliency', org=(1, 1), valign='top', color=TRUE_GREEN, border=1) vert_parts.append(heatmap_int) # confusion_image = kwimage.draw_text_on_image( # confusion_image, image_text, org=(1, 1), valign='top', # color='white', border={'color': 'black'}) # TODO: # Can we show the reference image? # TODO: # Show the datetime on the top of the image (and the display band?) 
    real_image_norm = None
    real_image_int = None
    TRY_IMREAD = 1
    if TRY_IMREAD:
        avail_chans = {p2 for p1 in true_coco_img.channels.spec.split(',')
                       for p2 in p1.split('|')}
        chosen_viz_chans = None
        if len(avail_chans & {'red', 'green', 'blue'}) == 3:
            chosen_viz_chans = 'red|green|blue'
        elif len(avail_chans & {'r', 'g', 'b'}) == 3:
            chosen_viz_chans = 'r|g|b'
        elif len(avail_chans & {'pan'}) == 1:
            # single-band panchromatic fallback
            chosen_viz_chans = 'pan'
        else:
            chosen_viz_chans = true_coco_img.primary_asset()['channels']
        try:
            real_image = true_coco_img.imdelay(
                chosen_viz_chans, space=score_space, nodata_method='float',
                resolution=resolution).finalize()[:]
            real_image_norm = kwimage.normalize_intensity(real_image)
            real_image_norm = kwimage.fill_nans_with_checkers(real_image_norm)
            real_image_int = kwimage.ensure_uint255(real_image_norm)
        except Exception as ex:
            print('ex = {!r}'.format(ex))

    TRY_SOFT = 1
    salient_prob = None
    if TRY_SOFT:
        salient_prob = info.get('salient_prob', None)
        # invalid_mask = info.get('invalid_mask', None)
        if salient_prob is not None:
            invalid_mask = np.isnan(salient_prob)
            heatmap = kwimage.make_heatmask(
                salient_prob, with_alpha=0.5, cmap='plasma')
            heatmap[invalid_mask] = np.nan
            heatmap = kwimage.fill_nans_with_checkers(heatmap)
            # heatmap[invalid_mask] = 0
            heatmap_int = kwimage.ensure_uint255(heatmap[..., 0:3])
            heatmap_int = kwimage.draw_text_on_image(
                heatmap_int, 'pred saliency', org=(1, 1), valign='top',
                color=PRED_BLUE, border=1)
            vert_parts.append(heatmap_int)
            # if real_image_norm is not None:
            #     overlaid = kwimage.overlay_alpha_layers([heatmap, real_image_norm.mean(axis=2)])
            #     overlaid = kwimage.ensure_uint255(overlaid[..., 0:3])
            #     vert_parts.append(overlaid)

    if real_image_int is not None:
        vert_parts.append(real_image_int)

    vert_parts = [kwimage.ensure_uint255(c) for c in vert_parts]
    vert_stack = kwimage.stack_images(vert_parts, axis=0)
    parts.append(vert_stack)

    max_frame = None if len(frame_nums) == 0 else max(frame_nums)
    min_frame = None if len(frame_nums) == 0 else min(frame_nums)
    max_gid = max(true_gids)
    min_gid = min(true_gids)
    if max_frame == min_frame:
        frame_part = f'{min_frame}'
    else:
        frame_part = f'{min_frame}-{max_frame}'

    if max_gid == min_gid:
        gid_part = f'{min_gid}'
    else:
        gid_part = f'{min_gid}-{max_gid}'

    vidname_part = '_'.join(list(unique_vidnames))
    if not vidname_part:
        vidname_part = '_loose_images'

    plot_fstem = f'{vidname_part}-{frame_part}-{gid_part}'

    canvas_title_parts = []
    if title:
        canvas_title_parts.append(title)
    canvas_title_parts.append(plot_fstem)
    canvas_title = '\n'.join(canvas_title_parts)

    plot_canvas = kwimage.stack_images(parts, axis=1, overlap=-10)

    if legend_img is not None:
        plot_canvas = kwimage.stack_images(
            [plot_canvas, legend_img], axis=1, overlap=-10)

    header = kwimage.draw_header_text(
        {'width': plot_canvas.shape[1]}, canvas_title)
    plot_canvas = kwimage.stack_images([header, plot_canvas], axis=0)

    heatmap_dpath = ub.Path(str(heatmap_dpath))
    vid_plot_dpath = (heatmap_dpath / vidname_part).ensuredir()
    plot_fpath = vid_plot_dpath / (plot_fstem + '.jpg')
    kwimage.imwrite(str(plot_fpath), plot_canvas)


@profile
def evaluate_segmentations(true_coco, pred_coco, eval_dpath=None,
                           eval_fpath=None, config=None):
    """
    TODO:
        - [ ] Fold non-critical options into the config

    CommandLine:
        XDEV_PROFILE=1 xdoctest -m geowatch.tasks.fusion.evaluate evaluate_segmentations

    Example:
        >>> from geowatch.tasks.fusion.evaluate import *  # NOQA
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> true_coco1 = 
kwcoco.CocoDataset.demo('vidshapes2', image_size=(64, 64)) >>> true_coco2 = kwcoco.CocoDataset.demo('shapes2', image_size=(64, 64)) >>> #true_coco1 = kwcoco.CocoDataset.demo('vidshapes9') >>> #true_coco2 = kwcoco.CocoDataset.demo('shapes128') >>> true_coco = kwcoco.CocoDataset.union(true_coco1, true_coco2) >>> kwargs = { >>> 'box_noise': 0.5, >>> 'n_fp': (0, 10), >>> 'n_fn': (0, 10), >>> 'with_probs': True, >>> 'with_heatmaps': True, >>> 'verbose': 1, >>> } >>> # TODO: it would be nice to demo the soft metrics >>> # functionality by adding "salient_prob" or "class_prob" >>> # auxiliary channels to this demodata. >>> print('perterbing') >>> pred_coco = perterb_coco(true_coco, **kwargs) >>> eval_dpath = ub.Path.appdir('geowatch/tests/fusion_eval').ensuredir() >>> print('eval_dpath = {!r}'.format(eval_dpath)) >>> config = {} >>> config['score_space'] = 'image' >>> draw_curves = 'auto' >>> draw_heatmaps = 'auto' >>> #draw_heatmaps = False >>> config['workers'] = 'min(avail-2,6)' >>> #workers = 0 >>> evaluate_segmentations(true_coco, pred_coco, eval_dpath, config=config) Example: >>> # xdoctest: +REQUIRES(env:SLOW_DOCTEST) >>> from geowatch.tasks.fusion.evaluate import * # NOQA >>> from kwcoco.coco_evaluator import CocoEvaluator >>> from kwcoco.demo.perterb import perterb_coco >>> import kwcoco >>> true_coco = kwcoco.CocoDataset.demo('vidshapes2', image_size=(64, 64)) >>> kwargs = { >>> 'box_noise': 0.5, >>> 'n_fp': (0, 10), >>> 'n_fn': (0, 10), >>> 'with_probs': True, >>> 'with_heatmaps': True, >>> 'verbose': 1, >>> } >>> # TODO: it would be nice to demo the soft metrics >>> # functionality by adding "salient_prob" or "class_prob" >>> # auxiliary channels to this demodata. >>> print('perterbing') >>> pred_coco = perterb_coco(true_coco, **kwargs) >>> eval_dpath = ub.Path.appdir('geowatch/tests/fusion_eval-video').ensuredir() >>> print('eval_dpath = {!r}'.format(eval_dpath)) >>> config = {} >>> config['score_space'] = 'video' >>> config['balance_area'] = True >>> draw_curves = 'auto' >>> draw_heatmaps = 'auto' >>> #draw_heatmaps = False >>> config['workers'] = 'min(avail-2,6)' >>> #workers = 0 >>> evaluate_segmentations(true_coco, pred_coco, eval_dpath, config=config) """ import rich if config is None: config = {} draw_curves = config.get('draw_curves', 'auto') draw_heatmaps = config.get('draw_heatmaps', 'auto') score_space = config.get('score_space', 'auto') draw_workers = config.get('draw_workers', 'auto') if score_space == 'auto': if true_coco.n_videos: score_space = 'video' else: score_space = 'image' config['score_space'] = score_space # Ensure each class has colors. heuristics.ensure_heuristic_coco_colors(true_coco) true_classes = list(true_coco.object_categories()) full_classes: kwcoco.CategoryTree = true_coco.object_categories() # Sometimes supercategories dont get colors, this fixes that. 
    heuristics.ensure_heuristic_category_tree_colors(full_classes)

    workers = util_parallel.coerce_num_workers(config.get('workers', 0))
    if draw_workers == 'auto':
        draw_workers = min(2, workers)
    else:
        draw_workers = util_parallel.coerce_num_workers(draw_workers)

    # Extract metadata about the predictions to persist
    meta = {}
    meta['info'] = info = []
    if pred_coco.fpath is not None:
        pred_fpath = ub.Path(pred_coco.fpath)
        meta['pred_name'] = '_'.join(
            list(pred_fpath.parts[-2:-1]) + [pred_fpath.stem])

    predicted_info = pred_coco.dataset.get('info', [])
    for item in predicted_info:
        if item.get('type', None) == 'measure':
            info.append(item)
        if item.get('type', None) == 'process':
            proc_name = item.get('properties', {}).get('name', None)
            if proc_name == 'geowatch.tasks.fusion.predict':
                package_fpath = item['properties']['config'].get('package_fpath')
                if 'title' not in item:
                    item['title'] = ub.Path(package_fpath).stem
                if 'package_name' not in item:
                    item['package_name'] = ub.Path(package_fpath).stem
                # FIXME: title should also include pred-config info
                meta['title'] = item['title']
                meta['package_name'] = item['package_name']
                info.append(item)

    # Title contains the model package name if we can infer it
    package_name = meta.get('package_name', '')
    pred_name = meta.get('pred_name', '')
    title_parts = [p for p in [package_name, pred_name] if p]

    resolution = config.get('resolution', None)
    balance_area = config.get('balance_area', False)
    if resolution is not None:
        title_parts.append(
            f'space={score_space} @ {resolution}, balance_area={balance_area}')
    else:
        title_parts.append(f'space={score_space} balance_area={balance_area}')

    meta['title_parts'] = title_parts
    title = meta['title'] = ' - '.join(title_parts)

    required_marked = 'auto'  # TODO: parameterize
    if required_marked == 'auto':
        # In "auto" mode, don't require marks if all images are unmarked,
        # otherwise assume that we should restrict to marked images
        required_marked = any(pred_coco.images().lookup('has_predictions', False))

    matches = kwcoco_extensions.associate_images(
        true_coco, pred_coco, key_fallback='id')
    # matches['video'] is a list of per-video dicts with 'vidname',
    # 'match_gids1' (true image ids), and 'match_gids2' (pred image ids).
    # matches['image'] holds the same two gid lists for images that are not
    # associated with any video.
    video_matches = matches['video']
    image_matches = matches['image']

    n_vid_matches = len(video_matches)
    n_img_per_vid_matches = [len(d['match_gids1']) for d in video_matches]
    n_img_matches = len(image_matches['match_gids1'])
    print('n_img_per_vid_matches = {}'.format(ub.urepr(n_img_per_vid_matches, nl=1)))
    print('n_vid_matches = {}'.format(ub.urepr(n_vid_matches, nl=1)))
    print('n_img_matches = {!r}'.format(n_img_matches))
    rich.print(f'Eval Dpath: [link={eval_dpath}]{eval_dpath}[/link]')

    rows = []
    chunk_size = 5
    # thresh_bins = 256 * 256
    # thresh_bins = 64 * 64
    # an explicit ndarray of bin edges is more stable than an integer count
    thresh_bins = np.linspace(0, 1, 128 * 128)

    if draw_curves == 'auto':
        draw_curves = bool(eval_dpath is not None)

    if draw_heatmaps == 'auto':
        draw_heatmaps = bool(eval_dpath is not None)

    pcontext = process_context.ProcessContext(
        name='geowatch.tasks.fusion.evaluate',
        config=config,
    )
    pcontext.start()

    if eval_dpath is None:
        heatmap_dpath = None
    else:
        curve_dpath = (ub.Path(eval_dpath) / 'curves').ensuredir()
        pcontext.write_invocation(curve_dpath / 'invocation.sh')

    # Objects that will aggregate confusion across multiple images
    salient_measure_combiner = MeasureCombiner(thresh_bins=thresh_bins)
    class_measure_combiner = OneVersusRestMeasureCombiner(thresh_bins=thresh_bins)

    # Gather the true and predicted image pairs to be scored
    total_images = 0
    if required_marked:
        for video_match in video_matches:
            gids1 = video_match['match_gids1']
            gids2 = video_match['match_gids2']
            flags = pred_coco.images(gids2).lookup('has_predictions', False)
            video_match['match_gids1'] = list(ub.compress(gids1, flags))
            video_match['match_gids2'] = list(ub.compress(gids2, flags))
            total_images += len(gids1)

        gids1 = image_matches['match_gids1']
        gids2 = image_matches['match_gids2']
        flags = pred_coco.images(gids2).lookup('has_predictions', False)
        image_matches['match_gids1'] = list(ub.compress(gids1, flags))
        image_matches['match_gids2'] = list(ub.compress(gids2, flags))
        total_images += len(gids1)
    else:
        total_images = None

    # Prepare job pools
    print('workers = {!r}'.format(workers))
    print('draw_workers = {!r}'.format(draw_workers))
    metrics_executor = ub.Executor(mode='process', max_workers=workers)
    draw_executor = ub.Executor(mode='process', max_workers=draw_workers)

    prog = ub.ProgIter(total=total_images, desc='submit scoring jobs',
                       adjust=False, freq=1)
    prog.begin()

    job_chunks = []
    draw_jobs = []

    # Submit scoring jobs over pairs of true-predicted images in videos
    for video_match in video_matches:
        prog.set_postfix_str('comparing ' + video_match['vidname'])
        gids1 = video_match['match_gids1']
        gids2 = video_match['match_gids2']
        if required_marked:
            flags = pred_coco.images(gids2).lookup('has_predictions', False)
            gids1 = list(ub.compress(gids1, flags))
            gids2 = list(ub.compress(gids2, flags))

        current_chunk = []
        for gid1, gid2 in zip(gids1, gids2):
            # gids1 index the true dataset and gids2 index the predictions
            pred_coco_img = pred_coco.coco_image(gid2).detach()
            true_coco_img = true_coco.coco_image(gid1).detach()
            true_dets = true_coco.annots(gid=gid1).detections
            vidid1 = true_coco.imgs[gid1]['video_id']
            video1 = true_coco.index.videos[vidid1]
            job = metrics_executor.submit(
                single_image_segmentation_metrics, pred_coco_img,
                true_coco_img, true_classes, true_dets, video1,
                thresh_bins=thresh_bins, config=config)
            if len(current_chunk) >= chunk_size:
                job_chunks.append(current_chunk)
                current_chunk = []
            current_chunk.append(job)
            prog.update()
        if len(current_chunk) > 0:
            job_chunks.append(current_chunk)

    # Submit scoring jobs over pairs of true-predicted images without videos
    if score_space == 'image':
        gids1 = image_matches['match_gids1']
        gids2 = image_matches['match_gids2']
        for gid1, gid2 in zip(gids1, gids2):
            pred_coco_img = pred_coco.coco_image(gid2).detach()
            true_coco_img = true_coco.coco_image(gid1).detach()
            true_dets = true_coco.annots(gid=gid1).detections
            video1 = None
            job = metrics_executor.submit(
                single_image_segmentation_metrics, pred_coco_img,
                true_coco_img, true_classes, true_dets, video1,
                thresh_bins=thresh_bins, config=config)
            prog.update()
            job_chunks.append([job])
    else:
        if len(image_matches['match_gids1']) > 0:
            warnings.warn(ub.paragraph(
                f'''
                Scoring was requested in video mode, but there are
                {len(image_matches['match_gids1'])} true/pred image pairs
                that are unassociated with a video. These pairs will not be
                included in video space scoring.
''')) prog.end() num_jobs = sum(map(len, job_chunks)) RICH_PROG = 'auto' if RICH_PROG == 'auto': # Use rich outside of slurm RICH_PROG = not os.environ.get('SLURM_JOBID', '') pman = util_progress.ProgressManager(backend='rich' if RICH_PROG else 'progiter') DEBUG = 0 if DEBUG: orig_infos = [] with pman: score_prog = pman.progiter(desc="[cyan] Scoring...", total=num_jobs) score_prog.start() if draw_heatmaps: draw_prog = pman.progiter(desc="[green] Drawing...", total=len(job_chunks)) draw_prog.start() for job_chunk in job_chunks: chunk_info = [] for job in job_chunk: info = job.result() if DEBUG: orig_infos.append(info) score_prog.update(1) rows.append(info['row']) class_measures = info.get('class_measures', None) salient_measures = info.get('salient_measures', None) if salient_measures is not None: salient_measure_combiner.submit(salient_measures) if class_measures is not None: class_measure_combiner.submit(class_measures) if draw_heatmaps: chunk_info.append(info) # Once a job chunk is done, clear its memory job = None job_chunk.clear() # Reduce measures over the chunk if salient_measure_combiner.queue_size > chunk_size: salient_measure_combiner.combine() if class_measure_combiner.queue_size > chunk_size: class_measure_combiner.combine() if draw_heatmaps: heatmap_dpath = (ub.Path(eval_dpath) / 'heatmaps').ensuredir() # Let the draw executor release any memory it can remaining_draw_jobs = [] for draw_job in draw_jobs: if draw_job.done(): draw_job.result() draw_prog.update(1) else: remaining_draw_jobs.append(draw_job) draw_job = None draw_jobs = remaining_draw_jobs # As chunks of evaluation jobs complete, submit background jobs to # draw results to disk if requested. true_gids = [info['row']['true_gid'] for info in chunk_info] true_coco_imgs = true_coco.images(true_gids).coco_images true_coco_imgs = [g.detach() for g in true_coco_imgs] draw_job = draw_executor.submit( dump_chunked_confusion, full_classes, true_coco_imgs, chunk_info, heatmap_dpath, title=title, config=config) draw_jobs.append(draw_job) metrics_executor.shutdown() if draw_heatmaps: # Allow all drawing jobs to finalize while draw_jobs: job = draw_jobs.pop() job.result() draw_prog.update(1) draw_executor.shutdown() df = pd.DataFrame(rows) print('Per Image Pixel Measures') rich.print(df) rich.print(df.describe().T) # Finalize all of the aggregated measures print('Finalize salient measures') # Note: this will return False if there are no salient measures salient_combo_measures = salient_measure_combiner.finalize() if salient_combo_measures is False or salient_combo_measures is None: # Use nan measures from empty binary confusion vectors salient_combo_measures = BinaryConfusionVectors(None).measures() # print('salient_combo_measures = {!r}'.format(salient_combo_measures)) if DEBUG: # Redo salient combine tocombine = [] for p in tocombine: z = ub.dict_isect(p, {'fp_count', 'tp_count', 'fn_count', 'tn_count', 'thresholds', 'nsupport'}) print(ub.urepr(ub.map_vals(list, z), nl=0)) salient_measure_combiner = MeasureCombiner(thresh_bins=thresh_bins) print('salient_combo_measures.__dict__ = {!r}'.format(salient_combo_measures.__dict__)) # precision = None # growth = None from kwcoco.metrics.confusion_measures import Measures for info in orig_infos: class_measures = info.get('class_measures', None) salient_measures = info.get('salient_measures', None) if salient_measures is not None: tocombine.append(salient_measures) salient_measure_combiner.submit(salient_measures) combo = Measures.combine(tocombine, thresh_bins=thresh_bins).reconstruct() 
print('combo = {!r}'.format(combo)) combo = Measures.combine(tocombine, precision=2) combo.reconstruct() print('combo = {!r}'.format(combo)) combo = Measures.combine(tocombine, growth='max') combo.reconstruct() print('combo = {!r}'.format(combo)) salient_combo_measures = salient_measure_combiner.finalize() print('salient_combo_measures = {!r}'.format(salient_combo_measures)) print('Finalize class measures') class_combo_measure_dict = class_measure_combiner.finalize() ovr_combo_measures = class_combo_measure_dict['perclass'] # Combine class + salient measures using the "SingleResult" container # (TODO: better API) result = CocoSingleResult( salient_combo_measures, ovr_combo_measures, None, meta) rich.print('result = {}'.format(result)) meta['info'].append(pcontext.stop()) if salient_combo_measures is not None: if eval_dpath is not None: if isinstance(salient_combo_measures, dict): salient_combo_measures['meta'] = meta title = '\n'.join(meta.get('title_parts', [meta.get('title', '')])) if eval_fpath is None: eval_fpath = curve_dpath / 'measures2.json' print('Dump eval_fpath={}'.format(eval_fpath)) result.dump(os.fspath(eval_fpath)) if draw_curves: import kwplot # kwplot.autompl() with kwplot.BackendContext('agg'): fig = kwplot.figure(doclf=True) print('Dump salient figures') salient_combo_measures.summary_plot(fnum=1, title=title) fig = kwplot.autoplt().gcf() fig.savefig(str(curve_dpath / 'salient_summary.png')) print('Dump class figures') result.dump_figures(curve_dpath, expt_title=title) summary = {} if class_combo_measure_dict is not None: summary['class_mAP'] = class_combo_measure_dict['mAP'] summary['class_mAUC'] = class_combo_measure_dict['mAUC'] if salient_combo_measures is not None: summary['salient_ap'] = salient_combo_measures['ap'] summary['salient_auc'] = salient_combo_measures['auc'] summary['salient_max_f1'] = salient_combo_measures['max_f1'] rich.print('summary = {}'.format(ub.urepr( summary, nl=1, precision=4, align=':', sort=0))) rich.print(f'Eval Dpath: [link={eval_dpath}]{eval_dpath}[/link]') print(f'eval_fpath={eval_fpath}') return df def _redraw_measures(eval_dpath): """ hack helper for developer, not critical """ curve_dpath = ub.Path(eval_dpath) / 'curves' measures_fpath = curve_dpath / 'measures.json' with open(measures_fpath, 'r') as file: state = json.load(file) salient_combo_measures = Measures.from_json(state) meta = salient_combo_measures.get('meta', []) title = '' if meta is not None: if isinstance(meta, list): # Old for item in meta: title = item.get('title', title) else: # title = meta.get('title', title) title = '\n'.join(meta.get('title_parts', [meta.get('title', '')])) import kwplot with kwplot.BackendContext('agg'): salient_combo_measures.summary_plot(fnum=1, title=title) fig = kwplot.autoplt().gcf() fig.savefig(str(curve_dpath / 'summary_redo.png')) if __name__ == '__main__': # import xdev # xdev.make_warnings_print_tracebacks() main()
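

# The helpers below are illustrative sketches referenced by the notes above.
# They are hedged, self-contained examples; nothing in the evaluation
# pipeline calls them.


def _rrmse_sketch(y, y_hat):
    """
    A minimal sketch of the RRMSE metric from the module-level TODO: RMSE
    normalized by the root mean square of the true values, so each residual
    is scaled against the magnitude of the actuals.

    Example:
        >>> import numpy as np
        >>> y = np.array([1.0, 2.0, 3.0])
        >>> y_hat = np.array([1.1, 1.9, 3.2])
        >>> round(float(_rrmse_sketch(y, y_hat)), 4)
        0.0655
    """
    y = np.asarray(y, dtype=np.float64)
    y_hat = np.asarray(y_hat, dtype=np.float64)
    # sqrt(mean((y - y_hat) ** 2)) / sqrt(mean(y ** 2)); the (1 / n)
    # factors cancel, leaving a ratio of sums.
    return np.sqrt(((y - y_hat) ** 2).sum() / (y ** 2).sum())


def _binary_confusion_sketch():
    """
    A minimal sketch of the per-image scoring flow used in
    ``single_image_segmentation_metrics``: flattened truth / score / weight
    arrays are wrapped in a DataFrameArray, fed to BinaryConfusionVectors,
    and ``measures()`` produces threshold-sweep metrics. The toy arrays
    here are made up purely for illustration.
    """
    is_true = np.array([0, 0, 1, 1, 1], dtype=np.uint8)
    pred_score = np.array([0.1, 0.4, 0.35, 0.8, 0.9])
    weight = np.ones(5, dtype=np.float32)
    bin_cfsn = BinaryConfusionVectors(kwarray.DataFrameArray({
        'is_true': is_true,
        'pred_score': pred_score,
        'weight': weight,
    }))
    summary = bin_cfsn.measures().summary()
    # The same keys the saliency branch extracts for its per-image row
    return ub.dict_isect(summary, {'ap', 'auc', 'max_f1'})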