"""
Plot DREES covid data by age group and vaccine status.

Plots available:
    - cumulative hc, sc, dc by age
    - hc, sc, dc by vaccine status and age
    - hc, sc, dc (vaccinated / unvaccinated percent) by age
    - hc, sc, dc (age grouped percent) by field

Main indicators are:
    - hospitalisations (hc)
    - critical care (sc)
    - deaths (dc)

hc, sc, dc include positive PCR tests.
"""
import argparse
import json
import logging
import os
from datetime import datetime as dt
from enum import Enum
from functools import partial
from multiprocessing import Pool
from typing import Any, Dict, List, Optional, OrderedDict, Tuple

import numpy as np
import requests
from jinja2 import Environment, FileSystemLoader, select_autoescape
from matplotlib import dates as md
from matplotlib import pyplot as plt

FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

DATE_FORMAT = "%Y-%m-%d"
DATA_URL = "https://data.drees.solidarites-sante.gouv.fr/api/records/1.0/search/?dataset=covid-19-resultats-par-age-issus-des-appariements-entre-si-vic-si-dep-et-vac-si&q=&rows=-1&facet=date&facet=vac_statut&facet=age"
DATA_REPOSITORY = "data"
STATIC_REPOSITORY = "static"
OUTPUT_REPOSITORY = os.path.join(STATIC_REPOSITORY, "plots")
BUILD_REPOSITORY = "build"
MAIN_URL = "https://covid.thegux.fr/"
# seconds without any byte from the server before the download is aborted
REQUEST_TIMEOUT = 120

# cycler could be better, but for ages plots it's ok
AGE_COLORS = {
    0: "pink",
    1: "green",
    2: "blue",
    3: "red",
    4: "gray",
}


class DreesEnum(bytes, Enum):
    """
    Enum whose members are declared as ``(index, label)``.

    ``value`` is the integer index (used below as a numpy axis position) and
    ``label`` is the text the members are matched against in the dataset.
    """

    def __new__(cls, value, label):
        obj = bytes.__new__(cls, [value])
        obj._value_ = value
        obj.label = label
        return obj


class Field(DreesEnum):
    HC = (0, "Hospitalisations")
    SC = (1, "Soins critiques")
    DC = (2, "Décés")


class VacStatus(DreesEnum):
    NC = (0, "Non-vaccinés")
    PDR = (1, "Primo dose récente")
    PDE = (2, "Primo dose efficace")
    CM3MSR = (3, "Complet de moins de 3 mois - sans rappel")
    CM3MAR = (4, "Complet de moins de 3 mois - avec rappel")
    CM36MSR = (5, "Complet entre 3 mois et 6 mois - sans rappel")
    CM36MAR = (6, "Complet entre 3 mois et 6 mois - avec rappel")
    C6MAR = (7, "Complet de 6 mois et plus - avec rappel")
    C6MSR = (8, "Complet de 6 mois et plus - sans rappel")


class AgeGroup(DreesEnum):
    VERY_YOUNG = (0, "[0,19]")
    YOUNG = (1, "[20,39]")
    MID_OLD = (2, "[40,59]")
    OLD = (3, "[60,79]")
    VERY_OLD = (4, "[80;+]")


def get_data(
    file_path: Optional[str] = None,
    extension: Optional[str] = "json",
    refresh=False,
) -> Dict[str, Any]:
    """
    collect covid data by age from DREES

    The payload is read from `file_path` when that file exists, otherwise (or
    when `refresh` is set) it is downloaded from DATA_URL and cached there.
    When `file_path` is None the cache name is the last "/" segment of the url.

    Raises:
        requests.HTTPError: the server answered with an error status
        ValueError: the response body is empty (or is not valid JSON)
    """
    os.makedirs(DATA_REPOSITORY, exist_ok=True)
    data_url = DATA_URL.format(extension=extension)
    if data_url.endswith("/"):
        data_url = data_url[:-1]
    if file_path is None:
        file_path = os.path.join(DATA_REPOSITORY, data_url.split("/")[-1])

    if not os.path.isfile(file_path) or refresh:
        logging.info("fetching data...")
        r = requests.get(data_url, timeout=REQUEST_TIMEOUT)
        # never cache an error page in place of the dataset
        r.raise_for_status()
        if not r.content:
            raise ValueError("no data provided froim the url : {}".format(data_url))
        # parse before writing so that an invalid payload does not poison the cache
        data = json.loads(r.content)
        with open(file_path, "wb") as f:
            f.write(r.content)
        return data

    logging.info(f"opening {file_path}...")
    with open(file_path, "rb") as f:
        return json.load(f)


def get_enum_vac_status(value) -> Optional[int]:
    """return the VacStatus index whose label is `value`, None if unknown"""
    for vac_status in VacStatus:
        if vac_status.label == value:
            return vac_status.value
    return None


def get_enum_age(value) -> Optional[int]:
    """return the AgeGroup index whose label is `value`, None if unknown"""
    for age_group in AgeGroup:
        if age_group.label == value:
            return age_group.value
    return None


def get_enum_field(value) -> Optional[int]:
    """return the Field index whose lowercased name is `value`, None if unknown"""
    for field in Field:
        if field.name.lower() == value:
            return field.value
    return None


def group_by_age_date(data: Dict[str, Any]) -> Dict[dt, Any]:
    """
    group the original dictionary into a more readable one

    'date': {
        'age' : {
            'vac_status' : {
                'hc',
                'sc',
                'dc',
                ...
            }
        }
    }
    """
    logging.info("restructuring the data...")
    dic_data_grouped: Dict[dt, Any] = OrderedDict()
    for row in data["records"]:
        row_fields = row["fields"]
        date = dt.strptime(row_fields["date"], DATE_FORMAT)
        dic_age = dic_data_grouped.setdefault(date, OrderedDict())
        dic_vac = dic_age.setdefault(row_fields["age"], OrderedDict())
        dic_field = dic_vac.setdefault(row_fields["vac_statut"], OrderedDict())
        for field in Field:
            field_name = field.name.lower()
            dic_field[field_name] = row_fields[field_name]
    logging.info("data restructured")
    return dic_data_grouped


def get_np_data(dic_data_grouped: Dict[dt, Any]) -> Tuple[np.ndarray, np.ndarray]:
    """
    store the data in numpy data structure helped by Enum

    Returns:
        np_data: counts indexed by [date, AgeGroup, VacStatus, Field]
        np_date: the matching dates (datetime64[s]) in chronological order
    """
    logging.info("storing data in numpy data structure...")
    # zero filled: a (date, age, status) combination missing from the dataset
    # must not expose uninitialised memory
    np_data = np.zeros(
        (len(dic_data_grouped), len(AgeGroup), len(VacStatus), len(Field))
    )
    np_date = np.empty((len(dic_data_grouped)), dtype="datetime64[s]")
    # records are not guaranteed to come ordered, cumulative and line plots
    # need chronological dates
    sorted_items = sorted(dic_data_grouped.items(), key=lambda item: item[0])
    for idx_date, (date, dic_age) in enumerate(sorted_items):
        np_date[idx_date] = date
        for age, dic_vac in dic_age.items():
            idx_age = get_enum_age(age)
            if idx_age is None:
                # None would be read as np.newaxis and write the wrong cells
                logging.warning(f"unknown age group skipped : {age}")
                continue
            for vac, dic_field in dic_vac.items():
                idx_vac = get_enum_vac_status(vac)
                if idx_vac is None:
                    logging.warning(f"unknown vaccine status skipped : {vac}")
                    continue
                for field, value in dic_field.items():
                    idx_field = get_enum_field(field)
                    if idx_field is None:
                        logging.warning(f"unknown field skipped : {field}")
                        continue
                    np_data[idx_date, idx_age, idx_vac, idx_field] = value
    logging.info("date and data generated")
    if len(np_date) > 0:
        date_start = np_date[0]
        date_end = np_date[len(np_date) - 1]
        logging.info(f"range period : {date_start} - {date_end}")
    else:
        logging.warning("no date found in the data")
    return np_data, np_date


def split_by_vac_status(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    split the data on the vaccine status axis

    Returns the sum of every status but the first one (vaccinated) and the
    VacStatus.NC slice (unvaccinated), both indexed by [date, age, field].
    """
    np_data_vac = np.sum(np_data[:, :, 1:, :], axis=2)
    np_data_unvac = np_data[:, :, VacStatus.NC.value, :]
    return np_data_vac, np_data_unvac


def get_vaccine_percent(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    get the vaccine percent per date, age and field
    the vaccine data holds all the vaccine status except unvaccine

    Values are ratios in [0, 1] (callers multiply by 100), NaN where the
    total over the vaccine statuses is 0.
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    np_total = np.sum(np_data, axis=2)
    # 0 / 0 is expected on empty days, the NaN is handled by the callers
    with np.errstate(divide="ignore", invalid="ignore"):
        np_percent_vac = np_data_vac / np_total
        np_percent_unvac = np_data_unvac / np_total
    return np_percent_vac, np_percent_unvac


def _percent_of_total(np_counts: np.ndarray, np_totals: np.ndarray) -> np.ndarray:
    """percent (2 decimals) of [date, age] counts over the per date totals"""
    with np.errstate(divide="ignore", invalid="ignore"):
        return np.round((np_counts / np_totals[:, np.newaxis]) * 100, 2)


def get_percent_age_by_date_field_vac_splited(
    np_data: np.ndarray, field: Field
) -> Tuple[np.ndarray, np.ndarray]:
    """
    get numpy percent age grouped by date and field splited by vaccine status

    Returns two [date, age] arrays (vaccinated, unvaccinated), each row is the
    share of every age group in the day total of its vaccine status.
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    np_field_vac = np_data_vac[:, :, field.value]
    np_field_unvac = np_data_unvac[:, :, field.value]
    np_percent_age_vac = _percent_of_total(
        np_field_vac, np.nansum(np_field_vac, axis=1)
    )
    np_percent_age_unvac = _percent_of_total(
        np_field_unvac, np.nansum(np_field_unvac, axis=1)
    )
    return np_percent_age_vac, np_percent_age_unvac


def get_percent_age_by_date_field(np_data: np.ndarray, field: Field) -> np.ndarray:
    """
    get numpy percent age grouped by date and field

    Returns a [date, age] array, each row is the share of every age group in
    the day total (all vaccine statuses together).
    """
    np_field = np_data[:, :, :, field.value]
    np_totals = np.nansum(np_field, axis=(1, 2))
    return _percent_of_total(np.sum(np_field, axis=2), np_totals)


def get_plot_fig(
    grid: Optional[bool] = True,
    date_format: Optional[str] = DATE_FORMAT,
    figsize: Optional[Tuple[int, int]] = None,
    locator: Optional[Any] = None,
) -> Tuple[plt.Figure, plt.Axes]:
    """
    return pyplot fig, ax to plot data over range period with date formatting

    `locator` defaults to a new MonthLocator for every figure.
    """
    if locator is None:
        # a locator is attached to the axis it is set on, never share one
        # instance between figures
        locator = md.MonthLocator()
    fig, ax = plt.subplots(figsize=figsize)
    ax.grid(grid)
    date_formatter = md.DateFormatter(date_format)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(date_formatter)
    fig.autofmt_xdate()
    return fig, ax


def save_and_close_fig(
    fig: plt.Figure,
    output_path: str,
    has_legend: Optional[bool] = True,
    is_tight: Optional[bool] = True,
):
    """
    save `fig` to `output_path` then close it

    The legend and the tight layout are applied on `fig` itself, not on
    whatever figure pyplot considers current.
    """
    logging.info(f"plotting : {output_path}...")
    if has_legend:
        fig.gca().legend()
    if is_tight:
        fig.tight_layout()
    fig.savefig(output_path)
    plt.close(fig)
    logging.info(f"{output_path} plotted")


def analyse(np_data: np.ndarray, np_date: np.ndarray) -> None:
    """
    analyse data

    Prints the mean vaccinated percent by age and field, then the mean share
    of every age group by field (overall, vaccinated and unvaccinated).
    """
    logging.info("analysing data...")
    np_percent_vac, _ = get_vaccine_percent(np_data)
    logging.info("--- field by age vaccine mean percent ---")
    for age_group in AgeGroup:
        for field in Field:
            mean_vac_percent = np.round(
                np.nanmean(np_percent_vac[:, age_group.value, field.value]) * 100, 2
            )
            print(f"{field.name} - {age_group.label} - vac : {mean_vac_percent}%")

    logging.info("--- age by field and vac status mean percent ---")
    for field in Field:
        np_percent_age = get_percent_age_by_date_field(np_data, field)
        (
            np_percent_age_vac,
            np_percent_age_unvac,
        ) = get_percent_age_by_date_field_vac_splited(np_data, field)
        for age_group in AgeGroup:
            percent_age_mean = np.round(
                np.nanmean(np_percent_age[:, age_group.value]), 2
            )
            print(f"age: {age_group.label} - field: {field.name} = {percent_age_mean}%")
            percent_age_vac_mean = np.round(
                np.nanmean(np_percent_age_vac[:, age_group.value]), 2
            )
            print(
                f"age: {age_group.label} - status: vac - field: {field.name} = {percent_age_vac_mean}%"
            )
            percent_age_unvac_mean = np.round(
                np.nanmean(np_percent_age_unvac[:, age_group.value]), 2
            )
            print(
                f"age: {age_group.label} - status: unvac - field: {field.name} = {percent_age_unvac_mean}%"
            )


def plot_bar_age_percent_vac_status_by_field(
    np_data_vac_status: np.ndarray,
    np_date: np.ndarray,
    field: Field,
    is_vac: Optional[bool] = True,
) -> None:
    """
    plot a stacked bar diagram of the age group percents ([date, age] array)
    of a field for one vaccine status
    """
    fig, ax = get_plot_fig(figsize=(22, 8), locator=md.WeekdayLocator())
    suffix = "vac" if is_vac else "unvac"
    title = "Vaccinés" if is_vac else "Non vaccinés"
    # own accumulator: stacking must not write into the caller's array
    bottom = np.zeros(len(np_data_vac_status))
    for age_group in AgeGroup:
        percents_age = np_data_vac_status[:, age_group.value]
        ax.bar(
            np_date,
            percents_age,
            label=age_group.label,
            bottom=bottom,
            color=AGE_COLORS[age_group.value],
        )
        bottom = bottom + percents_age
    ax.set_ylabel("%")
    ax.set_title(f"{field.label} - {title}")
    ax.legend(
        [age_group.label for age_group in AgeGroup], loc="upper right", frameon=True
    )
    save_and_close_fig(
        fig,
        os.path.join(OUTPUT_REPOSITORY, f"age_percent_{suffix}_{field.name.lower()}"),
        has_legend=False,
    )


def plot_bar_age_percent_by_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """
    plot percent vaccinated field group by age bar diagram
    (one diagram for the vaccinated, one for the unvaccinated)
    """
    (
        np_percent_age_vac,
        np_percent_age_unvac,
    ) = get_percent_age_by_date_field_vac_splited(np_data, field)
    plot_bar_age_percent_vac_status_by_field(np_percent_age_vac, np_date, field)
    plot_bar_age_percent_vac_status_by_field(
        np_percent_age_unvac, np_date, field, is_vac=False
    )


def plot_cumulative_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """
    plot the cumulative sum over time of a field, one vaccinated and one
    unvaccinated curve by age group
    """
    fig, ax = get_plot_fig()
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    for age_group in AgeGroup:
        np_cumulate_vac: np.ndarray = np.cumsum(
            np_data_vac[:, age_group.value, field.value], axis=0
        )
        np_cumulate_unvac: np.ndarray = np.cumsum(
            np_data_unvac[:, age_group.value, field.value], axis=0
        )
        ax.plot(np_date, np_cumulate_vac, label=f"{age_group.label} Vaccinés")
        ax.plot(np_date, np_cumulate_unvac, label=f"{age_group.label} Non vaccinés")
    ax.set_title(f"{field.label} cumulés par âge")
    ax.set_xlabel("date")
    save_and_close_fig(
        fig, os.path.join(OUTPUT_REPOSITORY, f"cumulative_{field.name.lower()}")
    )


def plot_fields_by_age_vac(
    np_data: np.ndarray, np_date: np.ndarray, age_group: AgeGroup, vac_status: VacStatus
) -> None:
    """
    plot data by vaccine status, age and field
    """
    fig, ax = get_plot_fig()
    for field in Field:
        ax.plot(
            np_date,
            np_data[:, age_group.value, vac_status.value, field.value],
            label=f"{field.label}",
        )
    ax.set_xlabel("date")
    ax.set_ylabel("nombre")
    ax.set_title(f"{age_group.label} - {vac_status.label}")
    save_and_close_fig(
        fig,
        os.path.join(
            OUTPUT_REPOSITORY, f"all_{age_group.name.lower()}_{vac_status.name.lower()}"
        ),
    )


def plot_bar_data_by_age_field(
    np_data: np.ndarray,
    np_date: np.ndarray,
    age_group: AgeGroup,
    field: Field,
) -> None:
    """
    display a bar graph by field and age over the data period
    bars display vaccine status percent
    """
    np_percent_vac, np_percent_unvac = get_vaccine_percent(np_data)
    vac_percent = np.round(np_percent_vac[:, age_group.value, field.value] * 100, 2)
    unvac_percent = np.round(
        np_percent_unvac[:, age_group.value, field.value] * 100, 2
    )
    # bars are placed on the date index, the date labels are set by hand below
    positions = np.arange(len(np_date))

    # adjust the fig size to display correctly bars and labels
    fig, ax = get_plot_fig(figsize=(22, 8))
    bar_vac = ax.bar(positions, vac_percent, color="b", label="Vaccinés")
    ax.bar(
        positions, unvac_percent, bottom=vac_percent, color="r", label="Non vaccinés"
    )
    ax.bar_label(bar_vac, label_type="edge", color="black", fontsize="7", fmt="%.0f")
    ax.set_ylim(top=105)  # to display 100% label
    ax.set_ylabel("%")
    ax.set_title(f"{age_group.label} - {field.label}")
    # avoid displaying all dates
    ax.set(
        xticks=range(len(np_date)),
        xticklabels=[
            d.strftime(DATE_FORMAT) if idx % 4 == 0 else ""
            for idx, d in enumerate(np_date.astype(dt))
        ],
    )
    ax.legend(["Vaccinés", "Non vaccinés"], loc="upper right", frameon=True)
    save_and_close_fig(
        fig,
        os.path.join(
            OUTPUT_REPOSITORY,
            f"vac_percent_{age_group.name.lower()}_{field.name.lower()}",
        ),
        has_legend=False,
    )


def get_age_vac_args() -> List[Tuple[AgeGroup, VacStatus]]:
    """
    get tuple arguments to plot fields data by age and vac status on multiprocess
    """
    return [(age_group, vac_status) for age_group in AgeGroup for vac_status in VacStatus]


def get_age_field_args() -> List[Tuple[AgeGroup, Field]]:
    """
    get tuple arguments to plot fields data by age and field on multiprocess
    """
    return [(age_group, field) for age_group in AgeGroup for field in Field]


def generate_html_page() -> None:
    """
    render templates/index.template.html into BUILD_REPOSITORY/index.html
    """
    logging.info("generating html page with plots...")
    os.makedirs(BUILD_REPOSITORY, exist_ok=True)
    env = Environment(
        loader=FileSystemLoader("templates"), autoescape=select_autoescape()
    )
    template = env.get_template("index.template.html")
    data = template.render(
        **{
            "fields": Field,
            "ages": AgeGroup,
            "status": VacStatus,
            "static": os.path.join(MAIN_URL, STATIC_REPOSITORY),
            "src": DATA_URL,
        }
    )
    # explicit encoding: the labels hold accented characters
    with open(os.path.join(BUILD_REPOSITORY, "index.html"), "w", encoding="utf-8") as f:
        f.write(data)
    logging.info("html page build")


def main() -> None:
    """
    command line entry point: load the data, print the analysis, then
    generate the plots and the html page depending on the flags
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r",
        "--refresh",
        action="store_true",
        default=False,
        help="redownload data for updates",
    )
    parser.add_argument(
        "-np",
        "--no-plot",
        action="store_true",
        default=False,
        help="no plot data",
    )
    parser.add_argument(
        "-th",
        "--to-html",
        action="store_true",
        default=False,
        help="create an html with the plots",
    )
    args = parser.parse_args()

    dic_data: Dict[str, Any] = get_data(
        file_path=os.path.join(DATA_REPOSITORY, "dress.json"), refresh=args.refresh
    )
    dic_data_grouped: Dict[dt, Any] = group_by_age_date(dic_data)
    np_data, np_date = get_np_data(dic_data_grouped)

    analyse(np_data, np_date)

    if not args.no_plot:
        os.makedirs(OUTPUT_REPOSITORY, exist_ok=True)
        plot_fields_args = get_age_vac_args()
        f_fields = partial(plot_fields_by_age_vac, np_data, np_date)
        plot_vac_percent_age_args = get_age_field_args()
        f_bars = partial(plot_bar_data_by_age_field, np_data, np_date)
        with Pool(2) as pool:
            pool.starmap(f_fields, plot_fields_args)
            pool.starmap(f_bars, plot_vac_percent_age_args)

        for field in Field:
            plot_cumulative_field(np_data, np_date, field)
            plot_bar_age_percent_by_field(np_data, np_date, field)

    if args.to_html:
        generate_html_page()


if __name__ == "__main__":
    main()