Source code for hestia_earth.aggregation.models.terms

from functools import reduce
from hestia_earth.utils.tools import non_empty_list

from hestia_earth.aggregation.log import logger
from hestia_earth.aggregation.utils import parse_node_value, _min, _max, _sd
from hestia_earth.aggregation.utils.term import _blank_node_completeness


def _debugNodes(nodes: list):
    for node in nodes:
        if node.get('yield'):
            logger.debug(
                'id=%s, yield=%s, weight=%s, ratio=%s/%s, organic=%s, irrigated=%s',
                node.get('@id'),
                round(node.get('yield')),
                100/len(nodes),
                1,
                len(nodes),
                node.get('organic'),
                node.get('irrigated')
            )


def _weighted_value(node: dict):
    value = parse_node_value(node.get('value'))
    weight = node.get('productValue', 1)
    return None if (value is None or weight is None) else (value, weight)


def _aggregate(nodes: list, completeness: dict):
    first_node = nodes[0]
    term = first_node.get('term')
    completeness_key = _blank_node_completeness(first_node)
    completeness_count = len([node for node in nodes if node.get('completeness', False)])
    completeness_count_total = completeness.get(completeness_key, 0)
    completeness_count_missing = (
        completeness_count_total - completeness_count
    ) if completeness_count_total > completeness_count else 0

    values = non_empty_list(map(_weighted_value, nodes))
    # account for complete nodes which have no value
    values_with_completeness = values + ([(0, 1)] * completeness_count_missing)
    values = [value for value, _w in values_with_completeness]
    total_weight = sum(weight for _v, weight in values_with_completeness)
    weighted_values = [value * weight for value, weight in values_with_completeness]
    value = sum(weighted_values) / (total_weight if total_weight != 0 else 1)

    return {
        'nodes': nodes,
        'node': first_node,
        'term': term,
        'value': value,
        'max': _max(values),
        'min': _min(values),
        'sd': _sd(values),
        'observations': len(values)
    } if len(values) > 0 else None


def _aggregate_term(aggregates_map: dict, completeness: dict):
    def aggregate(term_id: str):
        blank_nodes = [node for node in aggregates_map.get(term_id, []) if not node.get('deleted')]
        return _aggregate(blank_nodes, completeness) if len(blank_nodes) > 0 else None
    return aggregate


def _aggregate_nodes(aggregate_key: str, index=0):
    def aggregate(data: dict):
        if index == 0:
            _debugNodes(data.get('nodes', []))
        completeness = data.get('completeness', {})
        terms = data.get(aggregate_key).keys()
        aggregates = non_empty_list(map(_aggregate_term(data.get(aggregate_key), completeness), terms))
        return (aggregates, data) if len(aggregates) > 0 else ([], {})

    def aggregate_multiple(data: dict):
        return reduce(
            lambda prev, curr: {**prev, curr[1]: _aggregate_nodes(curr[1], curr[0])(data)}, enumerate(aggregate_key), {}
        )

    return aggregate if isinstance(aggregate_key, str) else aggregate_multiple


[docs]def aggregate(aggregate_key: str, groups: dict) -> list: return non_empty_list(map(_aggregate_nodes(aggregate_key), groups.values()))