diff --git a/drees.py b/drees.py index 675bc7b..53bba28 100644 --- a/drees.py +++ b/drees.py @@ -3,50 +3,59 @@ import json import logging import os from datetime import datetime as dt -from enum import Enum +from enum import Enum, IntEnum from functools import partial from multiprocessing import Pool from typing import Any, Dict, List, Optional, OrderedDict, Tuple import numpy as np -import pandas as pd import requests from matplotlib import dates as md from matplotlib import pyplot as plt -from numba import njit FORMAT = "%(asctime)s - %(levelname)s - %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO) DATE_FORMAT = "%Y-%m-%d" +PLOT_FORMAT = "png" DATA_URL = "https://data.drees.solidarites-sante.gouv.fr/api/records/1.0/search/?dataset=covid-19-resultats-par-age-issus-des-appariements-entre-si-vic-si-dep-et-vac-si&q=&rows=-1&facet=date&facet=vac_statut&facet=age" DATA_REPOSITORY = "data" OUTPUT_REPOSITORY = "output" -class Field(str, Enum): - HC = "hc" - SC = "sc" - DC = "dc" +class DreesEnum(bytes, Enum): + def __new__(cls, value, label): + obj = bytes.__new__(cls, [value]) + obj._value_ = value + obj.label = label + return obj -class VacStatus(str, Enum): - NC = "Non-vaccinés" - PDR = "Primo dose récente" - PDE = "Primo dose efficace" - CM3MSR = "Complet de moins de 3 mois - sans rappel" - CM3MAR = "Complet de moins de 3 mois - avec rappel" - CM36MSR = "Complet entre 3 mois et 6 mois - sans rappel" - CM36MAR = "Complet entre 3 mois et 6 mois - avec rappel" +class Field(DreesEnum): + HC = (0, "hc") + SC = (1, "sc") + DC = (2, "dc") -class AgeGroup(str, Enum): - VERY_YOUNG = "[0,19]" - YOUNG = "[20,39]" - MID_OLD = "[40,59]" - OLD = "[60,79]" - VERY_OLD = "[80;+]" +class VacStatus(DreesEnum): + NC = (0, "Non-vaccinés") + PDR = (1, "Primo dose récente") + PDE = (2, "Primo dose efficace") + CM3MSR = (3, "Complet de moins de 3 mois - sans rappel") + CM3MAR = (4, "Complet de moins de 3 mois - avec rappel") + CM36MSR = (5, "Complet entre 3 mois et 6 mois - sans rappel") + CM36MAR = (6, "Complet entre 3 mois et 6 mois - avec rappel") + C6MAR = (7, "Complet de 6 mois et plus - avec rappel") + C6MSR = (8, "Complet de 6 mois et plus - sans rappel") + + +class AgeGroup(DreesEnum): + VERY_YOUNG = (0, "[0,19]") + YOUNG = (1, "[20,39]") + MID_OLD = (2, "[40,59]") + OLD = (3, "[60,79]") + VERY_OLD = (4, "[80;+]") def get_data( @@ -57,6 +66,7 @@ def get_data( """ collect covid data by age from DREES """ + logging.info("fetching data...") os.makedirs(DATA_REPOSITORY, exist_ok=True) data_url = DATA_URL.format(extension=extension) if data_url.endswith("/"): @@ -76,6 +86,24 @@ def get_data( return json.load(open(file_path, "rb")) +def get_enum_vac_status(value): + for vac_status in VacStatus: + if vac_status.label == value: + return vac_status.value + + +def get_enum_age(value): + for age_group in AgeGroup: + if age_group.label == value: + return age_group.value + + +def get_enum_field(value): + for field in Field: + if field.label == value: + return field.value + + def group_by_age_date(data: Dict[str, Any], fields: List[str]) -> Dict[dt, Any]: """ group the original dictionnary into a more readable one @@ -90,6 +118,7 @@ def group_by_age_date(data: Dict[str, Any], fields: List[str]) -> Dict[dt, Any]: } } """ + logging.info("restructuring the data...") dic_data_grouped: Dict[dt, Any] = OrderedDict() for row in data["records"]: row_fields = row["fields"] @@ -104,204 +133,176 @@ def group_by_age_date(data: Dict[str, Any], fields: List[str]) -> Dict[dt, Any]: dic_data_grouped[date][age][vac_status] = OrderedDict() for field in fields: dic_data_grouped[date][age][vac_status][field] = row_fields[field] + logging.info("data restructured") return dic_data_grouped -@njit -def cumulate_array(array: np.ndarray) -> np.ndarray: - cumulate = list() - sum: float = 0 - for item in array: - sum += item - cumulate.append(sum) - return np.array(cumulate) +def get_np_data(dic_data_grouped: Dict[dt, Any]) -> Tuple[np.ndarray, np.ndarray]: + """ + store the data in numpy data structure helped by Enum + """ + logging.info("storing data in numpy data structure...") + np_data = np.empty( + (len(dic_data_grouped), len(AgeGroup), len(VacStatus), len(Field)) + ) + np_date = np.empty((len(dic_data_grouped)), dtype="datetime64[s]") + for idx_date, (date, dic_age) in enumerate(dic_data_grouped.items()): + np_date[idx_date] = date + for age, dic_vac in dic_age.items(): + idx_age = get_enum_age(age) + for vac, dic_field in dic_vac.items(): + idx_vac = get_enum_vac_status(vac) + for field, value in dic_field.items(): + idx_field = get_enum_field(field) + np_data[idx_date, idx_age, idx_vac, idx_field] = value + logging.info("date and data generated") + return np_data, np_date def get_plot_fig( - grid: Optional[bool] = True, date_format: Optional[str] = DATE_FORMAT + grid: Optional[bool] = True, + date_format: Optional[str] = DATE_FORMAT, + figsize: Optional[Tuple[int, int]] = None, ) -> plt.figure: """ return pyplot fig, ax to plot data over range period with date formatting """ - fig, ax = plt.subplots() + fig, ax = plt.subplots(figsize=figsize) ax.grid(grid) date_formatter = md.DateFormatter(date_format) ax.xaxis.set_major_locator(md.AutoDateLocator()) ax.xaxis.set_major_formatter(date_formatter) fig.autofmt_xdate() - return fig + return fig, ax def save_and_close_fig( - fig: plt.figure, output_path: str, has_legend: Optional[bool] = True + fig: plt.figure, + output_path: str, + has_legend: Optional[bool] = True, + is_tight: Optional[bool] = True, ): + logging.info(f"plotting : {output_path}...") if has_legend: plt.legend() + if is_tight: + plt.tight_layout() plt.savefig(output_path) plt.close(fig) + logging.info(f"{output_path} plotted") -def get_cumulative_field_by_age( - dic_data_grouped: Dict[dt, Any], age: str, field: Field -) -> Tuple[np.ndarray, List[dt]]: - """ - cumulate field values over data period - """ - dcs: List[int] = list() - dates: List[dt] = list() - for date, dic_age_grouped in dic_data_grouped.items(): - if (dic_age := dic_age_grouped.get(age)) is None: - logging.error(f"{age} not found in grouped ages") - continue - for dic_vac_status in dic_age.values(): - if (field_value := dic_vac_status[field.value]) is not None: - dcs.append(field_value) - dates.append(date) - np_dcs = np.array(dcs) - np_cumulate = cumulate_array(np_dcs) - return np_cumulate, dates +def split_by_vac_status(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + return np.sum(np_data[:, :, 1:, :], axis=2), np_data[:, :, VacStatus.NC.value, :] -def get_values_by_age_vac_field( - dic_data_grouped: Dict[dt, Any], age: AgeGroup, vac_status: VacStatus, field: Field -) -> Tuple[List[dt], List[float]]: - """ - get deep field data by age, vaccine status and field - """ - dates: List[dt] = list() - fields: List[float] = list() - for date, dic_age_grouped in dic_data_grouped.items(): - if (dic_vac_status := dic_age_grouped.get(age.value)) is not None: - if (dic_field := dic_vac_status.get(vac_status.value)) is not None: - if (field_value := dic_field.get(field.value)) is not None: - fields.append(field_value) - dates.append(date) - return dates, fields - - -def get_values_by_age_vac( - dic_data_grouped: Dict[dt, Any], age: AgeGroup, vac_status: VacStatus -) -> Tuple[List[dt], List[Dict[str, Any]]]: - """ - get deep fields data by age and vaccine status - """ - dates: List[dt] = list() - fields: List[Dict[str, Any]] = list() - for date, dic_age_grouped in dic_data_grouped.items(): - if (dic_vac_status := dic_age_grouped.get(age.value)) is not None: - if (dic_field := dic_vac_status.get(vac_status.value)) is not None: - fields.append(dic_field) - dates.append(date) - return dates, fields - - -def plot_cumulative_field(dic_data_grouped: Dict[dt, Any], field: Field) -> None: - fig = get_plot_fig() +def plot_cumulative_field( + np_data: np.ndarray, np_date: np.ndarray, field: Field +) -> None: + fig, _ = get_plot_fig() + np_data_vac, np_data_unvac = split_by_vac_status(np_data) for age_group in AgeGroup: - deaths, dates = get_cumulative_field_by_age( - dic_data_grouped, age_group.value, field + np_cumulate_vac: np.ndarray = np.cumsum( + np_data_vac[:, age_group.value, field.value], axis=0 ) - plt.plot(dates, deaths, label=age_group.value) + np_cumulate_unvac: np.ndarray = np.cumsum( + np_data_unvac[:, age_group.value, field.value], axis=0 + ) + plt.plot(np_date, np_cumulate_vac, label=f"{age_group.label} vaccinate") + plt.plot(np_date, np_cumulate_unvac, label=f"{age_group.label} unvaccinate") - plt.title( - f"nombre de {field.value} cumulé par age (status vaccinal non pris en compte)" - ) + plt.title(f"nombre de {field.label} cumulé par age") plt.xlabel("date") save_and_close_fig( - fig, os.path.join(OUTPUT_REPOSITORY, f"cumulative_{field.value}.pdf") + fig, os.path.join(OUTPUT_REPOSITORY, f"cumulative_{field.label}") ) -def extract_field_values(fields: List[Dict[str, Any]], field: Field) -> np.ndarray: - field_values: List[float] = list() - for item in fields: - if (value := item.get(field)) is not None: - field_values.append(value) - return np.asarray(field_values) - - -def plot_data_by_age_vac( - dic_data_grouped: Dict[dt, Any], age: AgeGroup, vac_status: VacStatus +def plot_fields_by_age_vac( + np_data: np.ndarray, np_date: np.ndarray, age_group: AgeGroup, vac_status: VacStatus ) -> None: """ plot data by vaccine status, age and field """ - fig = get_plot_fig() - - dates, fields = get_values_by_age_vac(dic_data_grouped, age, vac_status) + fig, _ = get_plot_fig() for field in Field: - field_values = extract_field_values(fields, field) - plt.plot(dates, field_values, label=f"{field.value}") + plt.plot( + np_date, + np_data[:, age_group.value, vac_status.value, field.value], + label=f"{field.label}", + ) plt.xlabel("date") plt.ylabel("nombre") - plt.title(f"{age}ans - {vac_status}") + plt.title(f"{age_group.label}ans - {vac_status.label}") - save_and_close_fig(fig, os.path.join(OUTPUT_REPOSITORY, f"{age}_{vac_status}.pdf")) - - -def group_by_date_age_vac( - dic_data_grouped: Dict[dt, Any], - field: Field, - is_vac: Optional[bool] = True, - limit_days: Optional[int] = 30, -) -> Dict[str, Any]: - dic_data: Dict[str, Any] = OrderedDict() - for date, dic_age in dic_data_grouped.items(): - if abs((date - dt.now())).days >= limit_days: - continue - date_format = date.strftime(DATE_FORMAT) - dic_data[date_format] = OrderedDict() - for age, dic_vac in dic_age.items(): - nb_vac, nb_unvac = 0, 0 - for vac_status, dic_field in dic_vac.items(): - if vac_status == VacStatus.NC.value: - nb_unvac += dic_field.get(field.value, 0) - continue - nb_vac += dic_field.get(field.value, 0) - sum_vac = nb_vac + nb_unvac - try: - percent_vac = (nb_vac / sum_vac) * 100 - except ZeroDivisionError: - percent_vac = 0 - try: - percent_unvac = (nb_unvac / sum_vac) * 100 - except ZeroDivisionError: - percent_unvac = 0 - dic_data[date_format][age] = percent_vac if is_vac else percent_unvac - return dic_data - - -def plot_bar_data_by_field( - dic_data_grouped: Dict[dt, Any], field: Field, is_vac: Optional[bool] = True -) -> None: - """ - display a bar graph by field grouped by age over the data period - bars displays vaccine status percent - """ - plt.rcParams["font.size"] = "24" - dic_data = group_by_date_age_vac(dic_data_grouped, field, is_vac=is_vac) - df = pd.DataFrame(dic_data).T - - ax = df.plot.bar(figsize=(26, 15)) - ax.set_title(f"{field.value} vaccinate percent grouped by age") - ax.set_xlabel("date") - fig = ax.get_figure() - - plt.xticks(rotation=45) - plt.legend(loc="upper right") - plt.tight_layout() - - filename = "vac" if is_vac else "unvac" - fig.savefig( - os.path.join(OUTPUT_REPOSITORY, f"{filename}_age_grouped_{field.value}.pdf") + save_and_close_fig( + fig, + os.path.join(OUTPUT_REPOSITORY, f"all_{age_group.label}_{vac_status.label}"), ) -def build_data_pool_args() -> List[Tuple[AgeGroup, VacStatus]]: +def get_vaccine_percent(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ - build tuple arguments to plot all data on multiprocess + get the vaccine percent per date, age and field + the vaccine data holds all the vaccine status except unvaccine + """ + np_data_vac, np_data_unvac = split_by_vac_status(np_data) + np_percent_vac = np_data_vac / np.sum(np_data, axis=2) + np_percent_unvac = np_data_unvac / np.sum(np_data, axis=2) + return np_percent_vac, np_percent_unvac + + +def plot_bar_data_by_age_field( + np_data: np.ndarray, + np_date: np.ndarray, + age_group: AgeGroup, + field: Field, +) -> None: + """ + display a bar graph by field and age over the data period + bars display vaccine status percent + a limit days period is set to have an readable plot + """ + np_percent_vac, np_percent_unvac = get_vaccine_percent(np_data) + fig, ax = get_plot_fig(figsize=(22, 8)) + + for idx_date in range(len(np_date)): + vac_percent = np.round( + np_percent_vac[idx_date, age_group.value, field.value] * 100 + ) + unvac_percent = np.round( + np_percent_unvac[idx_date, age_group.value, field.value] * 100 + ) + bar_vac = ax.bar(idx_date, vac_percent, color="b", label="vac") + bar_unvac = ax.bar( + idx_date, unvac_percent, bottom=vac_percent, color="r", label="novac" + ) + ax.bar_label(bar_vac, label_type="edge", color="black", fontsize="8") + ax.set_ylim(top=105) + + ax.set_ylabel("%") + ax.set_title(f"{age_group.label} - {field.label}") + ax.set( + xticks=range(len(np_date)), + xticklabels=[ + d.strftime(DATE_FORMAT) if idx % 4 == 0 else "" + for idx, d in enumerate(np_date.astype(dt)) + ], + ) + plt.legend(["vaccinate", "unvaccinate"], loc=0, frameon=True) + + save_and_close_fig( + fig, + os.path.join(OUTPUT_REPOSITORY, f"vac_percent_{age_group.label}_{field.label}"), + has_legend=False, + ) + + +def get_age_vac_args() -> List[Tuple[AgeGroup, VacStatus]]: + """ + get tuple arguments to plot fields data by age and vac status on multiprocess """ pool_args: List[Tuple[AgeGroup, VacStatus]] = list() for age_group in AgeGroup: @@ -310,17 +311,28 @@ def build_data_pool_args() -> List[Tuple[AgeGroup, VacStatus]]: return pool_args +def get_age_field_args() -> List[Tuple[AgeGroup, Field]]: + """ + get tuple arguments to plot fields data by age and field on multiprocess + """ + pool_args: List[Tuple[AgeGroup, Field]] = list() + for age_group in AgeGroup: + for field in Field: + pool_args.append((age_group, field)) + return pool_args + + if __name__ == "__main__": """ This script aims to plot DRESS data Plots availables : - cumulative deaths by age - - indicators by vaccine status and age - - indicators vaccine/unvaccine percent grouped by age + - hc, sc, dc by vaccine status and age + - hc, sc, dc (vaccine/unvaccine percent) by age Main indicators are : - - hospitalisations - - criticals - - deaths + - hospitalisations (hc) + - criticals (sc) + - deaths (dc) """ parser = argparse.ArgumentParser() @@ -340,17 +352,18 @@ if __name__ == "__main__": file_path=os.path.join(DATA_REPOSITORY, "dress.json"), refresh=args.refresh ) dic_data_grouped: Dict[dt, Any] = group_by_age_date( - dic_data, [x.value for x in Field] + dic_data, [x.label for x in Field] ) - plot_data_pool_args = build_data_pool_args() - f = partial(plot_data_by_age_vac, dic_data_grouped) - with Pool() as pool: - pool.starmap(f, plot_data_pool_args) + np_data, np_date = get_np_data(dic_data_grouped) + + plot_fields_args = get_age_vac_args() + f_fields = partial(plot_fields_by_age_vac, np_data, np_date) + plot_vac_percent_age_args = get_age_field_args() + f_bars = partial(plot_bar_data_by_age_field, np_data, np_date) + with Pool(2) as pool: + pool.starmap(f_fields, plot_fields_args) + pool.starmap(f_bars, plot_vac_percent_age_args) for field in Field: - plot_cumulative_field(dic_data_grouped, field) - - for field in Field: - plot_bar_data_by_field(dic_data_grouped, field) - plot_bar_data_by_field(dic_data_grouped, field, is_vac=False) + plot_cumulative_field(np_data, np_date, field)