"""Processing module for CERF

@author Chris R. Vernon
@email chris.vernon@pnnl.gov

License:  BSD 2-Clause, see LICENSE and DISCLAIMER files

"""

import logging
import os
import time

import pandas as pd
from joblib import Parallel, delayed

import cerf.utils as util
from cerf.model import Model
from cerf.process_region import process_region


def generate_model(config_file=None, config_dict={}, initialize_site_data=None, log_level='info'):
    """Generate a model instance for use in parallel applications.

    :param config_file:             Full path with file name and extension to the input config.yml file
    :type config_file:              str

    :param config_dict:             Optional instead of config_file.  Configuration dictionary.
    :type config_dict:              dict

    :param initialize_site_data:    None if no initialization is required, otherwise either a CSV file or
                                    Pandas DataFrame of siting data bearing the following required fields:

                                    xcoord:  the X coordinate of the site in meters in
                                    USA_Contiguous_Albers_Equal_Area_Conic (EPSG: 102003)

                                    ycoord:  the Y coordinate of the site in meters in
                                    USA_Contiguous_Albers_Equal_Area_Conic (EPSG: 102003)

                                    retirement_year:  the four-digit year (int, e.g., 2050) in which the
                                    power plant is to be decommissioned

                                    buffer_in_km:  the buffer around the site to apply in kilometers

    :param log_level:               Log level.  Options are 'info' and 'debug'.  Default 'info'.
    :type log_level:                str

    """

    return Model(config_file, config_dict, initialize_site_data, log_level)
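
# Example usage (a minimal sketch):  'config.yml' and 'sited_2025.csv' below are
# hypothetical paths, where the CSV bears the required siting fields documented above.
#
#     import pandas as pd
#
#     site_df = pd.read_csv('sited_2025.csv')  # xcoord, ycoord, retirement_year, buffer_in_km
#     model = generate_model(config_file='config.yml',
#                            initialize_site_data=site_df,
#                            log_level='debug')
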
def cerf_parallel(model, data, write_output=True, n_jobs=-1, method='sequential'):
    """Run all regions in parallel.

    :param model:               Instantiated CERF model class containing configuration options
    :type model:                class

    :param data:                Data from cerf.stage.Stage containing NLC and suitability arrays

    :param write_output:        Write the output CSV to the output directory specified in the config file
    :type write_output:         bool

    :param n_jobs:              The number of processors to utilize.  Default is -1, which uses all
                                available processors.
    :type n_jobs:               int

    :param method:              Backend parallelization method used in Joblib.  Default is `sequential` to
                                manage overhead for local runs.  Options for advanced configurations are
                                `loky`, `threading`, and `multiprocessing`.  See
                                https://joblib.readthedocs.io/en/latest/parallel.html for details.
    :type method:               str

    :return:                    A data frame containing each sited power plant and its attributes

    """

    # start time for parallel run
    t0 = time.time()

    # run all regions in parallel
    results = Parallel(n_jobs=n_jobs, backend=method)(
        delayed(process_region)(target_region_name=i,
                                settings_dict=model.settings_dict,
                                technology_dict=model.technology_dict,
                                technology_order=model.technology_order,
                                expansion_dict=model.expansion_dict,
                                regions_dict=model.regions_dict,
                                suitability_arr=data.suitability_arr,
                                lmp_arr=data.lmp_arr,
                                generation_arr=data.generation_arr,
                                operating_cost_arr=data.operating_cost_arr,
                                nov_arr=data.nov_arr,
                                ic_arr=data.ic_arr,
                                nlc_arr=data.nlc_arr,
                                zones_arr=data.zones_arr,
                                xcoords=data.xcoords,
                                ycoords=data.ycoords,
                                indices_2d=data.indices_2d,
                                randomize=model.settings_dict.get('randomize', True),
                                seed_value=model.settings_dict.get('seed_value', 0),
                                verbose=model.settings_dict.get('verbose', False),
                                write_output=False) for i in model.regions_dict.keys())

    logging.info(f"All regions processed in {round((time.time() - t0), 7)} seconds.")

    logging.info("Aggregating outputs...")

    # create a data frame to hold the outputs
    df = pd.DataFrame(util.empty_sited_dict()).astype(util.sited_dtypes())

    # add in the initialized siting data from a previous year's run if so desired
    if model.initialize_site_data is not None:
        df = pd.concat([df, data.init_df])

    # combine the outputs for all regions
    for i in results:

        # ensure some sites were able to be sited for the target region
        if i is not None:
            df = pd.concat([df, i.run_data.sited_df])

    if write_output:

        # write output CSV
        out_csv = os.path.join(model.settings_dict.get('output_directory'),
                               f"cerf_sited_{model.settings_dict.get('run_year')}_conus.csv")

        df.to_csv(out_csv, index=False)

    return df
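
# Example usage (a minimal sketch of driving cerf_parallel directly rather than through
# run() below):  'config.yml' is a hypothetical path; model.stage() prepares the
# supporting data, as in run().
#
#     model = generate_model(config_file='config.yml')
#     data = model.stage()
#     sited_df = cerf_parallel(model=model, data=data, n_jobs=4, method='loky')
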
def run(config_file=None, config_dict={}, write_output=True, n_jobs=-1, method='sequential',
        initialize_site_data=None, log_level='info'):
    """Run all CERF regions for the target year.

    :param config_file:             Full path with file name and extension to the input config.yml file
    :type config_file:              str

    :param config_dict:             Optional instead of config_file.  Configuration dictionary.
    :type config_dict:              dict

    :param write_output:            Write the output CSV to the output directory specified in the config file
    :type write_output:             bool

    :param n_jobs:                  The number of processors to utilize.  Default is -1, which uses all
                                    available processors.
    :type n_jobs:                   int

    :param method:                  Backend parallelization method used in Joblib.  Default is `sequential` to
                                    manage overhead for local runs.  Options for advanced configurations are
                                    `loky`, `threading`, and `multiprocessing`.  See
                                    https://joblib.readthedocs.io/en/latest/parallel.html for details.
    :type method:                   str

    :param initialize_site_data:    None if no initialization is required, otherwise either a CSV file or
                                    Pandas DataFrame of siting data bearing the following required fields:

                                    xcoord:  the X coordinate of the site in meters in
                                    USA_Contiguous_Albers_Equal_Area_Conic (EPSG: 102003)

                                    ycoord:  the Y coordinate of the site in meters in
                                    USA_Contiguous_Albers_Equal_Area_Conic (EPSG: 102003)

                                    retirement_year:  the four-digit year (int, e.g., 2050) in which the
                                    power plant is to be decommissioned

                                    buffer_in_km:  the buffer around the site to apply in kilometers

    :param log_level:               Log level.  Options are 'info' and 'debug'.  Default 'info'.
    :type log_level:                str

    :return:                        A data frame containing each sited power plant and its attributes

    """

    try:

        # instantiate CERF model
        model = generate_model(config_file,
                               config_dict,
                               initialize_site_data=initialize_site_data,
                               log_level=log_level.lower())

        # process supporting data
        data = model.stage()

        # process all CERF regions in parallel and aggregate the results into a data
        # frame containing each sited power plant and its attributes
        df = cerf_parallel(model=model,
                           data=data,
                           write_output=write_output,
                           n_jobs=n_jobs,
                           method=method)

        logging.info(f"CERF model run completed in {round(time.time() - model.start_time, 7)} seconds")

    finally:

        # remove logging handlers
        logger = logging.getLogger()

        for handler in logger.handlers[:]:
            handler.close()
            logger.removeHandler(handler)

        logging.shutdown()

    return df
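
# Example usage (a minimal sketch):  'config_2030.yml' is a hypothetical config file with
# 'output_directory' and 'run_year' set; the result is a data frame of sited plants.
#
#     from cerf.process import run
#
#     sited_df = run(config_file='config_2030.yml', n_jobs=-1, method='loky')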