#!/usr/bin/env python3 """ KWCoco video visualization script TODO: - [ ] Option to interpret a channel as a heatmap and overlay it on top of another set of channels interpreted as a grayscale image. - [ ] Migrate to kwcoco proper - [ ] Show valid image regions CommandLine: # A demo of this script on toydata is as follows # TEMP_DPATH=$(mktemp -d) TEMP_DPATH=$HOME/.cache/kwcoco/demo/viz mkdir -p $TEMP_DPATH echo "TEMP_DPATH = $TEMP_DPATH" cd $TEMP_DPATH KWCOCO_BUNDLE_DPATH=$TEMP_DPATH/toy_bundle KWCOCO_FPATH=$KWCOCO_BUNDLE_DPATH/data.kwcoco.json VIZ_DPATH=$KWCOCO_BUNDLE_DPATH/_viz python -m kwcoco toydata --key=vidshapes3-msi-multisensor-frames7 --dst=$KWCOCO_FPATH python -m geowatch.cli.coco_visualize_videos --src=$KWCOCO_FPATH --viz_dpath=$VIZ_DPATH --animate=True --workers=0 --any3=only --max_dim=128 python -m geowatch.cli.coco_visualize_videos --src=$KWCOCO_FPATH --viz_dpath=$VIZ_DPATH --zoom_to_tracks=True --start_frame=1 --num_frames=5 --animate=True """ import scriptconfig as scfg import ubelt as ub class CocoVisualizeConfig(scfg.DataConfig): """ Visualizes annotations on kwcoco video frames on each band CommandLine: # Point to your kwcoco file DVC_DPATH=$HOME/data/dvc-repos/smart_watch_dvc COCO_FPATH=$DVC_DPATH/drop1-S2-L8-aligned/data.kwcoco.json python -m geowatch.cli.coco_visualize_videos --src $COCO_FPATH --viz_dpath ./viz_out --channels="red|green|blue" --space="video" COCO_FPATH=/home/joncrall/data/dvc-repos/smart_watch_dvc/drop1-S2-L8-WV-aligned/KR_R001/subdata.kwcoco.json COCO_FPATH=/home/joncrall/data/dvc-repos/smart_watch_dvc/drop1-S2-L8-WV-aligned/data.kwcoco.json python -m geowatch.cli.coco_visualize_videos --src $COCO_FPATH --space="image" # Also note you can make an animated gif python -m kwplot.cli.gifify -i "./viz_out/US_Jacksonville_R01/_anns/red|green|blue/" -o US_Jacksonville_R01_anns.gif # NEW: as of 2021-11-04 : helper animation script python -m geowatch.cli.animate_visualizations --viz_dpath ./viz_out """ epilog = ''' Examples: # Draw some channels of interest quickly and then animate them, and # dump them into an autogenerated directory in the source kwcoco bundle KWCOCO_FPATH= geowatch visualize "$KWCOCO_FPATH" --workers=avail --animate=True --channels="salient,red|green|blue" ''' __default__ = { 'src': scfg.Value('data.kwcoco.json', help='input dataset', position=1), 'viz_dpath': scfg.Value(None, help=ub.paragraph( ''' Where to save the visualizations. If unspecified, writes them adjacent to the input kwcoco file ''')), 'workers': scfg.Value('auto', help='number of parallel procs'), 'max_workers': scfg.Value(None, help='DEPRECATED USE workers'), 'space': scfg.Value('video', help='can be image or video space'), 'max_dim': scfg.Value(None, help='if specified, the visualization will resize if it has a dimension larger than this'), 'min_dim': scfg.Value(384, help='if specified, the visualization will resize if it has a dimension smaller than this'), 'resolution': scfg.Value(None, help='the resolution to make the output at. If unspecified use the dataset default'), 'channels': scfg.Value(None, type=str, help='only viz these channels'), 'any3': scfg.Value(False, help=ub.paragraph( ''' if True, ensure the "any3" channels are drawn. If set to "only", then other per-channel visualizations are supressed. TODO: better name? TODO: deprecate? ''')), 'draw_imgs': scfg.Value(True, isflag=True), 'draw_anns': scfg.Value('auto', isflag=True, help='auto means only draw anns if they exist'), 'draw_valid_region': scfg.Value(False, help='if True, draw the valid region if it exists'), 'cmap': scfg.Value('viridis', help='colormap for single channel data'), 'animate': scfg.Value('auto', isflag=True, help='if True, make an animated gif from the output. Defaults to False.'), 'num_frames': scfg.Value(None, type=str, help='show the first N frames from each video, if None, all are shown'), 'start_frame': scfg.Value(0, type=str, help='If specified each video will start on this frame'), 'ann_score_thresh': scfg.Value(0, help='If annotations have a score, remove any under this threshold'), 'skip_missing': scfg.Value(True, isflag=True, help=ub.paragraph( ''' If true, skip any image that does not have the requested channels. Otherwise a nan image will be shown ''')), 'skip_aggressive': scfg.Value(False, isflag=True, help=ub.paragraph( ''' Aggresively skip frames based on heuristics of badness. ''')), 'only_boxes': scfg.Value(False, isflag=True, help=ub.paragraph( ''' If false, draws full annotation - which can be time consuming if there are a lot. DEPRECATED. Set draw_labels=0 and draw_segmentations=0 ''')), 'draw_segmentations': scfg.Value(True, help='if True draw annotation segmentation polygons'), 'draw_labels': scfg.Value(True, help='if True draw text labels on annotations'), 'draw_boxes': scfg.Value(True, help='if True draw bounding boxes around annotations'), 'alpha': scfg.Value(None, help='transparency / opacity of annotations'), # TODO: better support for this # TODO: use the kwcoco_video_data, has good logic for this 'zoom_to_tracks': scfg.Value(False, isflag=True, type=str, help='if True, zoom to tracked annotations. Experimental, might not work perfectly yet.'), 'norm_over_time': scfg.Value(False, isflag=True, help='if True, normalize data over time'), 'fixed_normalization_scheme': scfg.Value( None, type=str, help='Use a fixed normalization scheme for visualization; e.g. "scaled_25percentile"'), 'extra_header': scfg.Value(None, help='extra text to include in the header'), 'draw_header': scfg.Value(True, help='If false disables drawing the header'), 'draw_chancode': scfg.Value(True, help='If false disables drawing the channel code'), 'include_sensors': scfg.Value(None, help='if specified can be comma separated valid sensors'), 'exclude_sensors': scfg.Value(None, help='if specified can be comma separated invalid sensors'), 'select_images': scfg.Value( None, type=str, help=ub.paragraph( ''' A json query (via the jq spec) that specifies which images belong in the subset. Note, this is a passed as the body of the following jq query format string to filter valid ids '.images[] | select({select_images}) | .id'. Examples for this argument are as follows: '.id < 3' will select all image ids less than 3. '.file_name | test(".*png")' will select only images with file names that end with png. '.file_name | test(".*png") | not' will select only images with file names that do not end with png. '.myattr == "foo"' will select only image dictionaries where the value of myattr is "foo". '.id < 3 and (.file_name | test(".*png"))' will select only images with id less than 3 that are also pngs. .myattr | in({"val1": 1, "val4": 1}) will take images where myattr is either val1 or val4. Requries the "jq" python library is installed. ''')), 'select_videos': scfg.Value( None, help=ub.paragraph( ''' A json query (via the jq spec) that specifies which videos belong in the subset. Note, this is a passed as the body of the following jq query format string to filter valid ids '.videos[] | select({select_images}) | .id'. Examples for this argument are as follows: '.name | startswith("foo")' will select only videos where the name starts with foo. Only applicable for dataset that contain videos. Requries the "jq" python library is installed. ''')), 'verbose': scfg.Value(0, isflag=True, help='verbosity level'), 'stack': scfg.Value('auto', isflag=True, help='if True stack late fused channels in the same image'), 'role_order': scfg.Value(None, help=ub.paragraph( ''' if specified, annotations are grouped by roles and drawn on different items in a channels stack in the given order ''')), 'smart': scfg.Value(False, isflag=True, help=ub.paragraph( ''' if True, override params based on "smart" settings. This defaults to going fast and using more resources, and stacked data. '''), alias=['fast']), } def main(cmdline=True, **kwargs): """ Example: >>> import kwcoco >>> from geowatch.utils import kwcoco_extensions >>> from geowatch.cli.coco_visualize_videos import * # NOQA >>> import ubelt as ub >>> dpath = ub.Path.appdir('geowatch/tests/viz_video1').delete().ensuredir() >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', num_frames=2, image_size=(64, 64), num_videos=2) >>> img = dset.dataset['images'][0] >>> coco_img = dset.coco_image(img['id']) >>> kwargs = { >>> 'src': dset.fpath, >>> 'viz_dpath': dpath, >>> 'space': 'video', >>> 'channels': None, >>> 'zoom_to_tracks': True, >>> } >>> cmdline = False >>> main(cmdline=cmdline, **kwargs) Example: >>> import kwcoco >>> from geowatch.utils import kwcoco_extensions >>> from geowatch.cli.coco_visualize_videos import * # NOQA >>> import geowatch >>> import ubelt as ub >>> dpath = ub.Path.appdir('geowatch/tests/viz_video2').delete().ensuredir() >>> dset = geowatch.coerce_kwcoco('geowatch-msi', num_frames=5, image_size=(64, 64), num_videos=1) >>> img = dset.dataset['images'][0] >>> coco_img = dset.coco_image(img['id']) >>> kwargs = { >>> 'src': dset.fpath, >>> 'viz_dpath': dpath, >>> 'space': 'video', >>> 'channels': None, >>> 'zoom_to_tracks': False, >>> 'stack': 'only', >>> } >>> cmdline = False >>> main(cmdline=cmdline, **kwargs) Ignore: src = ub.expandpath('$HOME/data/dvc-repos/smart_watch_dvc/drop1-S2-L8-aligned/data.kwcoco.json') cmdline = False kwargs = { 'src': src, } """ config = CocoVisualizeConfig.cli(data=kwargs, cmdline=cmdline and {'strict': True}, strict=True) from kwutil import util_parallel from kwutil import util_resources from geowatch.utils import kwcoco_extensions import kwcoco import kwarray import rich import numpy as np rich.print('config = {}'.format(ub.urepr(dict(config), nl=2))) space = config['space'] channels = config['channels'] if config['smart']: if config['workers'] == 'auto': config['workers'] = 'avail' if config['animate'] == 'auto': config['animate'] = True if config['stack'] == 'auto': config['stack'] = 'only' if config['channels'] is None: channels = config['channels'] = 'auto' # if config['draw_valid_region'] is None: # config['draw_valid_region'] = False if config['animate'] == 'auto': config['animate'] = False if config['stack'] == 'auto': config['stack'] = False if config['max_workers'] is not None: ub.schedule_deprecation( 'geowatch', 'max_workers', 'argument to coco_visualize_videos', deprecate='now', error='later', remove='later') max_workers = util_parallel.coerce_num_workers(config['max_workers']) else: max_workers = util_parallel.coerce_num_workers(config['workers']) rich.print('max_workers = {!r}'.format(max_workers)) coco_dset = kwcoco.CocoDataset.coerce(config['src']) rich.print('coco_dset.fpath = {!r}'.format(coco_dset.fpath)) rich.print('coco_dset = {!r}'.format(coco_dset)) from geowatch import heuristics heuristics.ensure_heuristic_coco_colors(coco_dset) if channels == 'auto': from delayed_image import FusedChannelSpec auto_channels = [ FusedChannelSpec.coerce('red|green|blue'), FusedChannelSpec.coerce('No Activity|Site Preparation|Active Construction|Post Construction'), FusedChannelSpec.coerce('salient'), FusedChannelSpec.coerce('ac_salient'), FusedChannelSpec.coerce('pan'), ] from collections import defaultdict, Counter channel_stats = kwcoco_extensions.coco_channel_stats(coco_dset) all_sensorchan = channel_stats['all_sensorchan'] sensor_to_single_channels = defaultdict(Counter) for spec in all_sensorchan.streams(): sensor_to_single_channels[spec.sensor.spec].update(spec.chans.as_list()) chosen = [] for sensor, chanhist in sensor_to_single_channels.items(): has_chans = set(chanhist.keys()) for ac in auto_channels: if ac.to_set().issubset(has_chans): chosen.append(ac.spec) chosen = ub.oset(sorted(set(chosen))) if 'red|green|blue' in chosen: # force RGB first chosen = ub.oset(['red|green|blue']) | (chosen - {'red|green|blue'}) channels = ','.join(chosen) rich.print(f'AUTO channels={channels}') elif channels is None: channel_stats = kwcoco_extensions.coco_channel_stats(coco_dset) all_sensorchan = channel_stats['all_sensorchan'] channels = all_sensorchan # Expand certain channels requested_sensorchan = kwcoco.SensorChanSpec.coerce(channels) requested_sensorchan.streams() expanded_streams = [] chan_alias = { 'rgb': 'red|green|blue', 'sc': 'No Activity|Site Preparation|Active Construction|Post Construction', 'bas': 'salient', } for fused_sensorchan in requested_sensorchan.streams(): chan = fused_sensorchan.chans.spec chan = chan_alias.get(chan, chan) # TODO: handle the sensor part # expanded_streams.append(fused_sensorchan.sensor.spec + ':' + chan) expanded_streams.append(chan) channels = (','.join(expanded_streams)) print(f'channels = {ub.urepr(channels, nl=1)}') if config['draw_anns'] == 'auto': config['draw_anns'] = coco_dset.n_annots > 0 bundle_dpath = ub.Path(coco_dset.bundle_dpath) dset_idstr = coco_dset._dataset_id() if config['viz_dpath'] is not None: viz_dpath = ub.Path(config['viz_dpath']) else: viz_dpath = bundle_dpath / '_viz_{}'.format(dset_idstr) rich.print('viz_dpath = {!r}'.format(viz_dpath)) from kwutil import util_progress pman = util_progress.ProgressManager() pman.__enter__() # prog = ub.ProgIter( # coco_dset.index.videos.items(), total=len(coco_dset.index.videos), # desc='viz videos', verbose=3) import itertools as it # Add a fake video for loose images video_items = it.chain(coco_dset.index.videos.items(), [(None, None)]) prog = pman.progiter( video_items, total=len(coco_dset.index.videos) + 1, desc='viz videos', verbose=3) util_resources.request_nofile_limits() pool = ub.JobPool(mode='thread', max_workers=max_workers) from scriptconfig.smartcast import smartcast num_frames = smartcast(config['num_frames']) start_frame = smartcast(config['start_frame']) end_frame = None if num_frames is None else start_frame + num_frames selected_gids = None selected_gids = kwcoco_extensions.filter_image_ids( coco_dset, gids=selected_gids, include_sensors=config['include_sensors'], exclude_sensors=config['exclude_sensors'], select_images=config['select_images'], select_videos=config['select_videos'], ) if config['skip_missing'] and channels is not None: requested_channels = kwcoco.ChannelSpec.coerce(channels).fuse().as_set() print(f'requested_channels={requested_channels}') coco_images = coco_dset.images(selected_gids).coco_images keep = [] for coco_img in coco_images: img_channels = coco_img.channels if img_channels is None: if not config['skip_aggressive']: keep.append(coco_img.img['id']) else: code = img_channels.fuse().as_set() if config['skip_aggressive']: if len(requested_channels & code) == len(requested_channels): keep.append(coco_img.img['id']) else: if requested_channels & code: keep.append(coco_img.img['id']) rich.print(f'Filtered {len(coco_images) - len(keep)} images without requested channels. Keeping {len(keep)}') selected_gids = keep viz_dpath_abs = viz_dpath.absolute() rich.print(f'Will write to: [link={viz_dpath_abs}]{viz_dpath_abs}[/link]') video_names = [] for vidid, video in prog: if video is None: video = { 'name': 'loose-images', } sub_dpath = viz_dpath / video['name'] if vidid is None: loose_gids = [ gid for gid, v in coco_dset.images().lookup('video_id', None, keepid=1).items() if v is None ] gids = loose_gids else: gids = coco_dset.index.vidid_to_gids[vidid] if selected_gids is not None: gids = list(ub.oset(gids) & set(selected_gids)) if len(gids) == 0: rich.print(f'Skip {video["name"]=!r} with no selected images') continue sub_dpath.ensuredir() video_names.append(video['name']) if config['animate'] == 'oops': rich.print('Got animate=oops. ' 'Assuming images already exists and you forgot to animate' 'Skipping video draw') continue norm_over_time = config['norm_over_time'] if not norm_over_time: chan_to_normalizer = None else: coco_images = [coco_dset.coco_image(gid) for gid in gids] # quick and dirty: # Find the first image for each visualization channel # to use as the normalizer. # Probably better to use multiple images from the sequence # to do normalization if channels is not None: requested_channels = kwcoco.ChannelSpec.coerce(channels).fuse().as_set() else: requested_channels = set() for coco_img in coco_images: code = coco_img.channels.fuse().as_set() requested_channels.update(code) chan_to_ref_imgs = {} for code in requested_channels: chan_to_ref_imgs[code] = [] _remain = requested_channels.copy() for coco_img in coco_images: imghas = coco_img.channels.fuse().as_set() common = imghas & _remain for c in common: chan_to_ref_imgs[c].append(coco_img) chan_to_normalizer = {} for chan, coco_imgs in chan_to_ref_imgs.items(): s = max(1, len(coco_imgs) // 10) obs = [] for coco_img in coco_imgs[::s]: rawdata = coco_img.imdelay(channels=chan).prepare().optimize().finalize() mask = rawdata != 0 obs.append(rawdata[mask].ravel()) allobs = np.hstack(obs) normalizer = kwarray.find_robust_normalizers(allobs, params={ 'high': 0.90, 'mid': 0.5, 'low': 0.01, 'mode': 'linear', # 'mode': 'sigmoid', }) chan_to_normalizer[chan] = normalizer rich.print('chan_to_normalizer = {}'.format(ub.urepr(chan_to_normalizer, nl=1))) if config['draw_valid_region']: valid_vidspace_region = video.get('valid_region', None) else: valid_vidspace_region = None common_kw = ub.udict(config) & { 'resolution', 'draw_header', 'draw_chancode', 'skip_aggressive', 'stack', 'min_dim', 'min_dim', 'verbose', 'only_boxes', 'draw_boxes', 'draw_labels', 'fixed_normalization_scheme', 'any3', 'cmap', 'role_order', 'smart', 'ann_score_thresh', 'alpha', } if config['zoom_to_tracks']: assert space == 'video' tid_to_info = video_track_info(coco_dset, vidid) for tid, track_info in tid_to_info.items(): track_dpath = sub_dpath / '_tracks' / 'tid_{}'.format(tid) track_dpath.ensuredir() vid_crop_box = track_info['full_vid_box'] # Add context (todo: parameterize how much) vid_crop_box = vid_crop_box.scale(1.5, about='center') vid_crop_box = vid_crop_box.clip( 0, 0, video['width'] - 2, video['height'] - 2) vid_crop_box = vid_crop_box.to_xywh() vid_crop_box = vid_crop_box.quantize() gid_subset = gids[start_frame:end_frame] local_max_frame = len(gid_subset) for local_frame_index, gid in enumerate(gid_subset): img = coco_dset.index.imgs[gid] anns = coco_dset.annots(gid=gid).objs if config['extra_header']: _header_extra = f'tid={tid}' + config['extra_header'] else: _header_extra = f'tid={tid}' pool.submit(_write_ann_visualizations2, coco_dset, img, anns, track_dpath, space=space, channels=channels, vid_crop_box=vid_crop_box, _header_extra=_header_extra, chan_to_normalizer=chan_to_normalizer, local_frame_index=local_frame_index, local_max_frame=local_max_frame, dset_idstr=dset_idstr, **common_kw ) else: gid_subset = gids[start_frame:end_frame] local_max_frame = len(gid_subset) for local_frame_index, gid in enumerate(gid_subset): img = coco_dset.index.imgs[gid] anns = coco_dset.annots(gid=gid).objs if config['extra_header']: _header_extra = config['extra_header'] else: _header_extra = '' pool.submit(_write_ann_visualizations2, coco_dset, img, anns, sub_dpath, space=space, channels=channels, draw_imgs=config['draw_imgs'], draw_anns=config['draw_anns'], _header_extra=_header_extra, chan_to_normalizer=chan_to_normalizer, dset_idstr=dset_idstr, local_frame_index=local_frame_index, local_max_frame=local_max_frame, valid_vidspace_region=valid_vidspace_region, skip_missing=config['skip_missing'], **common_kw ) # for job in ub.ProgIter(pool.as_completed(), total=len(pool), desc='write imgs'): for job in pman.progiter(pool.as_completed(), total=len(pool), desc='write imgs'): try: job.result() except SkipFrame: ... pool.jobs.clear() pman.__exit__(None, None, None) rich.print(f'Wrote images to: [link={viz_dpath_abs}]{viz_dpath_abs}[/link]') if config['animate']: # TODO: develop this idea more # Try to parse out an animation config import scriptconfig as scfg from kwutil import util_yaml class AnimateConfig(scfg.DataConfig): # TODO: should be able to load from an alias frames_per_second = scfg.Value(0.7, alias=['fps']) animate_config = dict(AnimateConfig()) if isinstance(config['animate'], str) and config['animate'] not in {'oops'}: try: user_config = util_yaml.Yaml.loads(config['animate']) assert isinstance(user_config, dict), 'animate subconfig should be coercable into a dict' # hack if 'fps' in user_config: user_config['frames_per_second'] = user_config.pop('fps') animate_config = AnimateConfig(**user_config) except Exception: print('Tried to pass animate as a yaml config but loading failed') raise rich.print('animate_config = {}'.format(ub.urepr(animate_config, nl=1))) from geowatch.cli import animate_visualizations # Hack: pretend that stack is a channel even though it is not. if config['stack']: if not channels: channels = 'stack' else: channels = channels + ',stack' outputs = animate_visualizations.animate_visualizations( viz_dpath=viz_dpath, channels=channels, video_names=video_names, draw_imgs=config['draw_imgs'], draw_anns=config['draw_anns'], workers=max_workers, zoom_to_tracks=config['zoom_to_tracks'], **animate_config, ) # Links for summaries type_to_anis = ub.group_items(outputs, lambda x: x['type']) for ani_type, items in type_to_anis.items(): summary_fpath = (viz_dpath / ('_' + ani_type)).ensuredir() for item in items: fpath = ub.Path(item['fpath']) dst = summary_fpath / fpath.name ub.symlink(fpath, dst) # Terminal fixup import sys if sys.stdout.isatty(): # FFmpeg seems to mess up terminal output. I'm not sure why. # Also running this "fixup" seems to break things when people run # multiple commands in a copy-paste fashion, so we should remove # this. ub.cmd('stty sane', verbose=3) class SkipFrame(Exception): pass class SkipChanGroup(Exception): pass def video_track_info(coco_dset, vidid): import kwimage vid_annots = coco_dset.images(video_id=vidid).annots track_ids = set(ub.flatten(vid_annots.lookup('track_id'))) tid_to_info = {} for tid in track_ids: track_aids = coco_dset.index.trackid_to_aids[tid] vidspace_boxes = [] track_gids = [] for aid in track_aids: ann = coco_dset.index.anns[aid] gid = ann['image_id'] img = coco_dset.index.imgs[gid] bbox = ann['bbox'] vid_from_img = kwimage.Affine.coerce(img.get('warp_img_to_vid', None)) imgspace_box = kwimage.Boxes([bbox], 'xywh') vidspace_box = imgspace_box.warp(vid_from_img) vidspace_boxes.append(vidspace_box) track_gids.append(gid) all_vidspace_boxes = kwimage.Boxes.concatenate(vidspace_boxes) full_vid_box = all_vidspace_boxes.bounding_box().to_xywh() tid_to_info[tid] = { 'tid': tid, 'full_vid_box': full_vid_box, 'track_gids': track_gids, 'track_aids': track_aids, } return tid_to_info __config__ = CocoVisualizeConfig def select_fixed_normalization(fixed_normalization_scheme, sensor_coarse): chan_to_normalizer = {} if fixed_normalization_scheme == 'scaled': if sensor_coarse in {'L8', 'S2'}: for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 0, 'max_val': 10_000} elif fixed_normalization_scheme == 'scaled_50percentile': if sensor_coarse in {'L8', 'S2'}: for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 0, 'max_val': 5_000} elif fixed_normalization_scheme == 'scaled_25percentile': if sensor_coarse in {'L8', 'S2'}: for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 0, 'max_val': 2_500} elif fixed_normalization_scheme == 'scaled_raw': if sensor_coarse == 'L8': for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 7_272, 'max_val': 36_363} if sensor_coarse == 'S2': for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 1, 'max_val': 10_000} elif fixed_normalization_scheme == 'scaled_raw_50percentile': if sensor_coarse == 'L8': for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 7_272, 'max_val': 21_818} if sensor_coarse == 'S2': for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 1, 'max_val': 5_000} elif fixed_normalization_scheme == 'scaled_raw_25percentile': if sensor_coarse == 'L8': for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 7_272, 'max_val': 14_544} if sensor_coarse == 'S2': for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']: chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear', 'min_val': 1, 'max_val': 2_500} else: raise NotImplementedError('Unsupported fixed normalization scheme') return chan_to_normalizer def _resolve_channel_groups(coco_img, channels, verbose, request_grouped_bands, any3, smart): """ Resolve which channel groups should be requested. """ from kwcoco import channel_spec import kwcoco import rich if channels is not None: if isinstance(channels, list): channels = ','.join(channels) # hack channels = channel_spec.ChannelSpec.coerce(channels) chan_groups = [ {'chan': chan_obj} for chan_obj in channels.streams() ] else: if verbose > 0: rich.print('Choosing channels') rich.print(f'request_grouped_bands={request_grouped_bands}') channels = coco_img.channels if channels is None: # Image does not have channel metadata, the best we can do is # assume RGB chan_groups = [{ 'pname': 'null', 'chan': None, }] return chan_groups if request_grouped_bands == 'default': # Use false color for special groups request_grouped_bands = ['red|green|blue', 'r|g|b'] for cand in request_grouped_bands: cand = kwcoco.FusedChannelSpec.coerce(cand) has_cand = (channels & cand).numel() == cand.numel() if has_cand: channels = channels - cand # todo: nicer way to join streams # channels = kwcoco.ChannelSpec.coerce(channels.spec + ',' + cand.spec) channels = channels + cand # kwcoco.ChannelSpec.coerce(channels.spec + ',' + cand.spec) initial_groups = channels.streams() chan_groups = [] group : kwcoco.FusedChannelSpec for group in initial_groups: if group.numel() > 3: # For large group, just take the first 3 channels if group.numel() > 8: group = group.normalize()[0:3] chan_groups.append({ 'chan': group, }) else: # For smaller groups split them into singles for part in group: chan_groups.append({ 'chan': kwcoco.FusedChannelSpec.coerce(part) }) else: chan_groups.append({ 'chan': group, }) for row in chan_groups: row['pname'] = row['chan'].path_sanitize() if any3: if any3 == 'only': # Kick everything else out chan_groups = [] # Try to visualize any3 channels to get a nice viewable sequence avail_channels = channels.fuse() common_visualizers = list(map(kwcoco.FusedChannelSpec.coerce, [ 'red|green|blue', 'r|g|b', 'pan', 'panchromatic'])) found = None for cand in common_visualizers: flag = (cand & avail_channels).spec == cand.spec if flag: found = cand break # Just show false color from the first few channels if found is None: first3 = avail_channels.as_list()[0:3] found = kwcoco.FusedChannelSpec.coerce('|'.join(first3)) chan_groups.append({ 'pname': 'any3', 'chan': found, }) return chan_groups def __default_kwcoco_build_image_header_text(**kwargs): """ TODO: non geowatch dependant version A heuristic for what sort of info is useful to plot on the header of an image. Kwargs: img coco_dset vidname, _header_extra gid, frame_index, dset_idstr, name, sensor_coarse, date_captured Example: >>> from geowatch.heuristics import * # NOQA >>> img = { >>> 'id': 1, >>> 'frame_index': 0, >>> 'date_captured': '2020-01-01', >>> 'name': 'BLARG', >>> 'sensor_coarse': 'Sensor1', >>> } >>> kwargs = { >>> 'img': img, >>> 'dset_idstr': '', >>> 'name': '', >>> '_header_extra': None, >>> } >>> header_lines = build_image_header_text(**kwargs) >>> print('header_lines = {}'.format(ub.urepr(header_lines, nl=1))) """ img = kwargs.get('img', {}) _header_extra = kwargs.get('_header_extra', None) dset_idstr = kwargs.get('dset_idstr', '') def _multi_get(key, default=ub.NoParam, *dicts): # try to lookup from multiple dictionaries found = default for d in dicts: if key in d: found = d[key] break if found is ub.NoParam: raise Exception return found sensor_coarse = _multi_get('sensor_coarse', 'unknown', kwargs, img) # name = _multi_get('name', 'unknown', kwargs, img) date_captured = _multi_get('date_captured', '', kwargs, img) frame_index = _multi_get('frame_index', None, kwargs, img) gid = _multi_get('id', None, kwargs, img) image_name = _multi_get('name', '', kwargs, img) vidname = None if 'vidname' in kwargs: vidname = kwargs['vidname'] else: coco_dset = kwargs.get('coco_dset', None) if coco_dset is not None: vidname = coco_dset.index.videos[img['video_id']]['name'] image_id_parts = [] image_id_parts.append(f'gid={gid}') image_id_parts.append(f'frame_index={frame_index}') image_id_part = ', '.join(image_id_parts) header_line_infos = [] header_line_infos.append([vidname, image_id_part, _header_extra]) header_line_infos.append([dset_idstr]) header_line_infos.append([image_name]) header_line_infos.append([sensor_coarse, date_captured]) header_lines = [] for line_info in header_line_infos: header_line = ' '.join([p for p in line_info if p]) header_line = header_line.replace('\\n', '\n') # hack if header_line: header_lines.append(header_line) return header_lines def _write_ann_visualizations2(coco_dset, img : dict, anns : list, sub_dpath : str, space : str, channels=None, vid_crop_box=None, request_grouped_bands='default', draw_imgs=True, draw_anns=True, _header_extra=None, chan_to_normalizer=None, fixed_normalization_scheme=None, any3=True, dset_idstr='', skip_missing=False, only_boxes=1, draw_boxes=True, draw_labels=True, draw_segmentations=True, cmap='viridis', max_dim=None, min_dim=None, local_frame_index=None, local_max_frame=None, valid_vidspace_region=None, stack=False, draw_valid_region=True, verbose=0, skip_aggressive=False, draw_header=True, draw_chancode=True, resolution=None, role_order=None, smart=None, ann_score_thresh=0, alpha=None, ): """ Dumps an intensity normalized "space-aligned" kwcoco image visualization (with or without annotation overlays) for specific bands to disk. """ # See if we can look at what we made sensor_coarse = img.get('sensor_coarse', 'unknown') # align_method = img.get('align_method', 'unknown') name = img.get('name', 'unnamed') name = name.replace('/', '_') # Ensure names are differentiated between frames. import math import rich import kwimage import numpy as np if local_max_frame is None: num_digits = 8 else: num_digits = int(math.log10(max(local_max_frame, 1))) + 1 if local_frame_index is None: local_frame_index = -1 frame_id = f'{local_frame_index:0{num_digits}d}' if verbose > 2: _body = f'--- Render frame {frame_id} ---' rich.print('=' * len(_body)) rich.print(_body) rich.print('=' * len(_body)) from geowatch import heuristics header_lines = heuristics.build_image_header_text( img=img, name=None, _header_extra=_header_extra, coco_dset=coco_dset, ) if verbose > 2: rich.print('header_lines = {}'.format(ub.urepr(header_lines, nl=1))) coco_img = coco_dset.coco_image(img['id']) finalize_opts = { 'interpolation': 'linear', 'nodata_method': 'float', } if 1: if resolution is None: factor = 1 else: factor = coco_img._scalefactor_for_resolution( space=space, resolution=resolution) warp_viz_from_space = kwimage.Affine.scale(factor) delayed = coco_img.imdelay(space=space, resolution=resolution, **finalize_opts) warp_vid_from_img = coco_img.warp_vid_from_img if space == 'video': warp_viz_from_img = warp_viz_from_space @ warp_vid_from_img else: warp_viz_from_img = warp_viz_from_space if fixed_normalization_scheme is not None: chan_to_normalizer = select_fixed_normalization( fixed_normalization_scheme, sensor_coarse) # Hacks for common "heatmap" channels chan_to_normalizer['depth'] = {'type': 'normalize', 'mode': 'linear', 'min_val': 0, 'max_val': 255} if verbose > 0: rich.print(f'fixed_normalization_scheme={fixed_normalization_scheme}') rich.print(f'chan_to_normalizer={chan_to_normalizer}') rich.print(f'channels={channels}') chan_groups = _resolve_channel_groups(coco_img, channels, verbose, request_grouped_bands, any3, smart) img_view_dpath = sub_dpath / '_imgs' ann_view_dpath = sub_dpath / '_anns' anns_ = [ub.dict_diff(ann, ['keypoints']) for ann in anns] # Ignore keypoints role_to_anns = ub.group_items(anns_, lambda ann: ann.get('role', None)) role_to_anns = {'none' if k is None else k.lower(): v for k, v in role_to_anns.items()} role_to_dets = ub.udict() for role, role_anns in role_to_anns.items(): colors = [] # Determine the color for each annotation for ann in role_anns: color = 'kitware_red' cid = ann['category_id'] if cid is not None: cat = coco_dset.cats[cid] color = cat.get('color', color) if 0: # todo better role support # temporary hack if 'misc_info' in ann: misc_info = ann['misc_info'] if isinstance(misc_info, dict): color = misc_info.get('confusion_color', color) if color is None: # color = 'kitware_red' # color = 'kitware_lightgray' color = 'white' color = kwimage.Color.coerce(color).as01() colors.append(color) role_dets = kwimage.Detections.from_coco_annots(role_anns, dset=coco_dset) role_dets.data['colors'] = np.array(colors) if ann_score_thresh: flags = [float(ann.get('score', 1)) > ann_score_thresh for ann in role_anns] role_dets = role_dets.compress(flags) role_dets = role_dets.warp(warp_viz_from_img) role_to_dets[role] = role_dets # TODO: asset space if vid_crop_box is not None: # Ensure the crop box is in the proper space if space == 'image': warp_viz_from_vid = warp_viz_from_space @ warp_vid_from_img.inv() elif space == 'video': warp_viz_from_vid = warp_viz_from_space crop_box = vid_crop_box else: raise KeyError(space) crop_box = vid_crop_box.warp(warp_viz_from_vid).quantize() ann_shift = (-crop_box.tl_x.ravel()[0], -crop_box.tl_y.ravel()[0]) for role_dets in role_to_dets.values(): role_dets.translate(ann_shift, inplace=True) delayed = delayed.crop(crop_box.to_slices()[0]) valid_image_poly = None valid_video_poly = None if draw_valid_region: valid_region = img.get('valid_region', None) else: valid_region = None if valid_region: valid_image_poly: kwimage.MultiPolygon = kwimage.MultiPolygon.coerce(valid_region) valid_image_poly = valid_image_poly.warp(warp_viz_from_img) if valid_vidspace_region is not None: valid_vidspace_region = kwimage.MultiPolygon.coerce(valid_vidspace_region) if space == 'image': warp_img_from_vid = warp_vid_from_img.inv() warp_viz_from_vid = warp_viz_from_space @ warp_img_from_vid valid_video_poly = valid_vidspace_region.warp(warp_viz_from_vid) else: valid_video_poly = valid_vidspace_region.warp(warp_viz_from_space) # Determine if we need to scale the image for visualization viz_scale_factor = 1.0 if min_dim is not None: chan_min_dim = min(delayed.dsize) * viz_scale_factor if chan_min_dim < min_dim: viz_scale_factor *= min_dim / chan_min_dim if max_dim is not None: chan_max_dim = max(delayed.dsize) * viz_scale_factor if chan_max_dim > max_dim: viz_scale_factor *= max_dim / chan_max_dim if viz_scale_factor != 1: viz_warp = kwimage.Affine.scale(viz_scale_factor) delayed = delayed.warp(viz_warp) for role_dets in role_to_dets.values(): role_dets.warp(viz_warp, inplace=True) if valid_image_poly is not None: valid_image_poly = valid_image_poly.warp(viz_warp) if valid_video_poly is not None: valid_video_poly = valid_video_poly.warp(viz_warp) if stack: ann_stack = [] img_stack = [] # Add in custom crop / zoom? # h, w = delayed.shape[0:2] # delayed = delayed[0:h // 2, w // 4: w - w // 4] # TODO: user should need to specify this. max_stacks = len(chan_groups) if role_order is not None: # role_order = ['truth', 'none'] requested_slots = {} for role in role_to_anns.keys(): try: idx = role_order.index(role) except ValueError: idx = 0 requested_slots[role] = min(idx, max_stacks) stack_idx_to_roles = ub.invert_dict(requested_slots, unique_vals=False) else: # If unspecified draw all roles on the first part stack_idx_to_roles = {0: list(role_to_anns.keys())} if 1 and verbose > 100: rich.print(f'role_to_num_anns={ub.udict(role_to_anns).map_values(len)}') rich.print(f'role_order={role_order}') rich.print(f'stack_idx_to_roles={stack_idx_to_roles}') stack_idx = 0 handled_specs = set() for chan_row in chan_groups: request_roles = stack_idx_to_roles.get(stack_idx, []) if smart: # not sure how to encode this with sensible cli args. could use help # with this API. The idea is we only need to visualize one set of # visualizable channels, we only need rgb or pan, not both if 'red|green|blue' in handled_specs and chan_row['chan'].spec == 'pan': # already visualized rgb, dont need pan if verbose > 2: rich.print(f'... smart skip {chan_row=}') continue if verbose > 2: rich.print(f'... render {chan_row=}') try: stack_img_item, stack_ann_item = draw_chan_group( coco_dset, frame_id, name, ann_view_dpath, img_view_dpath, delayed, chan_row, finalize_opts, verbose, skip_missing, skip_aggressive, chan_to_normalizer, cmap, header_lines, valid_image_poly, draw_imgs, draw_anns, only_boxes, draw_boxes, draw_labels, draw_segmentations, role_to_dets, valid_video_poly, stack, draw_header, stack_idx, request_roles, ann_score_thresh, alpha) if stack: img_stack.append(stack_img_item) ann_stack.append(stack_ann_item) except SkipChanGroup: if verbose > 2: rich.print(f'... skipped render {chan_row=}') else: stack_idx += 1 if chan_row['chan'] is not None: handled_specs.add(chan_row['chan'].spec) if verbose > 2: rich.print(f'... success render {chan_row=}') if stack: if verbose > 2: rich.print('... stacking') img_stacked_dpath = (img_view_dpath / 'stack') ann_stacked_dpath = (ann_view_dpath / 'stack') view_img_fpath = img_stacked_dpath / (frame_id + '_' + name + '_stack' + '.view_img.jpg') view_ann_fpath = ann_stacked_dpath / (frame_id + '_' + name + '_stack' + '.view_ann.jpg') stack_header_lines = header_lines.copy() header_text = '\n'.join(stack_header_lines) def stack_infos(_stack): tostack = [] for item in _stack: if item is None: ... # print('warning: None stack item') else: canvas = item['im'] chan = item['chan'] # canvas = kwimage.ensure_float01(canvas, copy=True) canvas = kwimage.ensure_uint255(canvas) if draw_chancode: canvas = kwimage.draw_text_on_image( canvas, chan, (1, 2), valign='top', color='lime', border=3) tostack.append(canvas) if len(tostack) > 0: canvas = kwimage.stack_images(tostack) else: canvas = kwimage.draw_text_on_image(None, text='X') canvas = kwimage.imresize(canvas, dsize=(512, 512)) canvas = kwimage.ensure_uint255(canvas) return canvas if ann_stack: ann_stack_canvas = stack_infos(ann_stack) if draw_header: ann_header = kwimage.draw_header_text(image=ann_stack_canvas, text=header_text, stack=False, fit='shrink') ann_header = kwimage.imresize( # ann_header, dsize=(None, 100), letterbox=True) ann_header, dsize=(ann_header.shape[1], 100), letterbox=True) ann_canvas = kwimage.stack_images([ann_header, ann_stack_canvas]) else: ann_canvas = ann_stack_canvas view_ann_fpath.parent.ensuredir() kwimage.imwrite(view_ann_fpath, ann_canvas) if img_stack: img_stack_canvas = stack_infos(img_stack) if draw_header: img_header = kwimage.draw_header_text(image=img_stack_canvas, text=header_text, fit='shrink', stack=False) img_header = kwimage.imresize( img_header, dsize=(img_header.shape[1], 100), letterbox=True) img_canvas = kwimage.stack_images([img_header, img_stack_canvas]) else: img_canvas = img_stack_canvas view_img_fpath.parent.ensuredir() kwimage.imwrite(view_img_fpath, img_canvas) if verbose > 2: rich.print(f'--- End frame {frame_id}') def draw_chan_group(coco_dset, frame_id, name, ann_view_dpath, img_view_dpath, delayed, chan_row, finalize_opts, verbose, skip_missing, skip_aggressive, chan_to_normalizer, cmap, header_lines, valid_image_poly, draw_imgs, draw_anns, only_boxes, draw_boxes, draw_labels, draw_segmentations, role_to_dets, valid_video_poly, stack, draw_header, stack_idx, request_roles, ann_score_thresh, alpha): from geowatch.utils import util_kwimage import kwimage import kwarray import kwcoco import numpy as np import rich chan_pname = chan_row['pname'] chan_group_obj = chan_row['chan'] # import xdev # with xdev.embed_on_exception_context: img_chan_dpath = img_view_dpath / chan_pname ann_chan_dpath = ann_view_dpath / chan_pname # Prevent long names for docker (limit is 242 chars) if chan_group_obj is not None: chan_list = chan_group_obj.parsed chan_group = chan_group_obj.spec chan_pname2 = kwcoco.FusedChannelSpec.coerce(chan_group).path_sanitize(maxlen=10) prefix = '_'.join([frame_id, chan_pname2]) else: chan_group = None chan_list = None prefix = '_'.join([frame_id, 'null']) view_img_fpath = img_chan_dpath / prefix + '_' + name + '.view_img.jpg' view_ann_fpath = ann_chan_dpath / prefix + '_' + name + '.view_ann.jpg' if chan_group_obj is not None: chan = delayed.take_channels(chan_group) else: chan = delayed chan = chan.prepare().optimize() # When util_delayed_poc is removed, remove **delayed_ops # as they should be given in the constructor. raw_canvas = canvas = chan.finalize(**finalize_opts) # foo = kwimage.fill_nans_with_checkers(raw_canvas) if verbose > 1: rich.print('raw_canvas.shape = {!r}'.format(raw_canvas.shape)) rich.print('chan_list = {!r}'.format(chan_list)) try: # chan_stats = kwarray.stats_dict(raw_canvas, axis=2, nan=True) chan_stats = kwarray.stats_dict(raw_canvas, axis=(0, 1), nan=True, quantile=False) rich.print('chan_stats = {}'.format(ub.urepr(chan_stats, nl=1))) except Exception as ex: rich.print(f'ex={ex}') import warnings warnings.warn('Error printing chan stats, probably need kwarray >= 0.6.1') if skip_missing and np.all(np.isnan(raw_canvas)): if skip_aggressive: print('Skip because all is nan') raise SkipFrame raise SkipChanGroup # if skip_aggressive: # is_bad = np.isnan(raw_canvas).ravel() # percent_bad = is_bad.sum() / len(is_bad) # if percent_bad > 0.5: # print('Skip because some is nan') # print('skip') # raise SkipFrame if 0 and str(chan_group) == 'salient': # TEST CODE # blur1 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=1.6), n=3) # blur2 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=3.2), n=3) blur1 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=0.8), n=3) blur2 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=1.6), n=3) dog = blur1 - blur2 shift_dog = dog - min(0, np.nanmin(dog)) canvas = blur2 canvas = shift_dog # orig_max = np.nanmax(raw_canvas) # canvas = kwimage.normalize(shift_dog) * orig_max # canvas = canvas * raw_canvas canvas = raw_canvas # median = np.nanmedian(canvas) canvas = kwarray.atleast_nd(kwimage.gaussian_blur(canvas, sigma=3.6), n=3) median, = np.quantile(canvas.ravel()[~np.isnan(canvas.ravel())], q=[0.8]) # median = np.nanmean(canvas) + np.nanstd(canvas) canvas = (canvas - median).clip(0, None) canvas = np.sqrt(canvas) # raw_canvas = kwimage.normalize(dog) # raw_canvas = kwarray.atleast_nd(raw_canvas, n=3) # raw_canvas = raw_canvas[:, :, None] if verbose > 100: print('chan normalizer part') if chan_to_normalizer is None: dmax = np.nanmax(raw_canvas) # dmin = canvas.min() needs_norm = dmax > 1.0 # if canvas.max() <= 0 or canvas.min() >= 255: # Hack to only do noramlization on "non-standard" data ranges if needs_norm: mask = ~np.isnan(raw_canvas) norm_canvas = kwimage.normalize_intensity(raw_canvas, mask=mask, params={ 'high': 0.90, 'mid': 0.5, 'low': 0.01, 'mode': 'linear', }) canvas = norm_canvas canvas = np.clip(canvas, 0, None) else: new_parts = [] for cx, c in enumerate(chan_list): normalizer = chan_to_normalizer.get(c, None) data = canvas[..., cx] mask = ~np.isnan(data) if normalizer is None: p = kwimage.normalize_intensity(data, params={ 'high': 0.90, 'mid': 0.5, 'low': 0.01, 'mode': 'linear', }) else: p = kwarray.apply_normalizer(data, normalizer, mask=mask, set_value_at_mask=0.) new_parts.append(p) canvas = np.stack(new_parts, axis=2) if verbose > 100: print('after normalizer part') canvas = kwimage.nodata_checkerboard(canvas, on_value=0.3) if verbose > 100: print('after checkers part') # Do the channels correspond to classes with known colors? if chan_group_obj is not None: chan_names = chan_row['chan'].to_list() else: chan_names = [] channel_colors = [] from geowatch import heuristics # For some reason predict is not preserving categories for cat in heuristics.CATEGORIES: coco_dset.ensure_category(**cat) heuristics.ensure_heuristic_coco_colors(coco_dset) for cname in chan_names: if cname in coco_dset.index.name_to_cat: cat = coco_dset.index.name_to_cat[cname] if 'color' in cat: channel_colors.append(cat['color']) else: channel_colors.append(None) else: channel_colors.append(None) if verbose > 100: print('after channel colors part') if any(c is not None for c in channel_colors): # This flag makes it so 1 channel outputs always use cmap. # not sure if I like that or not, probably needs to be configurable _flag = kwimage.num_channels(canvas) != 1 if _flag: if verbose > 100: print('do perchannel_colorize') print(f'channel_colors={channel_colors}') canvas = util_kwimage.perchannel_colorize(canvas, channel_colors=channel_colors) if verbose > 100: print('finished perchannel_colorize') canvas = canvas[..., 0:3] if verbose > 100: print('finished channel color part') if cmap is not None: if kwimage.num_channels(canvas) == 1: if verbose > 100: print('doing 1 channel cmap') import matplotlib as mpl if chan_group == 'pan': # Use grayscale for certain 1 band images canvas = np.nan_to_num(canvas) canvas = kwimage.atleast_3channels(canvas) if len(canvas) == 3: canvas = canvas[..., 0] # canvas = kwimage.ensure_float01(canvas) else: try: import matplotlib.cm # NOQA cmap_ = mpl.cm.get_cmap(cmap) except AttributeError: # https://github.com/matplotlib/matplotlib/issues/20853 cmap_ = mpl.colormaps[cmap] canvas = np.nan_to_num(canvas) if len(canvas.shape) == 3: canvas = canvas[..., 0] canvas = cmap_(canvas)[..., 0:3].astype(np.float32) if verbose > 100: print('after cmap part') canvas = util_kwimage.ensure_false_color(canvas) canvas = kwimage.ensure_uint255(canvas) if len(canvas.shape) > 2 and canvas.shape[2] > 4: # hack for wv canvas = canvas[..., 0] chan_header_lines = header_lines.copy() chan_header_lines.append(str(chan_group)) header_text = '\n'.join(chan_header_lines) if valid_image_poly is not None: # Draw the valid region specified at the image level if any([p.data['exterior'].data.size for p in valid_image_poly.data]): canvas = valid_image_poly.draw_on(canvas, color='kitware_green', fill=False, border=True, alpha=alpha) if valid_video_poly is not None: # Draw the valid region specified at the video level if any([p.data['exterior'].data.size for p in valid_video_poly.data]): canvas = valid_video_poly.draw_on(canvas, color='lawngreen', fill=False, border=True, alpha=alpha) stack_imgs = draw_imgs and stack stack_anns = draw_anns and stack draw_anns_alone = draw_anns and stack != 'only' draw_imgs_alone = draw_imgs and stack != 'only' if draw_anns_alone: ann_chan_dpath.ensuredir() if draw_imgs_alone: img_chan_dpath.ensuredir() img_canvas = None ann_canvas = None img_stack_item = None ann_stack_item = None if verbose > 100: print('before canvas parts') rich.print(f'draw_imgs_alone={draw_imgs_alone}') rich.print(f'draw_anns_alone={draw_anns_alone}') rich.print(f'stack_imgs={stack_imgs}') rich.print(f'stack_anns={stack_anns}') if draw_imgs_alone or stack_imgs: img_canvas = kwimage.ensure_uint255(canvas, copy=True) if stack_imgs: img_stack_item = { 'im': img_canvas, 'chan': chan_group, } if draw_imgs_alone: img_header = kwimage.draw_header_text(image=img_canvas, text=header_text, stack=False, fit='shrink') img_header = kwimage.stack_images([img_header, img_canvas]) kwimage.imwrite(view_img_fpath, img_canvas) if draw_anns_alone or stack_anns: ONLY_BOXES = only_boxes if ONLY_BOXES: ub.schedule_deprecation( 'geowatch', 'only_boxes', 'argument', deprecate='now', error='1.0.0', remove='1.1.0', ) draw_on_kwargs = dict(sseg=False, labels=False) else: draw_on_kwargs = {} draw_on_kwargs['labels'] = bool(draw_labels) draw_on_kwargs['sseg'] = bool(draw_segmentations) draw_on_kwargs['boxes'] = bool(draw_boxes) requested_role_to_dets = role_to_dets.intersection(request_roles) need_ann_canvas = ( bool(requested_role_to_dets) or img_canvas is None or draw_anns_alone ) need_ann_canvas = True if need_ann_canvas: ann_canvas = kwimage.ensure_float01(canvas, copy=True) if draw_anns_alone and not requested_role_to_dets: # fallback to drawing all anns in this weird case requested_role_to_dets = role_to_dets for role_dets in requested_role_to_dets.values(): # TODO: better role handling colors = [kwimage.Color.coerce(c).as01() for c in role_dets.data['colors']] if verbose > 100: print('About to draw dets on a canvas') ann_canvas = role_dets.draw_on( ann_canvas, color=colors, ssegkw={'fill': False, 'border': True, 'edgecolor': colors}, # color='classes', **draw_on_kwargs, alpha=alpha) if verbose > 100: print('That seemed to work') if stack_anns: if ann_canvas is None: ann_canvas = img_canvas.copy() ann_stack_item = { 'im': ann_canvas, 'chan': chan_group, } if draw_anns_alone: assert ann_canvas is not None ann_canvas = kwimage.ensure_uint255(ann_canvas) if draw_header: ann_header = kwimage.draw_header_text(image=ann_canvas, text=header_text, stack=False, fit='shrink') ann_header = kwimage.imresize( ann_header, dsize=(ann_header.shape[1], 100), letterbox=True) ann_canvas = kwimage.stack_images([ann_header, ann_canvas]) kwimage.imwrite(view_ann_fpath, ann_canvas) if verbose > 100: print('returning canvases') return img_stack_item, ann_stack_item if __name__ == '__main__': main(cmdline=True)