"""Download DREES COVID-19 data (by age and vaccine status) and plot it.

The pipeline is: fetch (or read the cached) JSON payload, regroup the records
by date / age / vaccine status, store them in a 4-D numpy array indexed by the
enums declared below, then render the plots into ``OUTPUT_REPOSITORY``.
"""
import argparse
import json
import logging
import os
from datetime import datetime as dt
from enum import Enum
from functools import partial
from itertools import product
from multiprocessing import Pool
from typing import Any, Dict, List, Optional, OrderedDict, Tuple

import numpy as np
import requests
from matplotlib import dates as md
from matplotlib import pyplot as plt

FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

DATE_FORMAT = "%Y-%m-%d"
DATA_URL = "https://data.drees.solidarites-sante.gouv.fr/api/records/1.0/search/?dataset=covid-19-resultats-par-age-issus-des-appariements-entre-si-vic-si-dep-et-vac-si&q=&rows=-1&facet=date&facet=vac_statut&facet=age"
DATA_REPOSITORY = "data"
OUTPUT_REPOSITORY = "output"

# Seconds to wait for the remote server before giving up on the download.
REQUEST_TIMEOUT = 60
# On the bar charts only one date out of DATE_LABEL_STEP gets a tick label.
DATE_LABEL_STEP = 4


class DreesEnum(bytes, Enum):
    """Enum whose members carry an integer ``value`` and a text ``label``.

    ``value`` is used as an index into the numpy data array; ``label`` is the
    text matched against the incoming records.
    """

    def __new__(cls, value, label):
        obj = bytes.__new__(cls, [value])
        obj._value_ = value
        obj.label = label
        return obj


class Field(DreesEnum):
    # Labels are the record keys read from each row's "fields" mapping.
    HC = (0, "hc")
    SC = (1, "sc")
    DC = (2, "dc")


class VacStatus(DreesEnum):
    # Labels are matched against the "vac_statut" field of each record.
    NC = (0, "Non-vaccinés")
    PDR = (1, "Primo dose récente")
    PDE = (2, "Primo dose efficace")
    CM3MSR = (3, "Complet de moins de 3 mois - sans rappel")
    CM3MAR = (4, "Complet de moins de 3 mois - avec rappel")
    CM36MSR = (5, "Complet entre 3 mois et 6 mois - sans rappel")
    CM36MAR = (6, "Complet entre 3 mois et 6 mois - avec rappel")
    C6MAR = (7, "Complet de 6 mois et plus - avec rappel")
    C6MSR = (8, "Complet de 6 mois et plus - sans rappel")


class AgeGroup(DreesEnum):
    # Labels are matched against the "age" field of each record.
    VERY_YOUNG = (0, "[0,19]")
    YOUNG = (1, "[20,39]")
    MID_OLD = (2, "[40,59]")
    OLD = (3, "[60,79]")
    VERY_OLD = (4, "[80;+]")


def get_data(
    file_path: Optional[str] = None,
    extension: Optional[str] = "json",
    refresh=False,
) -> Dict[str, Any]:
    """Return the DREES payload, downloading it when no cached copy exists.

    Args:
        file_path: where the payload is cached. When ``None`` the name is
            derived from the last ``/``-separated segment of the URL, inside
            ``DATA_REPOSITORY``.
        extension: substituted for an ``{extension}`` placeholder in
            ``DATA_URL``; the current URL has no such placeholder.
        refresh: download again even when the cache file exists.

    Returns:
        The decoded JSON document.

    Raises:
        ValueError: the server answered with an empty body, or the body is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        requests.RequestException: network failure, timeout or HTTP error
            status.
    """
    logger.info("fetching data...")
    os.makedirs(DATA_REPOSITORY, exist_ok=True)
    data_url = DATA_URL.format(extension=extension)
    if data_url.endswith("/"):
        data_url = data_url[:-1]
    if file_path is None:
        file_path = os.path.join(DATA_REPOSITORY, data_url.split("/")[-1])

    if refresh or not os.path.isfile(file_path):
        response = requests.get(data_url, timeout=REQUEST_TIMEOUT)
        # Fail loudly on 4xx/5xx instead of caching an error page.
        response.raise_for_status()
        if not response.content:
            raise ValueError("no data provided from the url : {}".format(data_url))
        # Decode before writing so that an invalid payload never reaches the
        # cache file.
        payload = json.loads(response.content)
        with open(file_path, "wb") as f:
            f.write(response.content)
        return payload

    with open(file_path, "rb") as f:
        return json.load(f)


def _value_from_label(enum_cls, label) -> Optional[int]:
    """Return the value of the ``enum_cls`` member labelled ``label``, or None."""
    for member in enum_cls:
        if member.label == label:
            return member.value
    return None


def get_enum_vac_status(value) -> Optional[int]:
    """Return the ``VacStatus`` value whose label is ``value`` (None if unknown)."""
    return _value_from_label(VacStatus, value)


def get_enum_age(value) -> Optional[int]:
    """Return the ``AgeGroup`` value whose label is ``value`` (None if unknown)."""
    return _value_from_label(AgeGroup, value)


def get_enum_field(value) -> Optional[int]:
    """Return the ``Field`` value whose label is ``value`` (None if unknown)."""
    return _value_from_label(Field, value)


def group_by_age_date(data: Dict[str, Any]) -> Dict[dt, Any]:
    """Regroup the raw records into nested mappings.

    The result has the shape::

        date -> age label -> vaccine status label -> {"hc", "sc", "dc"}

    Args:
        data: decoded payload; its ``"records"`` entries each hold a
            ``"fields"`` mapping with ``date``, ``age``, ``vac_statut`` and
            one key per ``Field`` label.

    Returns:
        The nested mapping, keyed by ``datetime`` at the first level.

    Raises:
        KeyError: a record lacks one of the keys listed above.
    """
    logger.info("restructuring the data...")
    dic_data_grouped: Dict[dt, Any] = OrderedDict()
    for row in data["records"]:
        row_fields = row["fields"]
        date = dt.strptime(row_fields["date"], DATE_FORMAT)
        by_age = dic_data_grouped.setdefault(date, OrderedDict())
        by_vac = by_age.setdefault(row_fields["age"], OrderedDict())
        by_field = by_vac.setdefault(row_fields["vac_statut"], OrderedDict())
        for field in Field:
            by_field[field.label] = row_fields[field.label]
    logger.info("data restructured")
    return dic_data_grouped


def get_np_data(dic_data_grouped: Dict[dt, Any]) -> Tuple[np.ndarray, np.ndarray]:
    """Store the grouped data in numpy arrays indexed by the enum values.

    Args:
        dic_data_grouped: mapping produced by ``group_by_age_date``.

    Returns:
        ``(np_data, np_date)`` where ``np_data`` has shape
        ``(dates, AgeGroup, VacStatus, Field)`` and ``np_date`` holds the
        matching dates as ``datetime64[s]``, in chronological order.
    """
    logger.info("storing data in numpy data structure...")
    n_dates = len(dic_data_grouped)
    # Zero-initialised: combinations absent from the records must count as 0,
    # not as whatever np.empty happens to leave in memory.
    np_data = np.zeros((n_dates, len(AgeGroup), len(VacStatus), len(Field)))
    np_date = np.empty(n_dates, dtype="datetime64[s]")
    unknown_labels = set()

    # Sorted so that cumulative sums and line plots follow time even when the
    # records are not delivered in chronological order.
    for idx_date, date in enumerate(sorted(dic_data_grouped)):
        np_date[idx_date] = date
        for age, dic_vac in dic_data_grouped[date].items():
            idx_age = get_enum_age(age)
            if idx_age is None:
                unknown_labels.add(age)
                continue
            for vac, dic_field in dic_vac.items():
                idx_vac = get_enum_vac_status(vac)
                if idx_vac is None:
                    unknown_labels.add(vac)
                    continue
                for field, value in dic_field.items():
                    idx_field = get_enum_field(field)
                    if idx_field is None:
                        unknown_labels.add(field)
                        continue
                    np_data[idx_date, idx_age, idx_vac, idx_field] = value

    if unknown_labels:
        # A None index would be interpreted by numpy as np.newaxis and write
        # to the wrong cells, so unknown labels are skipped and reported.
        logger.warning(
            "ignored records with unknown labels: %s",
            sorted(unknown_labels, key=str),
        )
    logger.info("date and data generated")
    return np_data, np_date


def get_plot_fig(
    grid: Optional[bool] = True,
    date_format: Optional[str] = DATE_FORMAT,
    figsize: Optional[Tuple[int, int]] = None,
) -> Tuple[plt.Figure, plt.Axes]:
    """Create a figure and axes whose x axis is formatted for dates.

    Args:
        grid: passed to ``ax.grid``.
        date_format: ``strftime`` pattern used for the x tick labels.
        figsize: passed to ``plt.subplots``.

    Returns:
        The ``(fig, ax)`` pair.
    """
    fig, ax = plt.subplots(figsize=figsize)
    ax.grid(grid)
    ax.xaxis.set_major_locator(md.AutoDateLocator())
    ax.xaxis.set_major_formatter(md.DateFormatter(date_format))
    fig.autofmt_xdate()
    return fig, ax


def save_and_close_fig(
    fig: plt.Figure,
    output_path: str,
    has_legend: Optional[bool] = True,
    is_tight: Optional[bool] = True,
) -> None:
    """Save ``fig`` to ``output_path`` and close it.

    Args:
        fig: figure to save.
        output_path: destination passed to ``savefig``.
        has_legend: add a legend built from the labelled artists.
        is_tight: apply ``tight_layout`` before saving.
    """
    logger.info("plotting : %s...", output_path)
    # Work on the figure that was passed in rather than on pyplot's implicit
    # "current figure".
    if has_legend:
        fig.gca().legend()
    if is_tight:
        fig.tight_layout()
    fig.savefig(output_path)
    plt.close(fig)
    logger.info("%s plotted", output_path)


def split_by_vac_status(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Split the data into (all vaccinated statuses summed, unvaccinated).

    Args:
        np_data: array whose third axis is indexed by ``VacStatus`` values.

    Returns:
        ``(vaccinated, unvaccinated)``, both without the vaccine status axis.
    """
    # Select by enum rather than by a hard-coded slice so the split stays
    # right if the position of NC in VacStatus ever changes.
    vac_indices = [
        status.value for status in VacStatus if status is not VacStatus.NC
    ]
    np_data_vac = np.sum(np_data[:, :, vac_indices, :], axis=2)
    np_data_unvac = np_data[:, :, VacStatus.NC.value, :]
    return np_data_vac, np_data_unvac


def plot_cumulative_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """Plot the cumulative count of ``field`` per age group, vax vs no vax."""
    fig, ax = get_plot_fig()
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    for age_group in AgeGroup:
        np_cumulate_vac: np.ndarray = np.cumsum(
            np_data_vac[:, age_group.value, field.value], axis=0
        )
        np_cumulate_unvac: np.ndarray = np.cumsum(
            np_data_unvac[:, age_group.value, field.value], axis=0
        )
        ax.plot(np_date, np_cumulate_vac, label=f"{age_group.label} vax")
        ax.plot(np_date, np_cumulate_unvac, label=f"{age_group.label} no vax")
    ax.set_title(f"nombre de {field.label} cumulé par age")
    ax.set_xlabel("date")
    save_and_close_fig(
        fig, os.path.join(OUTPUT_REPOSITORY, f"cumulative_{field.label}")
    )


def plot_fields_by_age_vac(
    np_data: np.ndarray, np_date: np.ndarray, age_group: AgeGroup, vac_status: VacStatus
) -> None:
    """Plot every field over time for one age group and one vaccine status."""
    fig, ax = get_plot_fig()
    for field in Field:
        ax.plot(
            np_date,
            np_data[:, age_group.value, vac_status.value, field.value],
            label=f"{field.label}",
        )
    ax.set_xlabel("date")
    ax.set_ylabel("nombre")
    ax.set_title(f"{age_group.label}ans - {vac_status.label}")
    save_and_close_fig(
        fig,
        os.path.join(OUTPUT_REPOSITORY, f"all_{age_group.label}_{vac_status.label}"),
    )


def get_vaccine_percent(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Return the vaccinated / unvaccinated shares per date, age and field.

    "Vaccinated" groups every vaccine status except ``VacStatus.NC``.

    Returns:
        ``(share_vaccinated, share_unvaccinated)`` as fractions in ``[0, 1]``.
        Cells whose total is 0 yield 0 for both shares (instead of NaN).
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    total = np.sum(np_data, axis=2)
    has_data = total != 0

    def share(part: np.ndarray) -> np.ndarray:
        # `where` skips the 0/0 cells, which keep the 0 stored in `out`.
        return np.divide(
            part, total, out=np.zeros(total.shape, dtype=float), where=has_data
        )

    return share(np_data_vac), share(np_data_unvac)


def plot_bar_data_by_age_field(
    np_data: np.ndarray,
    np_date: np.ndarray,
    age_group: AgeGroup,
    field: Field,
) -> None:
    """Plot stacked vax / no-vax percentage bars for one age group and field.

    One bar is drawn per date; only one date out of ``DATE_LABEL_STEP`` is
    labelled on the x axis to keep the plot readable.
    """
    np_percent_vac, np_percent_unvac = get_vaccine_percent(np_data)
    # adjust the fig size to display correctly bars and labels
    fig, ax = get_plot_fig(figsize=(22, 8))

    positions = np.arange(len(np_date))
    vac_percent = np.round(np_percent_vac[:, age_group.value, field.value] * 100)
    unvac_percent = np.round(
        np_percent_unvac[:, age_group.value, field.value] * 100
    )
    # One vectorised call per series: a single labelled container each, so the
    # legend needs no manual de-duplication.
    bars_vac = ax.bar(positions, vac_percent, color="b", label="vax")
    ax.bar(positions, unvac_percent, bottom=vac_percent, color="r", label="no vax")
    ax.bar_label(bars_vac, label_type="edge", color="black", fontsize=8)

    ax.set_ylim(top=105)  # to display 100% label
    ax.set_ylabel("%")
    ax.set_title(f"{age_group.label} - {field.label}")
    # avoid displaying all dates
    ax.set(
        xticks=positions,
        xticklabels=[
            d.strftime(DATE_FORMAT) if idx % DATE_LABEL_STEP == 0 else ""
            for idx, d in enumerate(np_date.astype(dt))
        ],
    )
    ax.legend(loc=0, frameon=True)
    save_and_close_fig(
        fig,
        os.path.join(OUTPUT_REPOSITORY, f"vac_percent_{age_group.label}_{field.label}"),
        has_legend=False,
    )


def get_age_vac_args() -> List[Tuple[AgeGroup, VacStatus]]:
    """Return every (age group, vaccine status) pair, for ``Pool.starmap``."""
    return list(product(AgeGroup, VacStatus))


def get_age_field_args() -> List[Tuple[AgeGroup, Field]]:
    """Return every (age group, field) pair, for ``Pool.starmap``."""
    return list(product(AgeGroup, Field))


def main() -> None:
    """Plot DREES data.

    Available plots:
        - cumulative hc, sc, dc by age
        - hc, sc, dc by vaccine status and age
        - hc, sc, dc (vaccinated / unvaccinated percent) by age

    Main indicators are:
        - hospitalisations (hc)
        - criticals (sc)
        - deaths (dc)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r",
        "--refresh",
        action="store_true",
        default=False,
        help="redownload data for updates",
    )
    args = parser.parse_args()

    os.makedirs(OUTPUT_REPOSITORY, exist_ok=True)
    dic_data: Dict[str, Any] = get_data(
        file_path=os.path.join(DATA_REPOSITORY, "dress.json"), refresh=args.refresh
    )
    dic_data_grouped: Dict[dt, Any] = group_by_age_date(dic_data)
    np_data, np_date = get_np_data(dic_data_grouped)

    plot_fields_args = get_age_vac_args()
    f_fields = partial(plot_fields_by_age_vac, np_data, np_date)
    plot_vac_percent_age_args = get_age_field_args()
    f_bars = partial(plot_bar_data_by_age_field, np_data, np_date)
    with Pool(2) as pool:
        pool.starmap(f_fields, plot_fields_args)
        pool.starmap(f_bars, plot_vac_percent_age_args)

    for field in Field:
        plot_cumulative_field(np_data, np_date, field)


if __name__ == "__main__":
    main()