Source code for hestia_earth.aggregation.models.countries

from functools import reduce
from hestia_earth.utils.lookup import download_lookup, get_table_value, column_name, extract_grouped_data_closest_date
from hestia_earth.utils.tools import non_empty_list, safe_parse_float, flatten

from hestia_earth.aggregation.log import debugWeights, debugRequirements
from hestia_earth.aggregation.utils import parse_node_value, _end_date_year, _min, _max, _sd
from hestia_earth.aggregation.utils.term import (
    DEFAULT_COUNTRY_ID, _blank_node_completeness, _format_organic, _format_irrigated
)


def _organic_weight(country_id: str, year: int):
    lookup = download_lookup('region-standardsLabels-isOrganic.csv')
    data = get_table_value(lookup, 'termid', country_id, 'organic')
    # default to 0 => assume nothing organic
    value = safe_parse_float(extract_grouped_data_closest_date(data, year), None)

    debugRequirements(country_id=country_id, year=year,
                      organic_weight=value)

    return min(1, value / 100) if value else None


def _irrigated_weight(country_id: str, year: int, siteType: str = 'all'):
    lookup = download_lookup('region-irrigated.csv')

    total_area_data = get_table_value(lookup, 'termid', country_id, column_name(siteType))
    # default to 1 => assume whole area
    total_area = safe_parse_float(extract_grouped_data_closest_date(total_area_data, year), 1)

    irrigated_data = get_table_value(lookup, 'termid', country_id, column_name(f"{siteType} irrigated"))
    irrigated = safe_parse_float(extract_grouped_data_closest_date(irrigated_data, year), None)

    debugRequirements(country_id=country_id, year=year,
                      site_type=siteType,
                      total_area=total_area,
                      irrigated_area=irrigated)

    return irrigated / total_area if irrigated else None


def _add_weights(country_id: str, year: int):
    def apply(prev: dict, node: dict):
        organic_weight = _organic_weight(country_id, year) or _organic_weight(DEFAULT_COUNTRY_ID, year)
        irrigated_weight = (
            _irrigated_weight(country_id, year, 'cropland') or
            _irrigated_weight(country_id, year, 'agriculture') or
            _irrigated_weight(country_id, year) or
            0
        )
        weight = (
            organic_weight if node.get('organic', False) else 1-organic_weight
        ) * (
            irrigated_weight if node.get('irrigated', False) else 1-irrigated_weight
        )
        return {**prev, node.get('id'): {'weight': weight, 'completeness': node.get('completeness', {})}}
    return apply


def _weighted_value(weights: dict, key: str = 'value'):
    def apply(node: dict):
        value = parse_node_value(node.get(key))
        weight = weights.get(node.get('id'), {}).get('weight')
        return None if (value is None or weight is None) else (value, weight)
    return apply


def _missing_weights(nodes: list):
    completeness_key = _blank_node_completeness(nodes[0])
    keys = ['-'.join([
        _format_organic(node.get('organic')), _format_irrigated(node.get('irrigated'))
    ]) for node in nodes]

    def apply(item: tuple):
        key, weight = item
        is_complete = weight.get('completeness', {}).get(completeness_key, False)
        is_missing = all([k not in key for k in keys])
        return (0, weight.get('weight')) if is_complete and is_missing else None
    return apply


def _aggregate_weighted(term: dict, nodes: list, weights: dict):
    # account for complete missing values
    missing_weights = non_empty_list(map(_missing_weights(nodes), weights.items()))

    values = non_empty_list(map(_weighted_value(weights), nodes)) + missing_weights

    observations = sum(flatten([n.get('observations', 1) for n in nodes])) + len(missing_weights)

    total_weight = sum(weight for _v, weight in values)
    weighted_values = [value * weight for value, weight in values]
    value = sum(weighted_values) / (total_weight if total_weight != 0 else 1)

    # get min/max from weighted min/max values
    min_values = [v for v, _w in non_empty_list(map(_weighted_value(weights, 'min'), nodes))]
    max_values = [v for v, _w in non_empty_list(map(_weighted_value(weights, 'max'), nodes))]

    return {
        'nodes': nodes,
        'node': nodes[0],
        'term': term,
        'value': value if len(values) > 0 else None,
        'min': _min(min_values if len(min_values) else weighted_values, observations),
        'max': _max(max_values if len(min_values) else weighted_values, observations),
        'sd': _sd(weighted_values),
        'observations': observations
    }


def _aggregate_nodes(aggregate_key: str, weights: dict):
    def aggregate(data: dict):
        def aggregate(term_id: str):
            blank_nodes = data.get(aggregate_key).get(term_id)
            term = blank_nodes[0].get('term')
            return _aggregate_weighted(term, blank_nodes, weights)

        aggregates = flatten(map(aggregate, data.get(aggregate_key, {}).keys()))
        return (aggregates, data) if len(aggregates) > 0 else ([], {})

    def aggregate_multiple(data: dict):
        return reduce(
            lambda prev, curr: {**prev, curr: _aggregate_nodes(curr, weights)(data)}, aggregate_key, {}
        )

    return aggregate if isinstance(aggregate_key, str) else aggregate_multiple


[docs]def aggregate(aggregate_key: str, groups: dict) -> list: nodes = next((data.get('nodes') for data in groups.values() if len(data.get('nodes', [])) > 0), []) first_node = nodes[0] country_id = first_node.get('country').get('@id') year = _end_date_year(first_node) weights = reduce(_add_weights(country_id, year), nodes, {}) debugWeights(weights) return non_empty_list(map(_aggregate_nodes(aggregate_key, weights), groups.values()))