import os
from enum import Enum
from functools import reduce
from hestia_earth.schema import TermTermType
from hestia_earth.utils.lookup import download_lookup, get_table_value, column_name, extract_grouped_data_closest_date
from hestia_earth.utils.model import find_primary_product
from hestia_earth.utils.tools import list_sum, safe_parse_float
from hestia_earth.aggregation.log import debugRequirements
from . import value_difference
from .emission import all_in_system_boundary, is_in_system_boundary
from .lookup import production_quantity_lookup, production_quantity_country
# Column of the `crop.csv` lookup that is read to get the FAOSTAT production grouping of a crop;
# the same name is embedded in the file name of the regional yield lookup.
FAOSTAT_PRODUCTION_LOOKUP_COLUMN = 'cropGroupingFaostatProduction'
# Upper bound for the yield delta (`value_difference(...) * 100`): the yield criterion passes
# when the delta is less than or equal to this value.
YIELD_THRESHOLD = 20
# Lower bound for the production delta (countries total / cycle-country value * 100): the region
# production criterion passes when the delta is greater than or equal to this value.
REGION_PRODUCTION_THRESHOLD = 75
# Key under which `calculate_score` stores the number of criteria that passed.
KEY = 'aggregatedQualityScore'
# Key under which `calculate_score` stores the number of criteria that were evaluated.
KEY_MAX = KEY + 'Max'
class ScoreKeys(Enum):
    """Criteria that can contribute one point each to the aggregated quality score.

    Members are iterated in declaration order by `calculate_score`; the values are
    plain string identifiers of each criterion.
    """
    # compares the primary product yield with the FAOSTAT yield
    YIELD = 'yield'
    # checks the number of cycles (observations) behind the aggregation
    NB_CYCLES = 'nb_cycles'
    # checks that every boolean completeness field is true
    COMPLETENESS = 'completeness'
    # checks that all emissions in the system boundary are present
    EMISSIONS_SYSTEM_BOUNDARY = 'emissions_system_boundary'
    # checks the production ratio covered by the aggregated countries
    REGION_PRODUCTION_RATIO = 'countries_production_ratio'
def _faostat_crop_grouping(term_id: str):
    """Return the FAOSTAT production grouping recorded for a crop term.

    The value is read from the `crop.csv` lookup: row where `termid` equals
    `term_id`, column named after `FAOSTAT_PRODUCTION_LOOKUP_COLUMN`.
    """
    crop_lookup = download_lookup('crop.csv')
    grouping_column = column_name(FAOSTAT_PRODUCTION_LOOKUP_COLUMN)
    return get_table_value(crop_lookup, 'termid', term_id, grouping_column)
def _faostat_crop_yield(country_id: str, grouping: str, date: int):
    """Return the FAOSTAT yield of a crop grouping in a country, for the closest date.

    Reads the regional yield lookup (row `country_id`, column `grouping`), picks the
    entry closest to `date`, parses it as a float (0 when it cannot be parsed) and
    divides it by 10.
    """
    lookup_name = f"region-crop-{FAOSTAT_PRODUCTION_LOOKUP_COLUMN}-yield.csv"
    yield_lookup = download_lookup(lookup_name)
    grouped_data = get_table_value(yield_lookup, 'termid', country_id, column_name(grouping))
    closest_value = extract_grouped_data_closest_date(grouped_data, date)
    # NOTE(review): the division by 10 looks like a unit conversion of the lookup data
    # (possibly hg/ha to kg/ha) - confirm against the lookup definition.
    return safe_parse_float(closest_value, 0) / 10
def _calculate_score_yield(cycle: dict, *args):
    """Tell whether the primary product yield is close enough to the FAOSTAT yield.

    The delta is `value_difference(product_yield, faostat_yield) * 100`. When either
    yield is missing or falsy the delta is 0, so the criterion passes.

    Returns `True` when the delta is lower than or equal to `YIELD_THRESHOLD`.
    Extra positional arguments are accepted and ignored.
    """
    country_id = cycle.get('site', {}).get('country', {}).get('@id')
    year = int(cycle.get('endDate'))
    product = find_primary_product(cycle)

    # the grouping lookup is attempted even without a primary product (term id is then None)
    product_term_id = (product or {}).get('term', {}).get('@id')
    grouping = _faostat_crop_grouping(product_term_id)

    faostat_yield = None
    if grouping:
        faostat_yield = _faostat_crop_yield(country_id, grouping, year)

    product_yield = None
    if product:
        product_yield = list_sum(product.get('value'))

    if faostat_yield and product_yield:
        delta = value_difference(product_yield, faostat_yield) * 100
    else:
        delta = 0

    debugRequirements(
        id=cycle.get('id'),
        country_id=country_id,
        cycle_year=year,
        faostat_grouping=grouping,
        faostat_yield=faostat_yield,
        product_yield=product_yield,
        yield_delta=delta,
        yield_delta_min=YIELD_THRESHOLD,
    )

    return delta <= YIELD_THRESHOLD
def _production_delta(cycle: dict, countries: list, lookup, lookup_column: str):
    """Return the production of `countries` as a percentage of the cycle country's production.

    Production quantities are read with `production_quantity_country` for the cycle
    `endDate` (converted to `int`). The result is 0 when either the cycle-country value
    or the summed countries value is missing or falsy.
    """
    cycle_country_id = cycle.get('site', {}).get('country', {}).get('@id')
    cycle_year = int(cycle.get('endDate'))

    def quantity_for(region_id):
        # production quantity of a single country/region for the cycle year
        return production_quantity_country(lookup, lookup_column, cycle_year, country_id=region_id)

    region_quantity = quantity_for(cycle_country_id)
    countries_quantity = list_sum([quantity_for(country.get('@id')) for country in countries])

    if region_quantity and countries_quantity:
        delta = (countries_quantity / region_quantity) * 100
    else:
        delta = 0

    debugRequirements(
        id=cycle.get('id'),
        country_id=cycle_country_id,
        cycle_year=cycle_year,
        region_production_quantity=region_quantity,
        countries_production_quantity=countries_quantity,
        production_delta=delta,
        production_delta_min=REGION_PRODUCTION_THRESHOLD,
    )

    return delta
def _calculate_score_region_production(cycle: dict, countries: list):
    """Tell whether `countries` cover enough of the production for the primary product.

    The production lookup is resolved from the primary product term. Without a lookup
    the delta is 0; otherwise it is computed by `_production_delta`.

    Returns `True` when the delta is greater than or equal to `REGION_PRODUCTION_THRESHOLD`.
    """
    primary_product = find_primary_product(cycle)
    term = primary_product.get('term', {}) if primary_product else {}
    lookup, lookup_column = production_quantity_lookup(term)

    delta = 0
    if lookup is not None:
        delta = _production_delta(cycle, countries, lookup, lookup_column)

    return delta >= REGION_PRODUCTION_THRESHOLD
def _calculate_score_nb_cycles(cycle: dict, *args):
    """Tell whether the cycle reports at least 50 cycles.

    Reads `numberOfCycles` from the cycle (1 when the key is absent). Extra positional
    arguments are accepted and ignored.
    """
    observations = cycle.get('numberOfCycles', 1)
    debugRequirements(id=cycle.get('id'), nb_observations=observations)
    return observations >= 50
def _calculate_score_completeness(cycle: dict, *args):
    """Tell whether every boolean field of the cycle `completeness` is `True`.

    Non-boolean values of `completeness` are skipped; with no boolean value at all
    (including a missing `completeness`) the result is `True`. Extra positional
    arguments are accepted and ignored.
    """
    completeness = cycle.get('completeness', {})
    is_complete = all(flag for flag in completeness.values() if isinstance(flag, bool))
    debugRequirements(id=cycle.get('id'), is_complete=is_complete)
    return is_complete
def _calculate_score_emissions_system_boundary(cycle: dict, *args):
    """Tell whether the cycle contains every emission expected in the system boundary.

    The expected emission ids come from `all_in_system_boundary`, called with the primary
    product term and the site type. The ids present on the cycle are de-duplicated and
    restricted to those accepted by `is_in_system_boundary`.

    Returns `True` when no expected emission id is missing from the cycle. Extra
    positional arguments are accepted and ignored.
    """
    product = find_primary_product(cycle)
    term = (product or {}).get('term', {})
    site_type = cycle.get('site', {}).get('siteType')
    all_emissions_ids = all_in_system_boundary(term, site_type)

    # keep the ids in sets: de-duplicates them and gives constant-time membership tests below
    cycle_emissions_ids = {e.get('term', {}).get('@id') for e in cycle.get('emissions', [])}
    emissions_ids = set(filter(is_in_system_boundary, cycle_emissions_ids))

    # preserve the order of `all_emissions_ids` so the logged list is stable
    missing_emissions = [term_id for term_id in all_emissions_ids if term_id not in emissions_ids]
    all_included = len(missing_emissions) == 0

    debugRequirements(
        id=cycle.get('id'),
        total_emissions_in_system_boundary=len(all_emissions_ids),
        included_emissions=len(emissions_ids),
        all_included=all_included,
        missing_emissions=';'.join(missing_emissions),
    )

    return all_included
# Maps each score criterion to the function evaluating it. `calculate_score` calls the
# function as `func(cycle, countries)` and adds 1 to the score when the result is truthy.
_FUNC_SCORE_KEY = {
    ScoreKeys.YIELD: _calculate_score_yield,
    ScoreKeys.COMPLETENESS: _calculate_score_completeness,
    ScoreKeys.EMISSIONS_SYSTEM_BOUNDARY: _calculate_score_emissions_system_boundary,
    ScoreKeys.NB_CYCLES: _calculate_score_nb_cycles,
    ScoreKeys.REGION_PRODUCTION_RATIO: _calculate_score_region_production,
}
def _should_run_crop(cycle: dict, *args):
    """Tell whether the primary product of the cycle is a crop.

    Returns `False` when there is no primary product or its term has no `termType`.
    Extra positional arguments are accepted and ignored.
    """
    primary_product = find_primary_product(cycle)
    term = primary_product.get('term', {}) if primary_product else {}
    term_type = term.get('termType')
    return term_type == TermTermType.CROP.value
# Optional per-criterion predicates, called as `predicate(cycle, countries)`, deciding whether
# a criterion is evaluated at all. Criteria without an entry here are always evaluated
# (`calculate_score` falls back to a predicate returning True).
_RUN_SCORE_KEY = {
    ScoreKeys.YIELD: _should_run_crop,
    # requires at least one country and a crop as primary product
    ScoreKeys.REGION_PRODUCTION_RATIO: lambda cycle, countries: len(countries) > 0 and _should_run_crop(cycle)
}
def calculate_score(cycle: dict, countries: list = None):
    """Compute the aggregated quality score of a cycle.

    Each member of `ScoreKeys` is a criterion. A criterion is evaluated when its
    predicate in `_RUN_SCORE_KEY` returns a truthy value (criteria without a predicate
    are always evaluated), and it adds one point when its function in `_FUNC_SCORE_KEY`
    returns a truthy value.

    Parameters
    ----------
    cycle : dict
        The cycle to score.
    countries : list, optional
        Countries (dicts with an `@id`) forwarded to every predicate and scoring
        function. Defaults to an empty list.

    Returns
    -------
    dict
        A shallow copy of `cycle` with two extra keys: `KEY` (number of criteria that
        passed) and `KEY_MAX` (number of criteria that were evaluated).
    """
    # a fresh list per call: a `[]` default would be shared between calls
    countries = [] if countries is None else countries

    score_keys = [
        key for key in ScoreKeys
        if _RUN_SCORE_KEY.get(key, lambda *args: True)(cycle, countries)
    ]
    score = sum(1 for key in score_keys if _FUNC_SCORE_KEY[key](cycle, countries))

    return {
        **cycle,
        KEY: score,
        KEY_MAX: len(score_keys),
    }
def has_min_score(data: dict):
    """Tell whether the quality score stored on `data` is high enough for aggregation.

    The score (`KEY`) must be non-negative and must be at most
    `AGGREGATION_MAX_SCORE_DIFF` points below the maximum score (`KEY_MAX`). The
    allowed difference is read from the environment and defaults to 2, i.e. the score
    must be at least `max - 2` (inclusive). Missing keys are read as 0.

    Raises
    ------
    ValueError
        If `AGGREGATION_MAX_SCORE_DIFF` is set to a value that is not an integer.
    """
    # read the setting first so an invalid value is reported whatever the score is
    max_score_diff = int(os.getenv('AGGREGATION_MAX_SCORE_DIFF', '2'))
    score = data.get(KEY, 0)
    max_score = data.get(KEY_MAX, 0)

    # NOTE(review): the original comment stated that an ImpactAssessment created without a
    # Cycle is ignored by the `score >= 0` check; with the default of 0 a node without any
    # score passes both checks - confirm the intended behaviour.
    return score >= 0 and max_score - score <= max_score_diff