import tempfile
import subprocess
import json
import os
import pystac
import ubelt as ub
from os.path import join


def egress_item(stac_item, outbucket, aws_base_command):
    if isinstance(stac_item, dict):
        stac_item_dict = stac_item
    elif isinstance(stac_item, pystac.Item):
        stac_item_dict = stac_item.to_dict()
    else:
        raise TypeError("Expecting 'stac_item' to be either a dictionary "
                        "or pystac.Item")

    # Read the id from the dict representation so both input types work
    stac_item_id = stac_item_dict['id']
    stac_item_outpath = join(outbucket, "{}.json".format(stac_item_id))

    assets_outdir = join(outbucket, stac_item_id)

    for asset_name, asset in stac_item_dict.get('assets', {}).items():
        asset_basename = os.path.basename(asset['href'])

        asset_outpath = join(assets_outdir, asset_basename)

        command = [*aws_base_command, asset['href'], asset_outpath]

        print("Running: {}".format(' '.join(command)))
        # TODO: Manually check return code / output
        subprocess.run(command, check=True)

        # Update feature asset href to point to local outpath
        asset['href'] = asset_outpath

    with tempfile.NamedTemporaryFile() as temporary_file:
        with open(temporary_file.name, 'w') as f:
            print(json.dumps(stac_item_dict, indent=2), file=f)

        command = [*aws_base_command,
                   temporary_file.name, stac_item_outpath]

        subprocess.run(command, check=True)

    output_stac_item = pystac.Item.from_dict(stac_item_dict)
    output_stac_item.set_self_href(stac_item_outpath)
    return output_stac_item
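

# Illustrative sketch (not called anywhere): one plausible way to wire
# `egress_item` together with `AWS_S3_Command` defined later in this module.
# The bucket URI and profile name below are hypothetical placeholders.
def _example_egress_item_usage(stac_item):
    aws_cp = AWS_S3_Command('cp')
    aws_cp.update(profile='iarpa', no_progress=True)  # example profile name
    aws_base_command = aws_cp.finalize()
    outbucket = 's3://example-bucket/egress'  # hypothetical destination
    return egress_item(stac_item, outbucket, aws_base_command)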


def ingress_item(feature,
                 outdir,
                 aws_base_command,
                 dryrun,
                 relative=False,
                 virtual=False,
                 asset_selector=None):
    """
    Originally from the baseline_framework_ingress code; could probably be
    cleaned up.

    FIXME: Something about this is not concurrent-safe
    """
    import subprocess
    from urllib.parse import urlparse
    import pystac

    SENTINEL_PLATFORMS = {'sentinel-2b', 'sentinel-2a'}

    # Adding a reference back to the original STAC
    # item if not already present
    self_link = None
    has_original = False
    for link in feature.get('links', ()):
        if link['rel'] == 'self':
            self_link = link
        elif link['rel'] == 'original':
            has_original = True

    if not has_original and self_link is not None:
        feature.setdefault('links', []).append(
            {'rel': 'original',
             'href': self_link['href'],
             'type': 'application/json'})

    # Should only be added the first time the item is ingressed
    if 'watch:original_item_id' not in feature['properties']:
        feature['properties']['watch:original_item_id'] = feature['id']

    assets = feature.get('assets', {})

    # HTML index page for certain Landsat items, not needed here
    # so remove from assets dict
    if 'index' in assets:
        del assets['index']

    new_assets = {}
    assets_to_remove = set()
    for asset_name, asset in assets.items():
        # Optionally restrict which assets are ingressed
        if asset_selector is not None and not asset_selector(asset_name, asset):
            continue

        asset_basename = os.path.basename(asset['href'])

        feature_output_dir = os.path.join(outdir, feature['id'])

        asset_outpath = os.path.join(feature_output_dir, asset_basename)

        asset_href = asset['href']
        try:
            if ('productmetadata' not in assets
               and feature['properties']['platform'] in SENTINEL_PLATFORMS
               and asset_name == 'metadata'):
                asset_outpath = os.path.join(
                    feature_output_dir, 'MTD_TL.xml')

                new_asset = download_mtd_msil1c(
                    feature['properties']['sentinel:product_id'],
                    asset_href, feature_output_dir, aws_base_command,
                    dryrun)

                if new_asset is not None:
                    new_assets['productmetadata'] = new_asset
        except KeyError:
            pass

        local_asset_href = os.path.abspath(asset_outpath)
        if relative:
            local_asset_href = os.path.relpath(asset_outpath, outdir)

        if not dryrun:
            os.makedirs(feature_output_dir, exist_ok=True)

        if os.path.isfile(asset_outpath):
            print('Asset already exists at outpath {!r}, '
                  'not redownloading'.format(asset_outpath))
            # Update feature asset href to point to local outpath
            asset['href'] = local_asset_href
        else:
            # Prefer to pull asset from S3 if available
            parsed_asset_href = urlparse(asset_href)
            if (parsed_asset_href.scheme != 's3'
               and 'alternate' in asset and 's3' in asset['alternate']):
                asset_href_for_download = asset['alternate']['s3']['href']
            else:
                asset_href_for_download = asset_href

            # Need to reparse the href if it switched from http to s3
            parsed_asset_href = urlparse(asset_href_for_download)

            if virtual:
                if parsed_asset_href.scheme == 's3':
                    virtual_href = '/vsis3/{}{}'.format(
                        parsed_asset_href.netloc,
                        parsed_asset_href.path)
                    # print(f'virtual_href={virtual_href}')
                    asset['href'] = virtual_href
                elif parsed_asset_href.scheme in {'http', 'https'}:
                    virtual_href = '/vsicurl/{}://{}{}'.format(
                        parsed_asset_href.scheme,
                        parsed_asset_href.netloc,
                        parsed_asset_href.path)
                    # print(f'virtual_href={virtual_href}')
                    asset['href'] = virtual_href
                else:
                    print("* Unsupported URI scheme '{}' for virtual ingress; "
                          "not updating href: {}".format(
                              parsed_asset_href.scheme, asset_href))
            else:
                try:
                    success = download_file(asset_href_for_download,
                                            asset_outpath,
                                            aws_base_command,
                                            dryrun)
                except subprocess.CalledProcessError:
                    print("* Error * Couldn't download asset from href: '{}', "
                          "removing asset from item!".format(
                              asset_href_for_download))
                    assets_to_remove.add(asset_name)
                    continue
                else:
                    if success:
                        asset['href'] = local_asset_href
                    else:
                        print('Warning unrecognized scheme for asset href: '
                              '{!r}, skipping!'.format(
                                  asset_href_for_download))
                        continue

    for asset_name in assets_to_remove:
        del assets[asset_name]

    for new_asset_name, new_asset in new_assets.items():
        assets[new_asset_name] = new_asset

    item = pystac.Item.from_dict(feature)
    item.set_collection(None)  # Clear the collection if present

    item_href = os.path.join(outdir, feature['id'], feature['id'] + '.json')
    # Transform to absolute
    item_href = os.path.abspath(item_href)
    if relative:
        # Transform to relative if requested
        item_href = os.path.relpath(item_href, outdir)

    item.set_self_href(item_href)

    # print('item = {}'.format(ub.urepr(item.to_dict(), nl=2)))
    return item
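

# Illustrative sketch (not called anywhere): a plausible virtual ingress of a
# single STAC feature.  The feature dict and working directory are hypothetical
# placeholders; `virtual=True` rewrites asset hrefs to GDAL /vsis3/ or
# /vsicurl/ paths instead of downloading the underlying data.
def _example_ingress_item_usage(feature):
    aws_cp = AWS_S3_Command('cp')
    aws_cp.update(no_progress=True)
    item = ingress_item(
        feature,
        outdir='/tmp/ingress',               # hypothetical working directory
        aws_base_command=aws_cp.finalize(),
        dryrun=False,
        virtual=True)
    return item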


def download_mtd_msil1c(product_id, metadata_href, outdir,
                        aws_base_command, dryrun):
    from datetime import datetime as datetime_cls
    import subprocess
    from urllib.parse import urlparse
    # The product-level metadata (for the product this tile is part of) is
    # available in a parallel folder; productInfo.json contains the name of
    # the product.  It can be found under
    # products/[year]/[month]/[day]/[product name].
    # (https://roda.sentinel-hub.com/sentinel-s2-l1c/readme.html)
    try:
        dt = datetime_cls.strptime(product_id.split('_')[2], '%Y%m%dT%H%M%S')
    except ValueError:
        # Support for the older product ID format, e.g.:
        # "S2A_OPER_PRD_MSIL1C_PDMC_20160413T135705_R065_V20160412T102058_20160412T102058"
        dt = datetime_cls.strptime(product_id.split('_')[7][1:],
                                   '%Y%m%dT%H%M%S')

    scheme, netloc, path, *_ = urlparse(metadata_href)
    index = path.find('tiles')
    path = (path[:index] +
            f'products/{dt.year}/{dt.month}/{dt.day}/{product_id}/metadata.xml')
    mtd_msil1c_href = f'{scheme}://{netloc}{path}'
    mtd_msil1c_outpath = os.path.join(outdir, 'MTD_MSIL1C.xml')

    try:
        success = download_file(
            mtd_msil1c_href, mtd_msil1c_outpath, aws_base_command, dryrun)
    except subprocess.CalledProcessError:
        print('* Warning * Failed to download MTD_MSIL1C.xml')
        return None

    if success:
        return {
            'href': mtd_msil1c_outpath,
            'type': 'application/xml',
            'title': 'Product XML metadata',
            'roles': ['metadata']
        }
    else:
        print('Warning unrecognized scheme for asset href: {!r}, '
              'skipping!'.format(mtd_msil1c_href))
        return None


def download_file(href, outpath, aws_base_command, dryrun):
    import subprocess
    from urllib.parse import urlparse
    # TODO: better handling of possible download failure?
    scheme, *_ = urlparse(href)
    verbose = 0
    if scheme == 's3':
        command = [*aws_base_command, href, outpath]
        if verbose > 1:
            print('Running: {}'.format(' '.join(command)))
        # TODO: Manually check return code / output
        subprocess.run(command, check=True)
    elif scheme in {'https', 'http'}:
        if verbose > 1:
            print('Downloading: {!r} to {!r}'.format(href, outpath))
        if not dryrun:
            download_http_file(href, outpath)
    else:
        return False
    return True


def download_http_file(url, outpath):
    import requests
    response = requests.get(url)
    # Fail loudly on HTTP errors instead of writing an error page to disk
    response.raise_for_status()
    with open(outpath, 'wb') as outf:
        for chunk in response.iter_content(chunk_size=128):
            outf.write(chunk)


class CacheItemOutputS3Wrapper:
    def __init__(self, item_map, outbucket, aws_profile=None):
        self.item_map = item_map
        self.outbucket = outbucket

        aws_cp = AWS_S3_Command('cp')
        aws_cp.update(
            profile=aws_profile,
            no_progress=True,
        )
        self.aws_base_command = aws_cp.finalize()

    def __call__(self, stac_item, *args, **kwargs):
        with tempfile.TemporaryDirectory() as tmpdirname:
            status_file_basename = '{}.done'.format(stac_item['id'])
            status_item_s3_path = os.path.join(
                self.outbucket, 'status', status_file_basename)
            status_item_local_path = os.path.join(
                tmpdirname, status_file_basename)

            try:
                subprocess.run([*self.aws_base_command,
                                status_item_s3_path,
                                status_item_local_path], check=True)
            except subprocess.CalledProcessError:
                pass
            else:
                print("* Item: {} previously processed, not "
                      "re-processing".format(stac_item['id']))
                with open(status_item_local_path) as f:
                    return [json.loads(line) for line in f]

            output_stac_items = self.item_map(stac_item, *args, **kwargs)

            output_status_file = os.path.join(
                tmpdirname, '{}.output.done'.format(stac_item['id']))
            with open(output_status_file, 'w') as outf:
                if isinstance(output_stac_items, dict):
                    print(json.dumps(output_stac_items), file=outf)
                elif isinstance(output_stac_items, pystac.Item):
                    print(json.dumps(output_stac_items.to_dict()), file=outf)
                else:
                    for output_item in output_stac_items:
                        if isinstance(output_item, pystac.Item):
                            print(json.dumps(output_item.to_dict()), file=outf)
                        else:
                            print(json.dumps(output_item), file=outf)

            subprocess.run([*self.aws_base_command,
                            output_status_file,
                            status_item_s3_path], check=True)

            return output_stac_items


def _default_item_selector(stac_item):
    return True


def _default_asset_selector(asset_name, asset):
    return True
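

# Illustrative sketch: custom selectors follow the same signatures as the
# defaults above and could be passed as `stac_item_selector` / `asset_selector`
# to `IngressProcessEgressWrapper` below.  The 'platform' and 'type' fields
# used here are assumptions about the incoming STAC metadata, not requirements
# of this module.
def _example_sentinel2_item_selector(stac_item):
    platform = stac_item.get('properties', {}).get('platform', '')
    return platform.startswith('sentinel-2')


def _example_tiff_asset_selector(asset_name, asset):
    return asset.get('type', '').startswith('image/tiff')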


class IngressProcessEgressWrapper:
    def __init__(self,
                 item_map,
                 outbucket,
                 aws_base_command,
                 dryrun=False,
                 stac_item_selector=_default_item_selector,
                 asset_selector=_default_asset_selector,
                 skip_egress=False):
        self.item_map = item_map
        self.outbucket = outbucket
        self.aws_base_command = aws_base_command
        self.dryrun = dryrun
        self.stac_item_selector = stac_item_selector
        self.asset_selector = asset_selector
        self.skip_egress = skip_egress

    def __call__(self, stac_item, *args, **kwargs):
        # Assumes that the 'self.item_map' function accepts
        # 'stac_item' and 'working_dir' arguments. TODO: actually
        # check this via introspection
        print("* Processing item: {}".format(stac_item['id']))

        if not self.stac_item_selector(stac_item):
            print("** STAC item {} did not satisfy selector, not "
                  "processing".format(stac_item['id']))
            return [stac_item]

        with tempfile.TemporaryDirectory() as tmpdirname:
            ingressed_item = ingress_item(
                stac_item,
                os.path.join(tmpdirname, 'ingress'),
                self.aws_base_command,
                self.dryrun,
                relative=False,
                asset_selector=self.asset_selector)

            # Stripping the 'root' link here as it usually refers to
            # the catalog which isn't ingressed when we call
            # ingress_item directly (can throw exceptions when trying
            # to convert to dict or serialize when the catalog is
            # missing)
            ingressed_item.remove_links('root')

            processed_item = self.item_map(
                ingressed_item,
                tmpdirname,
                *args, **kwargs)

            processed_items = []
            if isinstance(processed_item, dict):
                processed_items.append(pystac.Item.from_dict(processed_item))
            elif isinstance(processed_item, pystac.Item):
                processed_items.append(processed_item)
            else:
                # Assume already an iterable of pystac.Item
                processed_items = processed_item

            if self.skip_egress:
                return processed_items

            output_items = []
            for item in processed_items:
                output_items.append(egress_item(item,
                                                self.outbucket,
                                                self.aws_base_command))

            # Returning a list here
            return output_items


def download_region(input_region_path,
                    output_region_path,
                    aws_profile=None,
                    strip_nonregions=False,
                    ensure_comments=False):
    from geowatch.utils.util_fsspec import FSPath
    from geowatch.geoannots import geomodels
    # TODO: handle aws_profile
    assert aws_profile is None, 'unhandled'

    input_region_path = FSPath.coerce(input_region_path)
    with input_region_path.open('r') as file:
        out_region_data = json.load(file)

    region = geomodels.RegionModel.coerce(out_region_data)
    region.fixup()

    if strip_nonregions:
        region.strip_body_features()

    if ensure_comments:
        region.ensure_comments()

    with open(output_region_path, 'w') as f:
        print(region.dumps(indent=2), file=f)

    return output_region_path


def determine_region_id(region_fpath):
    """
    Args:
        region_fpath (str | PathLike): the path to a region model geojson file

    Returns:
        str | None : the region id if we can find one
    """
    region_id = None
    with open(region_fpath, 'r') as file:
        region_data = json.load(file)
        for feature in region_data.get('features', []):
            props = feature['properties']
            if props['type'] == 'region':
                region_id = props.get('region_id', props.get('region_model_id'))
                break
    return region_id
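

# Small self-contained illustration of `determine_region_id`: write a minimal
# made-up region model to a temporary file and read back its region_id.  This
# is only a sketch of the expected geojson layout, not a full region model.
def _example_determine_region_id():
    import tempfile
    minimal_region = {
        'type': 'FeatureCollection',
        'features': [
            {'type': 'Feature',
             'properties': {'type': 'region', 'region_id': 'XX_R001'},
             'geometry': None},
        ],
    }
    with tempfile.NamedTemporaryFile('w', suffix='.geojson', delete=False) as f:
        json.dump(minimal_region, f)
        fpath = f.name
    assert determine_region_id(fpath) == 'XX_R001'
    os.unlink(fpath)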


class AWS_S3_Command:
    """
    Helper to build and execute AWS S3 bash commands

    Note:
        Probably should use fsspec instead of this in most cases.

    References:
        https://docs.aws.amazon.com/cli/latest/reference/s3/

    Example:
        >>> from geowatch.utils.util_framework import *  # NOQA
        >>> self = AWS_S3_Command('ls', 's3://foo/bar')
        >>> self.update(profile='myprofile')
        >>> print(self.finalize())
        ['aws', 's3', '--profile', 'myprofile', 'ls', 's3://foo/bar']
        >>> self = AWS_S3_Command('cp', 's3://foo/bar', '/foo/bar',
        >>>                       quiet=True, no_progress=True, color='auto')
        >>> print(self.finalize())
        ['aws', 's3', '--quiet', '--no-progress', '--color', 'auto', 'cp', 's3://foo/bar', '/foo/bar']

    Example:
        >>> # Reuse the same command object with different positional args
        >>> aws_cmd = AWS_S3_Command('cp')
        >>> aws_cmd.update(
        >>>     profile='myprof',
        >>>     only_show_errors=True
        >>> )
        >>> aws_cmd.args = ['s3://data1', '/local/data1']
        >>> print(aws_cmd.finalize())
        ['aws', 's3', '--only-show-errors', '--profile', 'myprof', 'cp', 's3://data1', '/local/data1']
        >>> # Set the `args` attribute to get a new command while keeping
        >>> # existing options.
        >>> aws_cmd.update(recursive=True)
        >>> aws_cmd.args = ['s3://data2', '/local/data2']
        >>> print(aws_cmd.finalize())
        ['aws', 's3', '--only-show-errors', '--recursive', '--profile', 'myprof', 'cp', 's3://data2', '/local/data2']

    Example:
        >>> # There is no need to specify the entire command. If you want
        >>> # to simply build a command prefix, then that works too.
        >>> aws_cmd = AWS_S3_Command('cp', profile='myprof', aws_storage_class='foobar')
        >>> print(aws_cmd.finalize())
        ['aws', 's3', '--profile', 'myprof', '--aws-storage-class', 'foobar', 'cp']
    """

    # Register known options for known commands
    # TODO: multi values

    cmd_known_flags = {}
    cmd_known_keyvals = {}

    cmd_known_flags['ls'] = [
        'recursive',
        'human-readable',
        'summarize',
        'debug',
        'no-verify-ssl',
        'no-paginate',
        'no-sign-request',
    ]

    cmd_known_keyvals['ls'] = [
        'endpoint-url',
        'page-size',
        'request-payer',
        'output',
        'query',
        'profile',
        'region',
        'version',
        'color',
        'ca-bundle',
        'cli-read-timeout',
        'cli-connect-timeout',
    ]

    cmd_known_flags['sync'] = [
        'dryrun',
        'quiet',
        'follow-symlinks',
        'no-follow-symlinks',
        'no-guess-mime-type',
        'only-show-errors',
        'no-progress',
        'ignore-glacier-warnings',
        'force-glacier-transfer',
        'size-only',
        'exact-timestamps',
        'delete',
        'debug',
        'no-verify-ssl',
        'no-paginate',
        'no-sign-request',
    ]

    cmd_known_keyvals['sync'] = [
        'include',
        'exclude',
        'acl',
        'sse',
        'sse-c',
        'sse-c-key',
        'sse-kms-key-id',
        'sse-c-copy-source',
        'sse-c-copy-source-key',
        'storage-class',
        'grants',
        'website-redirect',
        'content-type',
        'cache-control',
        'content-disposition',
        'content-encoding',
        'content-language',
        'expires',
        'source-region',
        'page-size',
        'request-payer',
        'metadata',
        'metadata-directive',
        'endpoint-url',
        'output',
        'query',
        'profile',
        'region',
        'version',
        'color',
        'ca-bundle',
        'cli-read-timeout',
        'cli-connect-timeout',
    ]

    cmd_known_flags['cp'] = [
        'dryrun',
        'quiet',
        'follow-symlinks',
        'no-follow-symlinks',
        'no-guess-mime-type',
        'only-show-errors',
        'no-progress',
        'ignore-glacier-warnings',
        'force-glacier-transfer',
        'recursive',
        'debug',
        'no-verify-ssl',
        'no-paginate',
    ]

    cmd_known_keyvals['cp'] = [
        'include',
        'exclude',
        'acl',
        'sse',
        'sse-c',
        'sse-c-key',
        'sse-kms-key-id',
        'sse-c-copy-source',
        'sse-c-copy-source-key',
        'storage-class',
        'grants',
        'website-redirect',
        'content-type',
        'cache-control',
        'content-disposition',
        'content-encoding',
        'content-language',
        'expires',
        'source-region',
        'page-size',
        'request-payer',
        'metadata',
        'metadata-directive',
        'expected-size',
        'endpoint-url',
        'output',
        'query',
        'profile',
        'region',
        'version',
        'color',
        'no-sign-request',
        'ca-bundle',
        'cli-read-timeout',
        'cli-connect-timeout',
    ]

    def __init__(self, command, *args, **options):
        """
        Args:
            command (str): can be: cp, ls, mv, rm, sync

            *args: positional arguments

            **options: key value options (e.g. profile)
        """
        self.command = command
        self.args = args

        self._known_flags = self.cmd_known_flags.get(self.command, [])
        self._known_keyvals = self.cmd_known_keyvals.get(self.command, [])
        self._known_flags = self._known_flags + [
            k.replace('-', '_') for k in self._known_flags]
        self._known_keyvals = self._known_keyvals + [
            k.replace('-', '_') for k in self._known_keyvals]

        # Key / value CLI arguments
        self._keyval_options = {}
        self._flag_options = {}

        self.update(options)

    def update(self, arg=None, /, **options):
        """
        Update key / value options.

        This function is aware of which options need to be flags versus
        key/values.  So `quiet=True` will result in `--quiet`, and
        `quiet=False` will include no option.  Likewise `profile=foo` will
        result in `--profile foo` and `profile=None` will include no option.
        """
        if arg is not None:
            options = ub.udict(arg) | options

        for k, v in options.items():
            if k in self._known_flags:
                self._flag_options[k] = v
            elif k in self._known_keyvals:
                self._keyval_options[k] = v
            else:
                # Fallback to key/value
                self._keyval_options[k] = v

    def finalize(self):
        """
        Returns:
            List[str]: commands suitable for passing to POpen
        """
        parts = ['aws', 's3']
        for k, v in self._flag_options.items():
            if v:
                parts.extend(['--' + k.replace('_', '-')])
        for k, v in self._keyval_options.items():
            if v is not None:
                parts.extend(['--' + k.replace('_', '-'), v])
        parts.append(self.command)
        parts.extend(self.args)
        return parts

    def run(self, check=True, shell=False, capture=False, verbose=3):
        """
        Execute the S3 command

        Returns:
            Dict : ubelt cmd info dict
        """
        final_command = self.finalize()
        print('final_command = {}'.format(ub.urepr(final_command, nl=1)))
        run_info = ub.cmd(final_command, verbose=verbose, shell=shell,
                          capture=capture)
        if check:
            run_info.check_returncode()
        return run_info


def ta2_collate_output(aws_base_command, local_region_dir, local_sites_dir,
                       destination_s3_bucket, performer_suffix='KIT'):
    """
    I think this is for putting the final system regions / sites into the
    place that T&E wants them.
    """
    from glob import glob
    from geowatch.utils import util_fsspec

    assert aws_base_command is None, 'unused'

    def _get_suffixed_basename(local_path):
        base, ext = os.path.splitext(os.path.basename(local_path))
        if performer_suffix is not None and performer_suffix != '':
            return "{}_{}{}".format(base, performer_suffix, ext)
        else:
            return "{}{}".format(base, ext)

    for region in glob(join(local_region_dir, '*.geojson')):
        region_s3_outpath = '/'.join((destination_s3_bucket, 'region_models',
                                      _get_suffixed_basename(region)))
        region = util_fsspec.FSPath.coerce(region)
        region_s3_outpath = util_fsspec.FSPath.coerce(region_s3_outpath)
        region.copy(region_s3_outpath)

    for site in glob(join(local_sites_dir, '*.geojson')):
        site_s3_outpath = '/'.join((destination_s3_bucket, 'site_models',
                                    _get_suffixed_basename(site)))
        site = util_fsspec.FSPath.coerce(site)
        site_s3_outpath = util_fsspec.FSPath.coerce(site_s3_outpath)
        site.copy(site_s3_outpath)
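

# Illustrative sketch (not called anywhere): a plausible collation call.  The
# local directories and destination bucket are hypothetical placeholders.
# With the default performer_suffix, a file like 'XX_R001.geojson' would be
# uploaded as 'region_models/XX_R001_KIT.geojson' under the destination.
def _example_ta2_collate_output():
    ta2_collate_output(
        None,                              # aws_base_command is unused
        '/tmp/outputs/region_models',      # hypothetical local region models
        '/tmp/outputs/site_models',        # hypothetical local site models
        's3://example-bucket/collated')    # hypothetical destination bucket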
""" # Validate and fix all outputs from geowatch.geoannots import geomodels from geowatch.utils import util_gis region_infos = list(util_gis.coerce_geojson_datas(region_dpath, format='json')) site_infos = list(util_gis.coerce_geojson_datas(site_dpath, format='json')) region_models = [] for region_info in region_infos: fpath = region_info['fpath'] region = geomodels.RegionModel(**region_info['data']) region.fixup() fpath.write_text(region.dumps(indent=' ')) region_models.append(region) region.validate() site_models = [] for site_info in site_infos: fpath = site_info['fpath'] site = geomodels.SiteModel(**site_info['data']) site.fixup() fpath.write_text(site.dumps(indent=' ')) site.validate() site_models.append(site) check_region_and_site_models_agree(region_models, site_models) def check_region_and_site_models_agree(region_models, site_models): # Check that region / site models are consistent with each other all_site_summaries = list(ub.flatten(region.site_summaries() for region in region_models)) all_site_headers = [site.header for site in site_models] sitesum_dups = ub.find_duplicates(sitesum['properties']['site_id'] for sitesum in all_site_summaries) assert not sitesum_dups, 'site summaries have duplicate site-ids' site_dups = ub.find_duplicates(header['properties']['site_id'] for header in all_site_headers) assert not site_dups, 'site models have duplicate site-ids' siteid_to_summary = {sitesum['properties']['site_id']: sitesum for sitesum in all_site_summaries} siteid_to_header = {header['properties']['site_id']: header for header in all_site_headers} site_ids_in_summaries = set(siteid_to_summary) site_ids_in_header = set(siteid_to_header) if site_ids_in_summaries != site_ids_in_header: diff1 = site_ids_in_header - site_ids_in_header diff2 = site_ids_in_summaries - site_ids_in_header print(f'diff1 = {ub.urepr(diff1, nl=1)}') print(f'diff2 = {ub.urepr(diff2, nl=1)}') raise Exception('site-ids differe between summaries and site headers') # These properties should be the same in both variants common_properties = ['start_date', 'end_date', 'status'] errors = [] import math for site_id in site_ids_in_summaries: summary = siteid_to_summary[site_id] header = siteid_to_header[site_id] summary_props = ub.udict(summary['properties']) header_props = ub.udict(header['properties']) summary_common_props = summary_props & common_properties header_common_props = header_props & common_properties score1 = summary_props.get('score', 1.0) score2 = header_props.get('score', 1.0) has_diff = summary_common_props != header_common_props has_diff |= (not math.isclose(score1, score2)) if has_diff: print(f'FOUND INCONSISTENCY IN {site_id}') print(f'header_props = {ub.urepr(header_props, nl=1)}') print(f'summary_props = {ub.urepr(summary_props, nl=1)}') errors.append(site_id) # assert summary_props['start_date'] == header_props['start_date'] # assert summary_props['end_date'] == header_props['end_date'] # assert summary_props['status'] == header_props['status'] if errors: raise AssertionError(f'There were {len(errors)} / {len(site_ids_in_summaries)} inconsistent sites') else: print(f'Checked {len(site_ids_in_summaries)} sites which seem consistent') class NodeStateDebugger: """ Prints information about the current node that is helpful for debugging. Use in the smartflow CLI nodes. Maintains some internal state to keep things organized. 


class NodeStateDebugger:
    """
    Prints information about the current node that is helpful for debugging.

    Use in the smartflow CLI nodes.

    Maintains some internal state to keep things organized.

    Example:
        >>> from geowatch.utils.util_framework import *  # NOQA
        >>> import ubelt as ub
        >>> watch_appdir_dpath = ub.Path.appdir('geowatch')
        >>> self = NodeStateDebugger()
        >>> self.print_environment()
        >>> self.print_current_state(watch_appdir_dpath)
        >>> config = {'foo': 'bar'}
        >>> self.print_local_invocation(config)
    """

    def __init__(self):
        self.current_iteration = 0

    def print_environment(self):
        """
        Print info about what version of the code we are running on
        """
        import os
        import geowatch
        print(' --- --- ')
        print(' * Print current version of the code & environment')
        ub.cmd('git log -n 1', verbose=3,
               cwd=ub.Path(geowatch.__file__).parent)
        print('geowatch.__version__ = {}'.format(
            ub.urepr(geowatch.__version__, nl=1)))
        print('geowatch.__file__ = {}'.format(
            ub.urepr(geowatch.__file__, nl=1)))
        print('os.environ = {}'.format(ub.urepr(dict(os.environ), nl=1)))

        # Check to make sure our times are in sync with amazon servers
        if 0:
            ub.cmd('date -u', verbose=3)
            ub.cmd('curl http://s3.amazonaws.com -v', verbose=3)
        print(' --- --- ')

        # TASK_IMAGE_NAME = os.environ.get('TASK_IMAGE_NAME', None)
        # if TASK_IMAGE_NAME:
        #     self.print_local_invocation()

    def print_local_invocation(self, config=None):
        """
        Attempt to build a string that will allow the user to start stepping
        through a local run of this smartflow step in IPython.

        Args:
            config (scriptconfig.DataConfig): the config used to invoke the script
        """
        # Print out a command to help developers debug this image in a
        # local environment.
        # TODO: make this more generic for other people.
        # This is somewhat ill-defined because we can't know which
        # local machine the user will want to run on, but here are issues
        # with the current command:
        # * the external code / data is jon-specific,
        # * the location of the mapped ingress directory is arbitrary.
        # * the location of the local .aws directory is usually correct.
        # * Not every image needs runtime=nvidia
        # * pip cache is only necessary if installing new packages, but
        #   the location is a reasonable default.
        # * there may be environment variables passed by smartflow that
        #   also need to be passed here, but we don't want to have a huge
        #   command, so there is a tradeoff.
        print('To run in a similar environment locally:')
        TASK_IMAGE_NAME = os.environ.get('TASK_IMAGE_NAME', None)
        from kwutil.slugify_ext import smart_truncate
        import kwutil
        # import uuid
        # unique_suffix = ub.hash_data(uuid.uuid4())[0:8]
        unique_suffix = kwutil.util_time.isoformat(
            kwutil.util_time.datetime.now(), pathsafe=1)
        if config is None:
            step_name = 'unknown_step'
        else:
            # TODO: ensure pathsafe name utility
            step_name = smart_truncate(config.__class__.__name__, 24,
                                       trunc_loc=1, head="_", tail="_")
        temp_location = f'debug_{step_name}_{unique_suffix}'
        create_local_env_command = ub.codeblock(
            fr'''
            # Set these environment variables to reasonable locations on your
            # host machine.
            LOCAL_WORK_DPATH=$HOME/temp/{temp_location}/ingress
            LOCAL_CODE_DPATH=$HOME/code
            LOCAL_DATA_DPATH=$HOME/data

            # Run the docker image
            mkdir -p "$LOCAL_WORK_DPATH"
            cd "$LOCAL_WORK_DPATH"
            docker run \
                --gpus=all \
                --volume "$LOCAL_WORK_DPATH":/tmp/ingress \
                --volume "$LOCAL_CODE_DPATH":/extern_code:ro \
                --volume "$LOCAL_DATA_DPATH":/extern_data:ro \
                --volume "$HOME"/.aws:/root/.aws:ro \
                --volume "$HOME"/.cache/pip:/pip_cache \
                --env AWS_PROFILE=iarpa \
                --env TASK_IMAGE_NAME={TASK_IMAGE_NAME} \
                -it {TASK_IMAGE_NAME} bash
            ''')
        # Do we use --gpus all or --runtime=nvidia
        print()
        print(create_local_env_command)

        if 1:
            helper_text = ub.codeblock(
                '''
                # Point the container repo at the repo on your host
                # system to pull in any updates for testing.
                git remote add host /extern_code/geowatch/.git
                git fetch host
                # may need to do some of:
                # BRANCH=
                # git reset --hard "host/$BRANCH"
                # git checkout "$BRANCH"
                # git pull "host/$BRANCH"
                ''')
            print()
            print(helper_text)

        # node_modname = 'geowatch.cli.smartflow.run_sc_datagen'
        if config is not None:
            node_modname = config.__class__.__module__
            if node_modname == '__main__':
                # Try to get the real name
                import sys
                node_modname = ub.modpath_to_modname(
                    sys.modules[node_modname].__file__)

            ipython_setup_command = ub.codeblock(
                f'''
                %load_ext autoreload
                %autoreload 2
                from {node_modname} import *
                ''')
            # TODO: some configs which have quotes need to be escaped,
            # otherwise they break here.
            config_text = 'config = ' + ub.urepr(config, nl=1)
            ipython_setup_command = ipython_setup_command + '\n' + config_text
            print()
            print('# In IPython')
            # print(ipython_setup_command)
            wrapped_text = "ipython -i -c 'if 1:\n"
            wrapped_text += ub.indent(ipython_setup_command).replace("'", '"')
            wrapped_text += "\n'"
            print(wrapped_text)

    def print_current_state(self, dpath):
        print(' --- --- ')
        dpath = ub.Path(dpath).resolve()
        # cwd_paths = sorted([p.resolve() for p in dpath.glob('*')])
        # print('cwd_paths = {}'.format(ub.urepr(cwd_paths, nl=1)))
        self.print_directory_contents(dpath)

        print(f' * Print some disk and machine statistics ({self.current_iteration})')
        ub.cmd('df -h', verbose=3)

        from geowatch.utils import util_hardware
        with_units = bool(ub.modname_to_modpath('pint'))
        mem_info = util_hardware.get_mem_info(with_units=with_units)
        print('mem_info = {}'.format(ub.urepr(mem_info, nl=1, align=':')))
        print(' --- --- ')
        self.current_iteration += 1

    def print_directory_contents(self, dpath):
        print(f'* Printing directory contents: {dpath}')
        dpath = ub.Path(dpath).resolve()
        # cwd_paths = sorted([p.resolve() for p in dpath.glob('*')])
        # print('cwd_paths = {}'.format(ub.urepr(cwd_paths, nl=1)))
        if dpath.exists():
            ub.cmd('ls -al', verbose=3, cwd=dpath)
        else:
            print(f'dpath={dpath} does not exist')


class PrintLogger:
    """
    Ducktype a logger
    """
    def info(self, msg, *args, **kwargs):
        print(msg % args)

    def debug(self, msg, *args, **kwargs):
        print(msg % args)

    def error(self, msg, *args, **kwargs):
        print(msg % args)

    def warning(self, msg, *args, **kwargs):
        print(msg % args)

    def critical(self, msg, *args, **kwargs):
        print(msg % args)
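

# Small self-contained illustration: `PrintLogger` can stand in for a
# `logging.Logger`-style object when a callee expects one but you just want
# output on stdout.  The messages and values here are made-up examples.
def _example_print_logger_usage():
    logger = PrintLogger()
    logger.info('processing %d items from %s', 5, 's3://example-bucket')
    logger.warning('this goes straight to stdout')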
File "/root/code/watch/geowatch/cli/smartflow_egress.py", line 174, in smartflow_egress local_path.copy(asset_s3_outpath) File "/root/.pyenv/versions/3.11.2/lib/python3.11/site-packages/s3fs/core.py", line 140, in _error_wrapper raise err PermissionError: The difference between the request time and the current time is too large. """ from geowatch.utils import util_fsspec util_fsspec.S3Path._new_fs(profile='iarpa') s3_dpath = util_fsspec.S3Path.coerce('s3://smartflow-023300502152-us-west-2/smartflow/env/kw-v3-0-0/work/preeval17_batch_v120/batch/kit/CN_C000/2021-08-31/split/mono/products/dummy-test') s3_dpath.parent.ls() dst_dpath = s3_dpath dpath = util_fsspec.LocalPath.appdir('geowatch/fsspec/test-s3-hack/').ensuredir() # dst_dpath = (dpath / 'dst') src_dpath = (dpath / 'src').ensuredir() for i in range(100): (src_dpath / f'file_{i:03d}.txt').write_text('hello world' * 100) from fsspec.callbacks import TqdmCallback callback = TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}) src_dpath.copy(dst_dpath, callback=callback) from geowatch.utils import util_framework aws_cmd = util_framework.AWS_S3_Command( 'sync', src_dpath, dst_dpath, **dst_dpath.fs.storage_options) print(aws_cmd.finalize()) aws_cmd.run()