#!/usr/bin/env python3
"""
See Old Version:
    ../../../scripts/run_bas_fusion_eval3_for_baseline.py

SeeAlso:
    ~/code/watch-smartflow-dags/KIT_TA2_PREEVAL10_PYENV.py
"""
import json
import os
import scriptconfig as scfg
import shutil
import ubelt as ub


class BasFusionConfig(scfg.DataConfig):
    """
    Run TA-2 BAS fusion as baseline framework component

    python ~/code/watch/geowatch/cli/smartflow/run_bas_fusion.py
    """
    input_path = scfg.Value(None, type=str, position=1, required=True, help=ub.paragraph(
            '''
            Path to the STAC items this step can use as inputs.
            This is usually an S3 Path.
            '''), alias=['input_stac_path'])

    input_region_path = scfg.Value(None, type=str, position=2, required=True, help=ub.paragraph(
            '''
            Path to input T&E Baseline Framework Region definition JSON
            '''))

    output_path = scfg.Value(None, type=str, position=3, required=True, help=ub.paragraph(
            '''
            Path to the STAC items that register the outputs of this stage.
            This is usually an S3 Path.
            '''), alias=['output_stac_path'])

    aws_profile = scfg.Value(None, type=str, help=ub.paragraph(
            '''
            AWS Profile to use for AWS S3 CLI commands.
            '''))

    dryrun = scfg.Value(False, isflag=True, short_alias=['d'], help='DEPRECATED. DO NOT USE.')

    outbucket = scfg.Value(None, type=str, required=True, short_alias=['o'], help=ub.paragraph(
            '''
            S3 Output directory for STAC item / asset egress
            '''))

    ta2_s3_collation_bucket = scfg.Value(None, type=str, help=ub.paragraph(
            '''
            S3 Location for collated TA-2 output (bucket name should include
            up to eval name)
            '''))

    previous_bas_outbucket = scfg.Value(None, type=str, help=ub.paragraph(
            '''
            S3 Output directory for previous interval BAS fusion output
            '''))

    bas_pxl_config = scfg.Value(None, type=str, help=ub.paragraph(
            '''
            Raw json/yaml or a path to a json/yaml file that specifies the
            config for fusion.predict.
            '''))

    bas_poly_config = scfg.Value(None, type=str, help=ub.paragraph(
            '''
            Raw json/yaml or a path to a json/yaml file that specifies the
            config for bas tracking.
            '''))

    time_dense = scfg.Value(False, isflag=True, help=ub.paragraph(
            '''
            Use time_dense imagery. Defaults to False and uses time averaged
            data.
            '''))

    egress_intermediate_outputs = scfg.Value(False, isflag=True, help=ub.paragraph(
            '''
            If true egress intermediate heatmaps, otherwise only egress the
            geojson
            '''))


# Example configuration kept for interactive debugging; never executed.
__debug_notes__ = r"""
config = {
    'input_path'             : 's3://smartflow-023300502152-us-west-2/smartflow/env/kw-v3-0-0/work/preeval17_batch_v130/batch/kit/KR_R002/2021-08-31/split/mono/products/cold/items.jsonl',
    'input_region_path'      : 's3://smartflow-023300502152-us-west-2/smartflow/env/kw-v3-0-0/work/preeval17_batch_v130/batch/kit/KR_R002/2021-08-31/input/mono/region_models/KR_R002.geojson',
    'output_path'            : 's3://smartflow-023300502152-us-west-2/smartflow/env/kw-v3-0-0/work/preeval17_batch_v130/batch/kit/KR_R002/2021-08-31/split/mono/products/bas-fusion/items.jsonl',
    'aws_profile'            : None,
    'dryrun'                 : False,
    'outbucket'              : 's3://smartflow-023300502152-us-west-2/smartflow/env/kw-v3-0-0/work/preeval17_batch_v130/batch/kit/KR_R002/2021-08-31/split/mono/products/bas-fusion',
    'ta2_s3_collation_bucket': None,
    'previous_bas_outbucket' : None,
    'bas_pxl_config'         : 'chip_dims: auto\nchip_overlap: 0.3\nfixed_resolution: 10GSD\nnum_workers: 24\npackage_fpath: /root/data/smart_expt_dvc/models/fusion/uconn/D7-V2-COLD-candidate/epoch=203-step=4488.pt\ntime_sampling: soft4\ntime_span: auto\ntta_fliprot: 3\ntta_time: 3',
    'bas_poly_config'        : 'agg_fn: probs\ninner_agg_fn: mean\ninner_window_size: 1y\nmax_area_square_meters: 8000000\nmin_area_square_meters: 7200\nmoving_window_size: null\nnorm_ord: inf\npoly_merge_method: v2\npolygon_simplify_tolerance: 1\nresolution: 10GSD\nthresh: 0.375\ntime_thresh: 0.8',
}
config = BasFusionConfig(**config)
"""


def main():
    """
    CLI entry point: parse arguments into a BasFusionConfig, echo the
    resolved configuration, and run the BAS fusion pipeline.
    """
    config = BasFusionConfig.cli(strict=True)
    print('config = {}'.format(ub.urepr(dict(config), nl=1, align=':')))
    run_bas_fusion_for_baseline(config)


def run_bas_fusion_for_baseline(config):
    """
    Run the BAS fusion node: ingress, predict, track, crop, validate, egress.

    Steps performed, in order:
        1. Ingress the kwcoco inputs listed in the STAC items at
           ``config.input_path`` into ``/tmp/ingress``.
        2. Download the region file to ``/tmp/region.json``.
        3. Run fusion prediction (saliency only) on the ingressed kwcoco.
           If ``config.previous_bas_outbucket`` is given, merge the previous
           interval's predictions and assets with the current ones.
        4. Run the tracker to produce site and region models.
        5. Crop sites to the region and validate / fix up the results.
        6. Optionally collate TA-2 output.
        7. Egress the results to ``config.outbucket`` and register them in
           ``config.output_path``.

    Args:
        config (BasFusionConfig): resolved node configuration.

    Raises:
        AssertionError: if ``config.dryrun`` is set (unsupported).
        ValueError: if the pixel config does not specify ``package_fpath``.
        Exception: validation errors are re-raised after the cropped models
            are uploaded to ``<outbucket>/_debug/``.
    """
    from geowatch.cli.smartflow_ingress import smartflow_ingress
    from geowatch.cli.smartflow_egress import smartflow_egress
    from geowatch.cli.concat_kwcoco_videos import concat_kwcoco_datasets
    from geowatch.utils.util_framework import download_region, determine_region_id
    from geowatch.tasks.fusion.predict import predict
    from kwutil.util_yaml import Yaml
    from geowatch.utils import util_framework
    from geowatch.utils import util_fsspec

    ####
    from geowatch.utils.util_framework import NodeStateDebugger
    node_state = NodeStateDebugger()
    node_state.print_environment()
    node_state.print_local_invocation(config)

    input_path = config.input_path
    input_region_path = config.input_region_path
    outbucket = config.outbucket
    aws_profile = config.aws_profile
    dryrun = config.dryrun
    previous_bas_outbucket = config.previous_bas_outbucket
    ta2_s3_collation_bucket = config.ta2_s3_collation_bucket

    if aws_profile is not None:
        # This should be sufficient, but it is not tested.
        util_fsspec.S3Path._new_fs(profile=aws_profile)

    assert not dryrun, 'unsupported'

    # 1. Ingress data
    print("* Running baseline framework kwcoco ingress *")
    ingress_dir = ub.Path('/tmp/ingress')
    assets = [
        'enriched_bas_kwcoco_file',
        'enriched_bas_kwcoco_teamfeats',
        'enriched_bas_kwcoco_rawbands',
        {"key": 'hacked_cold_assets', "allow_missing": True},
        {"key": 'landcover_assets', "allow_missing": True},
    ]
    if config.time_dense:
        assets += [
            'timedense_bas_kwcoco_file',
            'timedense_bas_kwcoco_rawbands',
        ]
    ingressed_assets = smartflow_ingress(
        input_path=input_path,
        assets=assets,
        outdir=ingress_dir,
        aws_profile=aws_profile,
        dryrun=dryrun
    )

    # # 2. Download and prune region file
    print("* Downloading and pruning region file *")
    local_region_path = '/tmp/region.json'
    download_region(
        input_region_path=input_region_path,
        output_region_path=local_region_path,
        aws_profile=aws_profile,
        strip_nonregions=True,
        ensure_comments=True,
    )

    # Determine the region_id in the region file.
    region_id = determine_region_id(local_region_path)

    # 3. Run fusion
    print("* Running BAS fusion *")
    bas_fusion_kwcoco_path = ingress_dir / 'bas_fusion_kwcoco.json'

    node_state.print_current_state(ingress_dir)

    # TODO: remove these defaults or replace them with whatever is the
    # default in predict. The params should be fully given in the DAG, not
    # here.
    default_predict_config = ub.udict({
        "chip_overlap": 0.3,
        "chip_dims": "auto",
        "time_span": "auto",
        "time_sampling": "auto",
        "drop_unused_frames": True,
        "batch_size": 1,
        "num_workers": 2,
        'package_fpath': None,
    })
    # User-supplied values take precedence over the defaults above.
    bas_pxl_config = default_predict_config | Yaml.coerce(config.bas_pxl_config or {})

    if bas_pxl_config.get('package_fpath', None) is None:
        raise ValueError('Requires package_fpath')

    print('bas_pxl_config = {}'.format(ub.urepr(bas_pxl_config, nl=1)))

    ingress_kwcoco_path = ingressed_assets['enriched_bas_kwcoco_file']

    if 1:
        # Print debug info about input BAS items
        print('-- input BAS kwcoco stats --')
        ub.cmd(f'kwcoco stats {ingress_kwcoco_path}', verbose=3)
        ub.cmd(f'geowatch stats {ingress_kwcoco_path}', verbose=3)

    if 0:
        import kwcoco
        src_dset = kwcoco.CocoDataset(ingress_kwcoco_path)
        src_dset.validate()

    predict(devices='0,',
            write_preds=False,
            write_probs=True,
            with_change=False,
            with_saliency=True,
            with_class=False,
            test_dataset=ingress_kwcoco_path,
            pred_dataset=bas_fusion_kwcoco_path,
            **bas_pxl_config)

    node_state.print_current_state(ingress_dir)

    # 3.1. If a previous interval was run; concatenate BAS fusion
    # output KWCOCO files for tracking
    if previous_bas_outbucket is not None:
        combined_bas_fusion_kwcoco_path = ingress_dir / 'combined_bas_fusion_kwcoco.json'

        previous_ingress_dir = ub.Path('/tmp/ingress_previous')

        previous_bas_outbucket = util_fsspec.FSPath.coerce(previous_bas_outbucket)
        previous_ingress_dir = util_fsspec.FSPath.coerce(previous_ingress_dir)
        previous_bas_outbucket.copy(previous_ingress_dir, recursive=True)

        previous_bas_fusion_kwcoco_path = previous_ingress_dir / 'combined_bas_fusion_kwcoco.json'

        # On first interval nothing will be copied down so need to
        # check that we have the input explicitly
        if os.path.isfile(previous_bas_fusion_kwcoco_path):
            concat_kwcoco_datasets(
                (previous_bas_fusion_kwcoco_path, bas_fusion_kwcoco_path),
                combined_bas_fusion_kwcoco_path)
            # Copy saliency assets from previous bas fusion.
            # NOTE: the stdlib function is ``shutil.copytree`` (there is no
            # ``shutil.copy_tree``). ``dirs_exist_ok=True`` merges into the
            # destination, which is expected to already hold the current
            # interval's outputs.
            shutil.copytree(
                previous_ingress_dir / '_assets/pred_saliency',
                ingress_dir / '_assets/pred_saliency',
                dirs_exist_ok=True,
            )
            # Copy original assets from previous bas fusion
            shutil.copytree(
                previous_ingress_dir / region_id,
                ingress_dir / region_id,
                dirs_exist_ok=True,
            )
        else:
            # Copy current bas_fusion_kwcoco_path to combined path as
            # this is the first interval
            shutil.copy(bas_fusion_kwcoco_path, combined_bas_fusion_kwcoco_path)
    else:
        combined_bas_fusion_kwcoco_path = bas_fusion_kwcoco_path

    # 4. Compute tracks (BAS)
    print("* Computing tracks (BAS) *")

    bas_region_models_outdir = (ingress_dir / 'region_models').ensuredir()
    bas_site_models_outdir = (ingress_dir / 'site_models_bas').ensuredir()

    region_models_manifest_outdir = (ingress_dir / 'tracking_manifests_bas').ensuredir()
    region_models_manifest_outpath = region_models_manifest_outdir / 'region_models_manifest.json'
    site_models_manifest_outpath = region_models_manifest_outdir / 'site_models_manifest.json'

    # Copy input region model into region_models outdir to be updated
    # (rather than generated from tracking, which may not have the
    # same bounds as the original)
    shutil.copy(local_region_path, bas_region_models_outdir / f'{region_id}.geojson')

    # TODO: remove these defaults or replace them with whatever is the
    # default in tracker. The params should be fully given in the DAG,
    # not here.
    default_bas_tracking_config = ub.udict({
        "thresh": 0.1,
        "moving_window_size": None,
        "polygon_simplify_tolerance": 1,
        "max_area_behavior": 'ignore'
    })
    bas_tracking_config = (default_bas_tracking_config
                           | Yaml.coerce(config.bas_poly_config or {}))

    # ``min_area_square_meters`` is read (not popped) so it is forwarded to
    # both the tracker kwargs and the crop step; the time padding values are
    # popped because they are passed as dedicated tracker CLI arguments.
    min_area_square_meters = bas_tracking_config.get('min_area_square_meters', None)
    time_pad_after = bas_tracking_config.pop('time_pad_after', None)
    time_pad_before = bas_tracking_config.pop('time_pad_before', None)

    # TODO: use smart_pipeline.BAS_PolygonPrediction
    # Inserts "_tracked" between the stem and the extension.
    tracked_bas_kwcoco_path = '_tracked'.join(
        os.path.splitext(bas_fusion_kwcoco_path))
    ub.cmd([
        'python', '-m', 'geowatch.cli.run_tracker',
        combined_bas_fusion_kwcoco_path,
        '--out_sites_dir', bas_site_models_outdir,
        '--out_sites_fpath', site_models_manifest_outpath,
        '--out_site_summaries_dir', bas_region_models_outdir,
        '--out_site_summaries_fpath', region_models_manifest_outpath,
        '--out_kwcoco', tracked_bas_kwcoco_path,
        '--default_track_fn', 'saliency_heatmaps',
        '--append_mode', 'True',
        # NOTE(review): an unset value is forwarded as the string 'None';
        # presumably the tracker CLI parses that back to None - confirm.
        '--time_pad_after', str(time_pad_after),
        '--time_pad_before', str(time_pad_before),
        # TODO:
        # use boundary_region here?
        '--track_kwargs', json.dumps(bas_tracking_config)],
        check=True, verbose=3)

    # Remove after boundary_region here
    cropped_region_models_outdir = (ingress_dir / 'cropped_region_models_bas').ensuredir()
    cropped_site_models_outdir = (ingress_dir / 'cropped_site_models_bas').ensuredir()

    node_state.print_current_state(ingress_dir)

    crop_cmd = [
        'python', '-m', 'geowatch.cli.crop_sites_to_regions',
        '--site_models', bas_site_models_outdir / '*.geojson',
        '--region_models', bas_region_models_outdir / '*.geojson',
        '--new_site_dpath', cropped_site_models_outdir,
        '--new_region_dpath', cropped_region_models_outdir,
        '--min_area_square_meters', str(min_area_square_meters)
    ]
    ub.cmd(crop_cmd, check=True, verbose=3)

    if 1:
        # Print debug info about output BAS items
        print('-- output BAS kwcoco stats --')
        ub.cmd(f'kwcoco stats {tracked_bas_kwcoco_path}', verbose=3)
        ub.cmd(f'geowatch stats {tracked_bas_kwcoco_path}', verbose=3)

    print('Show stats about site inputs:')
    ub.cmd(f'geowatch site_stats {cropped_region_models_outdir}', verbose=3, system=True)

    # 5. Validate and fix all outputs
    try:
        util_framework.fixup_and_validate_site_and_region_models(
            region_dpath=cropped_region_models_outdir,
            site_dpath=cropped_site_models_outdir,
        )
    except Exception as ex:  # noqa
        print(f'Encountered Exception, ex={ex}, uploading debug informaiton and then exiting')
        debug_s3_outdir = util_fsspec.FSPath.coerce(os.path.join(outbucket, '_debug/'))
        # Both directories must be coerced to FSPath before copying: the
        # plain local path objects do not accept ``recursive=True``, and a
        # failure here would mask the validation error being reported.
        debug_region_dpath = util_fsspec.FSPath.coerce(cropped_region_models_outdir)
        debug_site_dpath = util_fsspec.FSPath.coerce(cropped_site_models_outdir)
        debug_region_dpath.copy(debug_s3_outdir, recursive=True)
        debug_site_dpath.copy(debug_s3_outdir, recursive=True)
        raise

    if __debug__:
        # Sanity checks; skipped when Python runs with -O.
        from geowatch.geoannots import geomodels
        all_regions_models = list(geomodels.RegionModel.coerce_multiple(cropped_region_models_outdir))
        all_site_models = list(geomodels.SiteModel.coerce_multiple(cropped_site_models_outdir))
        assert len(all_regions_models) == 1, 'should only be 1 output region model'
        out_region_model = all_regions_models[0]
        num_site_summaries = len(list(out_region_model.site_summaries()))
        num_sites = len(all_site_models)
        assert num_sites == num_site_summaries, 'number of site summaries should be the same as the number of sites'

    # 6. (Optional) collate TA-2 output
    if ta2_s3_collation_bucket is not None:
        # only used if we are going to submit our BAS predictions as the final
        # ones?
        print("* Collating TA-2 output")
        util_framework.ta2_collate_output(None,
                                          cropped_region_models_outdir,
                                          cropped_site_models_outdir,
                                          ta2_s3_collation_bucket)

    EGRESS_INTERMEDIATE_OUTPUTS = config.egress_intermediate_outputs
    if EGRESS_INTERMEDIATE_OUTPUTS:
        # Reroot kwcoco files to make downloaded results easier to work with
        ub.cmd(['kwcoco', 'reroot', f'--src={bas_fusion_kwcoco_path}',
                '--inplace=1', '--absolute=0'])
        ub.cmd(['kwcoco', 'reroot', f'--src={tracked_bas_kwcoco_path}',
                '--inplace=1', '--absolute=0'])

        # Add BAS saliency outputs to egressed attributes for debugging
        ingressed_assets['bas_pred_saliency_assets'] = ingress_dir / '_assets/pred_saliency'
        ingressed_assets['bas_fusion_kwcoco_path'] = bas_fusion_kwcoco_path
        ingressed_assets['bas_original_site_models_outdir'] = bas_site_models_outdir
        ingressed_assets['bas_original_region_models_outdir'] = bas_region_models_outdir
        ingressed_assets['tracked_bas_kwcoco_path'] = tracked_bas_kwcoco_path

    node_state.print_current_state(ingress_dir)

    # 7. Egress (envelop KWCOCO dataset in a STAC item and egress;
    #    will need to recursive copy the kwcoco output directory up to
    #    S3 bucket)
    print("* Egressing KWCOCO dataset and associated STAC item *")
    ingressed_assets['cropped_region_models_bas'] = cropped_region_models_outdir
    ingressed_assets['cropped_site_models_bas'] = cropped_site_models_outdir

    # NOTE(review): the configured aws_profile is not forwarded here (unlike
    # the ingress call) - confirm whether that is intentional.
    smartflow_egress(ingressed_assets,
                     local_region_path,
                     config.output_path,
                     config.outbucket,
                     aws_profile=None,
                     dryrun=False,
                     newline=False)


if __name__ == "__main__":
    main()