#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author: Paul Gierz <pgierz>
# @Date:   2020-02-04T07:04:02+01:00
# @Email:  pgierz@awi.de
# @Filename: esm_analysis.py
# @Last modified by:   pgierz
# @Last modified time: 2020-02-04T08:37:01+01:00
"""
The ESM Analysis module allows for the creation of several common analyses from
Python objects.

The main object you probably want to use is: ``EsmAnalysis``. You can create one
like this, if you are currently working within an experiment directory::

    >>> analyser = EsmAnalysis()

Optionally, you can give it a path to start from::

    >>> analyser = EsmAnalysis(exp_base="/work/ba0989/a270077/AWICM_PISM/LGM_011")

You can also tell it where to store the analyses::

    >>> analyser = EsmAnalysis(preferred_analysis_dir="/work/ba0989/a270077/store_analysis_here")

Once you have created the ``analyser`` object, you can use the attached methods
to quickly get some typical analyses. The methods always return an
``xarray.Dataset`` object. Typically, the only required argument is the
variable name::

    >>> t2m_fldmean = analyser.fldmean("temp2")
"""

import glob
import importlib
import logging
import os
import re
import sys

import cdo
import yaml


def clean_top_of_tree(basedir):
    """
    Cleans up the top of the experiment tree by adding a comment character to
    the first line.

    Parameters
    ----------
    basedir : str
        Where the file ``.top_of_exp_tree`` should be found

    Returns
    -------
    None
    """
    with open(os.path.join(basedir, ".top_of_exp_tree")) as f:
        contents = f.readlines()
    new_contents = []
    for line in contents:
        if line.startswith("Top of"):
            line = "# " + line
        new_contents.append(line)
    different_contents = contents != new_contents
    try:
        with open(os.path.join(basedir, ".top_of_exp_tree"), "w") as f:
            for line in new_contents:
                f.write(line)
    except PermissionError:
        if different_contents:
            raise PermissionError(
                "Needed to fix up .top_of_exp_tree to include a comment character "
                "for YAML parsing, but permission was denied!"
            )
        else:
            logging.warning(
                "Probably OK to proceed, but I can't write changes to %s",
                os.path.join(basedir, ".top_of_exp_tree"),
            )
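
# A sketch of what ``clean_top_of_tree`` does to the first line of
# ``.top_of_exp_tree`` (the file contents shown here are made up for
# illustration):
#
#     before:  Top of experiment tree for exp_001
#     after:   # Top of experiment tree for exp_001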


def load_yaml(f):
    """Returns a dictionary of the YAML file ``f``"""
    with open(f) as yml:
        for line in yml.readlines():
            logging.debug(line)
    with open(f) as yml:
        d = yaml.load(yml, Loader=yaml.SafeLoader)
    return d or {}


def walk_up(bottom):
    """
    Mimics ``os.walk``, but walks 'up' the directory tree instead of down.

    Parameters
    ----------
    bottom : str
        Where to start walking up from

    Yields
    ------
    tuple
        A tuple of ``(bottom, dirs, nondirs)``
    """
    bottom = os.path.realpath(bottom)

    # Get files in the current directory
    try:
        names = os.listdir(bottom)
    except Exception as e:
        print(e)
        return

    dirs, nondirs = [], []
    for name in names:
        if os.path.isdir(os.path.join(bottom, name)):
            dirs.append(name)
        else:
            nondirs.append(name)

    yield bottom, dirs, nondirs

    new_path = os.path.realpath(os.path.join(bottom, ".."))

    # See if we are at the top
    if new_path == bottom:
        return

    yield from walk_up(new_path)
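
# A minimal usage sketch for ``walk_up`` (the paths are hypothetical), shown in
# the same doctest style as the module docstring:
#
#     >>> for bottom, dirs, files in walk_up("/work/exp_001/outdata/echam6"):
#     ...     if ".top_of_exp_tree" in files:
#     ...         print(bottom)
#     ...         break
#     /work/exp_001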


################################################################################
# NOTES:
#
# This needs to be refactored correctly. Actually, we need two big classes,
# not just one.
#
# The first needs to be for components, the second for the entire experiment.
# Otherwise you inherit methods into the components that actually don't work,
# since they rely on attributes of the entire setup.
################################################################################


class AnalysisComponent(object):
    pass


class EsmAnalysis(object):
    def __init__(self, exp_base=None, preferred_analysis_dir=None):
        """
        Base class for analysis; other component-specific analysis classes
        should inherit from this one.

        Sets up the following directories and attributes from anywhere within
        the experiment tree:

        + ``EXP_ID``
        + ``ANALYSIS_DIR``
        + ``CONFIG_DIR``
        + ``FORCING_DIR``
        + ``INPUT_DIR``
        + ``OUTDATA_DIR``
        + ``RESTART_DIR``
        + ``SCRIPT_DIR``

        Parameters
        ----------
        exp_base : str
            The top-level directory of the experiment. If not given, it is
            determined by searching upwards for a file called
            ``.top_of_exp_tree``.
        preferred_analysis_dir : str
            Where the analysis files should be stored, defaults to the
            current experiment.
        """
        # Figure out what the top of the experiment is by finding, upwards, a
        # file called .top_of_exp_tree
        if not exp_base:
            for bottom, _, files in walk_up(os.getcwd()):
                if ".top_of_exp_tree" in files:
                    self.EXP_BASE = bottom
                    break
            else:
                self.EXP_BASE = input(
                    "Enter the top-level directory of your experiment: "
                )
                basedir = os.path.dirname(self.EXP_BASE)
                if not os.path.exists(basedir):
                    print("Generating directories for %s" % basedir)
                    input("Press Enter to continue, Ctrl-C to cancel...")
                    try:
                        os.makedirs(basedir)
                    except PermissionError:
                        print("Sorry, you don't have permission to write here!")
                print("Making marker file .top_of_exp_tree in %s" % basedir)
                input("Press Enter to continue, Ctrl-C to cancel...")
                try:
                    # "Touch" the marker file so that it exists:
                    with open(self.EXP_BASE + "/.top_of_exp_tree", "w") as f:
                        os.utime(f.name, None)
                except PermissionError:
                    print("Sorry, you don't have permission to write here!")
        else:
            self.EXP_BASE = exp_base

        self.EXP_ID = os.path.basename(self.EXP_BASE)
        clean_top_of_tree(self.EXP_BASE)
        self._config = load_yaml(os.path.join(self.EXP_BASE, ".top_of_exp_tree"))
        logging.debug(self._config)

        self.ANALYSIS_DIR = self.EXP_BASE + "/analysis/"
        self.CONFIG_DIR = self.EXP_BASE + "/config/"
        self.FORCING_DIR = self.EXP_BASE + "/forcing/"
        self.INPUT_DIR = self.EXP_BASE + "/input/"
        self.OUTDATA_DIR = self.EXP_BASE + "/outdata/"
        self.RESTART_DIR = self.EXP_BASE + "/restart/"
        self.SCRIPT_DIR = self.EXP_BASE + "/scripts/"

        # Here's yer CDO:
        self.CDO = cdo.Cdo()

        # Ensure that the analysis directory exists for the top:
        logging.info("Before call: %s", self.ANALYSIS_DIR)
        self.create_analysis_dir(preferred_analysis_dir=preferred_analysis_dir)
        logging.info("After call: %s", self.ANALYSIS_DIR)

        # Make a list to hold the analysis components
        self._analysis_components = []
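
    # The directory layout assumed by the attributes set in ``__init__`` above
    # (a sketch; the experiment ID "exp_001" is hypothetical):
    #
    #     /work/exp_001/
    #     |-- .top_of_exp_tree
    #     |-- analysis/
    #     |-- config/
    #     |-- forcing/
    #     |-- input/
    #     |-- outdata/
    #     |-- restart/
    #     `-- scripts/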

    def create_analysis_dir(self, preferred_analysis_dir=None):
        """
        Create the analysis directory and any intermediate directories if
        needed.

        Parameters
        ----------
        preferred_analysis_dir : str
            The location where analysis should be stored

        Notes
        -----
        If you give an argument for ``preferred_analysis_dir``, the attribute
        ``self.ANALYSIS_DIR`` is changed to this value.
        """
        if preferred_analysis_dir is not None:
            logging.info("Modifying self.ANALYSIS_DIR:")
            self.ANALYSIS_DIR = preferred_analysis_dir
            logging.info(self.ANALYSIS_DIR)
        if not os.path.isdir(self.ANALYSIS_DIR):
            logging.info("Creating directory: %s", self.ANALYSIS_DIR)
            os.makedirs(self.ANALYSIS_DIR)

    def initialize_analysis_components(self, preferred_analysis_dir=None):
        """
        Creates analysis objects for each component found in the
        ``OUTDATA_DIR`` directory.

        It is assumed that the component analysis object can be initialized
        without any arguments. If no class has been defined yet, a warning is
        logged.
        """
        for component in os.listdir(self.OUTDATA_DIR):
            try:
                # TODO: I don't really like this, it'd be nicer with relative
                # imports (maybe? I am not sure...)
                logging.debug("Trying to import esm_analysis.components." + component)
                comp_module = importlib.import_module(
                    "esm_analysis.components." + component
                )
                logging.debug("Import worked!")
                comp_analyzer = getattr(
                    comp_module, component.capitalize() + "Analysis"
                )(exp_base=self.EXP_BASE, preferred_analysis_dir=preferred_analysis_dir)
                logging.debug("Init worked!")
                # PG: Not sure I like the next two lines, they already confuse
                # me 10 minutes after I wrote them...
                if preferred_analysis_dir:
                    component_analysis_dir = preferred_analysis_dir + "/" + component
                    comp_analyzer.create_analysis_dir(
                        preferred_analysis_dir=component_analysis_dir
                    )
                else:
                    comp_analyzer.create_analysis_dir()
                logging.debug("Finished setting up analysis dir for " + component)
                # Make it an object attribute for interactive access:
                setattr(self, comp_analyzer.NAME, comp_analyzer)
                logging.debug("Setting attributes for the big analyser")
                # Put it in the list for easy access from inside the class:
                self._analysis_components.append(comp_analyzer)
            except Exception:
                logging.warning(
                    "Oops: trouble initializing, or no analysis class available for: %s",
                    component,
                )
                logging.error("Error was: %s", sys.exc_info()[0])
                if component == "fesom":
                    raise
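
    # The naming convention assumed by ``initialize_analysis_components``,
    # illustrated for a hypothetical "echam6" entry in ``OUTDATA_DIR`` (a
    # sketch of the lookup only, not part of the runtime path):
    #
    #     >>> comp_module = importlib.import_module("esm_analysis.components.echam6")
    #     >>> comp_analyzer = comp_module.Echam6Analysis(exp_base="/work/exp_001")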

    def determine_variable_dict_from_code_files(self):
        """
        A generic method to create a dictionary containing file patterns and
        variable information for each file pattern found in the
        ``OUTDATA_DIR`` folder.

        Notes
        -----
        This is meant to be overloaded!! The default implementation works for
        ECHAM6 and JSBACH. Maybe it is better to completely move this to just
        those two classes, or to a base class which both can build on.

        Returns
        -------
        dict
            A nested dictionary. The outermost key is a file pattern, and its
            value is keyed by variable short name. The innermost value is a
            dictionary of ``code_number``, ``levels``, ``short_name``, and
            ``long_name``.
        """
        logging.debug("Starting to determine variable dict...")
        variables = {}
        # CLEANUP: This is a lot of duplicate code...
        all_code_files = glob.glob(
            self.OUTDATA_DIR + self.EXP_ID + "_" + self.NAME + "*.codes"
        )
        # Remove the file path:
        code_files = [
            code_file.replace(self.OUTDATA_DIR, "") for code_file in all_code_files
        ]
        code_streams = [
            code_file.replace(self.EXP_ID, "").replace(".codes", "")
            for code_file in code_files
        ]
        code_streams_no_digits = {
            "".join(r"\d" if c.isdigit() else c for c in code_file)
            for code_file in code_streams
        }
        code_streams_full = {
            self.OUTDATA_DIR + self.EXP_ID + stream + ".codes"
            for stream in code_streams_no_digits
        }
        code_regexes = {re.compile(stream) for stream in code_streams_full}
        deduped_streams = [
            list(filter(r.match, all_code_files))[0] for r in code_regexes
        ]
        for f in deduped_streams:
            logging.debug("Working on: %s", f)
            file_stream = (
                f.replace(self.OUTDATA_DIR, "")
                .replace(self.EXP_ID + "_" + self.NAME + "_", "")
                .replace(".codes", "")
            )
            # Replace digits with a regex-able expression:
            #
            # POTENTIAL BUG: if two streams are very similar, e.g.
            # EXP_ID_echam6_echam_stream123.grb and
            # EXP_ID_echam6_echam_stream456.grb, these will end up being the
            # same; and this fails...
            file_stream = "".join(r"\d" if c.isdigit() else c for c in file_stream)
            # BUG: This depends on knowing that the file extension is GRIB.
            # This might not always be the case...
            file_pattern = (
                self.OUTDATA_DIR
                + self.EXP_ID
                + "_"
                + self.NAME
                + "_"
                + file_stream
                + ".grb"
            )
            logging.debug("File pattern will be: %s", file_pattern)
            variables[file_pattern] = {}
            with open(f) as code_file:
                code_file_list = [" ".join(line.split()) for line in code_file]
            for entry in code_file_list:
                code_number, levels, short_name, _, _, long_name = entry.split(" ", 5)
                variables[file_pattern][short_name] = {
                    "code_number": code_number,
                    "levels": levels,
                    "short_name": short_name,
                    "long_name": long_name,
                }
        logging.debug("Variable dict given back will be: %s", variables)
        return variables
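
    # The nested dictionary returned by
    # ``determine_variable_dict_from_code_files`` has roughly this shape (the
    # paths and values here are made up for illustration):
    #
    #     {
    #         "/work/exp_001/outdata/echam6/exp_001_echam6_echam.grb": {
    #             "temp2": {
    #                 "code_number": "167",
    #                 "levels": "1",
    #                 "short_name": "temp2",
    #                 "long_name": "2m temperature",
    #             },
    #         },
    #     }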

    def _get_files_for_variable_short_name_single_component(self, varname):
        fpattern_list = []
        for file_pattern, short_names_in_file_pattern in self._variables.items():
            for short_name in short_names_in_file_pattern:
                logging.debug("Checking: %s = %s", short_name, varname)
                if short_name == varname:
                    fpattern_list.append(
                        sorted(
                            filter(
                                re.compile(file_pattern).match,
                                [
                                    self.OUTDATA_DIR + f
                                    for f in os.listdir(self.OUTDATA_DIR)
                                ],
                            )
                        )
                    )
                    # fpattern_list.append(sorted(glob.glob(file_pattern)))
        if len(fpattern_list) > 1:
            print("Multiple file patterns have requested variable %s" % varname)
            for index, fpattern in enumerate(fpattern_list):
                print("[%s] %s" % (index + 1, fpattern[0]))
            index_choice = int(input("Please choose a filepattern: ")) - 1
            return fpattern_list[index_choice]
        return fpattern_list[0]

    def get_component_for_variable_short_name(self, varname):
        """
        Checks all known components and gets a list of files that should be
        used for a specific variable name.

        There is currently an implicit assumption for the following to be true:

        1. Each ``component`` has a private attribute ``_variables``
        2. This attribute should be a dictionary
        3. The dictionary needs to have a file pattern as a key, and all short
           names contained in this file pattern as values.

        Parameters
        ----------
        varname : str
            The short variable name which is being looked for

        Returns
        -------
        file_pattern_list, component : tuple
            A tuple of:

            1. A list of all files currently available for the varname. Note
               that sorting currently does not have a specific mechanism, so it
               **probably** sorts alphabetically/numerically.
            2. The component object for analysis with these files.
        """
        fpattern_list = []
        logging.debug(self._analysis_components)
        for component in self._analysis_components:
            for (
                file_pattern,
                short_names_in_file_pattern,
            ) in component._variables.items():
                for short_name in short_names_in_file_pattern:
                    logging.debug("Checking: %s = %s", short_name, varname)
                    if short_name == varname:
                        fpattern_list.append(
                            (
                                sorted(
                                    filter(
                                        re.compile(file_pattern).match,
                                        [
                                            component.OUTDATA_DIR + f
                                            for f in os.listdir(component.OUTDATA_DIR)
                                        ],
                                    )
                                ),
                                component,
                            )
                        )
                        # fpattern_list.append(
                        #     (sorted(glob.glob(file_pattern)), component)
                        # )
        multi_comps = []
        for fpattern, component in fpattern_list:
            if component not in multi_comps:
                multi_comps.append(component)
        if len(multi_comps) > 1:
            print("Multiple components have requested variable %s" % varname)
            for index, component in enumerate(multi_comps):
                print("[%s] %s" % (index + 1, component.NAME))
            index_choice = int(input("Please choose a component: ")) - 1
            return fpattern_list[index_choice][0], multi_comps[index_choice]
        return fpattern_list[0][0], multi_comps[0]
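
    # Usage sketch for ``get_component_for_variable_short_name`` (the
    # experiment, file names, and output below are hypothetical):
    #
    #     >>> flist, component = analyser.get_component_for_variable_short_name("temp2")
    #     >>> component.NAME
    #     'echam6'
    #     >>> flist[0]
    #     '/work/exp_001/outdata/echam6/exp_001_echam6_echam_185001.grb'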

    # Some common operations. If a specific model needs to do this in a
    # different way, you can overload the methods (e.g. FESOM needs to do
    # weighting of the triangles to get a correct fldmean)
    def fldmean(self, varname):
        """
        Generates a field mean over the entire model domain for the specified
        varname.
        """
        flist, component = self.get_component_for_variable_short_name(varname)
        return component.fldmean(varname, flist)

    def ymonmean(self, varname):
        """
        Generates a ymonmean over the entire model domain for the specified
        varname.
        """
        flist, component = self.get_component_for_variable_short_name(varname)
        return component.ymonmean(varname, flist)

    def yseasmean(self, varname):
        """
        Generates a yseasmean over the entire model domain for the specified
        varname.
        """
        flist, component = self.get_component_for_variable_short_name(varname)
        return component.yseasmean(varname, flist)

    def newest_climatology(self, varname):
        """Returns the newest climatology of the specified varname."""
        _, component = self.get_component_for_variable_short_name(varname)
        return component.newest_climatology(varname)
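
# End-to-end usage sketch (the paths and variable names are hypothetical, and
# component classes such as ``Echam6Analysis`` are assumed to be available
# under ``esm_analysis.components``):
#
#     >>> analyser = EsmAnalysis(exp_base="/work/exp_001")
#     >>> analyser.initialize_analysis_components()
#     >>> t2m_fldmean = analyser.fldmean("temp2")
#     >>> t2m_ymonmean = analyser.ymonmean("temp2")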