import argparse
import json
import logging
import os
import re
from collections import namedtuple
from datetime import datetime as dt
from enum import Enum
from functools import partial
from multiprocessing import Pool
from typing import Any, Dict, List, Optional, OrderedDict, Tuple, Union

import numpy as np
import requests
from jinja2 import Environment, FileSystemLoader, select_autoescape
from matplotlib import dates as md
from matplotlib import pyplot as plt

FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

DATE_FORMAT = "%Y-%m-%d"
DATA_URL = "https://data.drees.solidarites-sante.gouv.fr/api/records/1.0/search/?dataset=covid-19-resultats-par-age-issus-des-appariements-entre-si-vic-si-dep-et-vac-si&q=&rows=-1&facet=date&facet=vac_statut&facet=age"
DATA_REPOSITORY = "data"
STATIC_REPOSITORY = "static"
OUTPUT_REPOSITORY = os.path.join(STATIC_REPOSITORY, "plots")
BUILD_REPOSITORY = "build"

# (connect, read) timeouts in seconds used when downloading the dataset, so a
# stalled connection cannot hang the script forever
REQUEST_TIMEOUT = (10, 300)

TMP_SUFFIX = ".tmp"
FORMAT_SUFFIX = ".png"
OUTPUT_SUFFIX = f"{TMP_SUFFIX}{FORMAT_SUFFIX}"
# matches any file name ending with OUTPUT_SUFFIX (dots escaped by `re.escape`)
TMP_FILE_REGEX = re.compile(r"^.*{}$".format(re.escape(OUTPUT_SUFFIX)))

MAIN_URL = "https://covid.thegux.fr/"
# MAIN_URL = "/home/romain/code/covid-plotter/" # to debug (adjust with your local path)

# cycler could be better, but for ages plots it's ok
AGE_COLORS = {
    0: "pink",
    1: "green",
    2: "blue",
    3: "red",
    4: "gray",
}


class DreesEnum(bytes, Enum):
    """
    base enum whose members are declared as `(value, label)` tuples

    `value` is the integer used as an index in the numpy data structure,
    `label` is the human readable text attached to the member
    """

    def __new__(cls, value, label):
        obj = bytes.__new__(cls, [value])
        obj._value_ = value
        obj.label = label
        return obj


class Field(DreesEnum):
    """
    indicators stored for each (date, age, vaccine status)
    """

    HC = (0, "Hospitalisations")
    SC = (1, "Soins critiques")
    DC = (2, "Décés")


class Quota(DreesEnum):
    """
    population size column, stored right after the `Field` columns
    """

    EFFECTIF = (0 + len(Field), "Effectif")


class VacStatus(DreesEnum):
    """
    WARN: Be careful, after refreshing dataset, some VacStatus can changed
    """

    NC = (0, "Non-vaccinés")
    PDR = (1, "Primo dose récente")
    PDE = (2, "Primo dose efficace")
    CM3MSR = (3, "Complet de moins de 3 mois - sans rappel")
    CM3MAR = (4, "Complet - avec 1 rappel de moins de 3 mois")
    CM36MSR = (5, "Complet entre 3 mois et 6 mois - sans rappel")
    CM36MAR = (6, "Complet - avec 1 rappel entre 3 mois et 6 mois")
    C6MAR = (7, "Complet - avec 1 rappel de 6 mois ou plus")
    C6MSR = (8, "Complet de 6 mois ou plus - sans rappel")
    CM3MAR2 = (9, "Complet - avec 2 rappel de moins de 3 mois")
    CM36MAR2 = (10, "Complet - avec 2 rappel entre 3 mois et 6 mois")
    C6MAR2 = (11, "Complet - avec 2 rappel de 6 mois ou plus")


class AgeGroup(DreesEnum):
    """
    age groups, the label is the raw `age` value found in the dataset
    """

    VERY_YOUNG = (0, "[0,19]")
    YOUNG = (1, "[20,39]")
    MID_OLD = (2, "[40,59]")
    OLD = (3, "[60,79]")
    VERY_OLD = (4, "[80;+]")


# namedtuple used to store stats (could be better...)
VaccineMean = namedtuple("VaccineMean", ["age", "field", "percent"])
AgeMean = namedtuple("AgeMean", ["age", "field", "percent"])


def get_data(
    file_path: Optional[str] = None,
    extension: Optional[str] = "json",
    refresh=False,
) -> Dict[str, Any]:
    """
    collect covid data by age from DREES
    src: DATA_URL

    the payload is downloaded when `file_path` does not exist or when
    `refresh` is True, otherwise the cached file is read

    file_path: cache location, defaults to a file in DATA_REPOSITORY named
               after the last segment of the url
    extension: substituted in DATA_URL through `str.format` (no effect with
               the current DATA_URL, which has no `{extension}` placeholder)
    refresh:   force a new download even if the cache exists

    raises `requests.HTTPError` on an HTTP error status and `ValueError` if
    the response is empty or is not valid JSON
    """
    os.makedirs(DATA_REPOSITORY, exist_ok=True)
    data_url = DATA_URL.format(extension=extension)
    if data_url.endswith("/"):
        data_url = data_url[:-1]
    if file_path is None:
        file_path = os.path.join(DATA_REPOSITORY, data_url.split("/")[-1])

    if not os.path.isfile(file_path) or refresh:
        logging.info("fetching data...")
        r = requests.get(data_url, timeout=REQUEST_TIMEOUT)
        # never cache an error page as if it was the dataset
        r.raise_for_status()
        if not r.content:
            raise ValueError("no data provided froim the url : {}".format(data_url))
        # parse before writing so an invalid payload does not poison the cache
        payload = json.loads(r.content)
        with open(file_path, "wb") as f:
            f.write(r.content)
        return payload

    logging.info(f"opening {file_path}...")
    with open(file_path, "rb") as f:
        return json.load(f)


def get_enum_vac_status(value):
    """
    return the `VacStatus` index whose label equals `value`
    """
    for vac_status in VacStatus:
        if vac_status.label == value:
            return vac_status.value
    raise Exception(f"vac status : {value} does not exit in enum 'VacStatus'")


def get_enum_age(value):
    """
    return the `AgeGroup` index whose label equals `value`
    """
    for age_group in AgeGroup:
        if age_group.label == value:
            return age_group.value
    raise Exception(f"age : {value} does not exit in enum 'AgeGroup'")


def get_enum_field(value):
    """
    return the column index of the `Field` or `Quota` member whose lower-cased
    name equals `value`
    """
    for field in Field:
        if field.name.lower() == value:
            return field.value
    for quota in Quota:
        if quota.name.lower() == value:
            return quota.value
    raise Exception(f"field : {value} does not exit in enum 'Field'")


def structure_data(data: Dict[str, Any]) -> Dict[dt, Any]:
    """
    struture the original dictionnary into a more readable one
    'date': {
        'age' : {
            'vac_status' : {
                'hc',
                'sc',
                'dc',
                ...
            }
        }
    }

    the returned dictionary is ordered by ascending date
    """
    logging.info("restructuring the data...")
    dic_data: Dict[dt, Any] = OrderedDict()
    for row in data["records"]:
        row_fields = row["fields"]
        date = dt.strptime(row_fields["date"], DATE_FORMAT)
        # create the nested levels on the fly and get the innermost one
        dic_status = (
            dic_data.setdefault(date, OrderedDict())
            .setdefault(row_fields["age"], OrderedDict())
            .setdefault(row_fields["vac_statut"], OrderedDict())
        )
        for column in (*Field, *Quota):
            column_name = column.name.lower()
            dic_status[column_name] = row_fields[column_name]

    # order `dic_data` date keys in ascending order
    dic_data = OrderedDict(sorted(dic_data.items(), key=lambda t: t[0]))
    logging.info("data restructured")
    return dic_data


def get_np_data(dic_data: Dict[dt, Any]) -> Tuple[np.ndarray, np.ndarray]:
    """
    store the data in numpy data structure

    returns:
    - np_data: float array indexed by (date, age, vac status, field/quota)
    - np_date: `datetime64[s]` array of the dates, same order as `dic_data`
    """
    logging.info("storing data in numpy data structure...")
    # zero-filled (and not `np.empty`) so that a (date, age, status) combination
    # missing from the dataset does not expose uninitialised memory
    np_data = np.zeros(
        (len(dic_data), len(AgeGroup), len(VacStatus), len(Field) + len(Quota))
    )
    np_date = np.empty((len(dic_data)), dtype="datetime64[s]")

    for idx_date, (date, dic_age) in enumerate(dic_data.items()):
        np_date[idx_date] = date
        for age, dic_vac in dic_age.items():
            idx_age = get_enum_age(age)
            for vac, dic_field in dic_vac.items():
                idx_vac = get_enum_vac_status(vac)
                for field, value in dic_field.items():
                    idx_field = get_enum_field(field)
                    np_data[idx_date, idx_age, idx_vac, idx_field] = value

    logging.info("date and data generated")
    if len(np_date):
        date_start = np_date[0]
        date_end = np_date[-1]
        logging.info(f"range period : {date_start} - {date_end}")

    # set 'effectif' equals to 0 if effectif < 1 (0.04 means nothing...)
    # the 3-d mask selects whole (date, age, status) rows: every column is reset
    quota_mask = np_data[:, :, :, Quota.EFFECTIF.value] < 1
    np_data[quota_mask] = 0
    return np_data, np_date


def split_by_vac_status(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    split data to get vaccine data (all vaccine status) and unvaccine data (no vaccine)

    the vaccine status axis is removed: the first array sums every status
    after index 0, the second one is the `VacStatus.NC` slice
    """
    return np.sum(np_data[:, :, 1:, :], axis=2), np_data[:, :, VacStatus.NC.value, :]


def get_vaccine_status_distribution(
    np_data: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    get the vaccine distribution over the whole time period by age and field
    the vaccine data holds all the vaccine status except unvaccine

    returns two (date, age, field) arrays of ratios (vaccine, unvaccine),
    a ratio is NaN/inf where the total over all status is 0
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    np_total = np.sum(np_data, axis=2)
    # empty groups divide by zero on purpose, callers use nan-aware means
    with np.errstate(divide="ignore", invalid="ignore"):
        np_vac_distri = np_data_vac / np_total
        np_unvac_distri = np_data_unvac / np_total
    return np_vac_distri, np_unvac_distri


def _percent_by_age(np_counts: np.ndarray) -> np.ndarray:
    """
    convert a (date, age) array into the percentage each age represents in
    its date row (row total computed ignoring NaN, result rounded to 2 digits)
    """
    np_totals = np.nansum(np_counts, axis=1, keepdims=True)
    with np.errstate(divide="ignore", invalid="ignore"):
        return np.round((np_counts / np_totals) * 100, 2)


def get_distribution_age_by_field_and_vac_status(
    np_data: np.ndarray, field: Field
) -> Tuple[np.ndarray, np.ndarray]:
    """
    get distribution age (percent) by field grouped by vaccine status

    returns two (date, age) arrays: vaccine percents, unvaccine percents
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    np_age_vac_percent = _percent_by_age(np_data_vac[:, :, field.value])
    np_age_unvac_percent = _percent_by_age(np_data_unvac[:, :, field.value])
    return np_age_vac_percent, np_age_unvac_percent


def get_distribution_age_by_field(np_data: np.ndarray, field: Field) -> np.ndarray:
    """
    get age distribution (percent) over the whole period by field

    returns a (date, age) array, all vaccine status summed
    """
    np_field = np_data[:, :, :, field.value]
    # numerator: plain sum over status, denominator: nan-ignoring date total
    np_age_counts = np.sum(np_field, axis=2)
    np_totals = np.nansum(np.nansum(np_field, axis=2), axis=1, keepdims=True)
    with np.errstate(divide="ignore", invalid="ignore"):
        return np.round((np_age_counts / np_totals) * 100, 2)


def get_plot_fig(
    grid: Optional[bool] = True,
    date_format: Optional[str] = DATE_FORMAT,
    figsize: Optional[Tuple[int, int]] = None,
    locator: Optional[Any] = None,
    auto_date_fmt: Optional[bool] = True,
) -> Tuple[plt.Figure, plt.Axes]:
    """
    return pyplot fig, ax to plot data over range period with date formatting

    locator: major tick locator of the x axis, a new `md.MonthLocator()` is
             created when None (a locator instance must not be shared between
             axes, so it cannot be built once as a default argument)
    """
    if locator is None:
        locator = md.MonthLocator()
    fig, ax = plt.subplots(figsize=figsize)
    ax.grid(grid)
    date_formatter = md.DateFormatter(date_format)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(date_formatter)
    if auto_date_fmt:
        fig.autofmt_xdate()
    return fig, ax


def save_and_close_fig(
    fig: plt.Figure,
    output_path: str,
    has_legend: Optional[bool] = True,
    is_tight: Optional[bool] = True,
):
    """
    save `fig` into `{output_path}{OUTPUT_SUFFIX}` then close it

    has_legend: draw the default legend on the figure current axes
    is_tight:   apply a tight layout before saving
    """
    logging.info(f"plotting : {output_path}...")
    # work on `fig` itself and not on pyplot's implicit current figure
    if has_legend:
        fig.gca().legend()
    if is_tight:
        fig.tight_layout()
    fig.savefig(f"{output_path}{OUTPUT_SUFFIX}")
    plt.close(fig)
    logging.info(f"{output_path} plotted")


def analyse(np_data: np.ndarray) -> List[Union[VaccineMean, AgeMean]]:
    """
    analyse DREES dataset
    useful stats can be compute here if no plots needed

    prints every stat and returns the `VaccineMean` / `AgeMean` ones
    (the per vaccine status age means are only printed)
    """
    logging.info("analysing data...")
    lst_analyse_data: List[Union[VaccineMean, AgeMean]] = []
    np_vac_distri, _ = get_vaccine_status_distribution(np_data)

    logging.info(
        "--- field distribution by age and only vaccine status (averaged over the whole period) ---"
    )
    for age_group in AgeGroup:
        for field in Field:
            vac_percent_mean = np.round(
                np.nanmean(np_vac_distri[:, age_group.value, field.value]) * 100, 2
            )
            print(f"{field.name} - {age_group.label} - vac : {vac_percent_mean}%")
            lst_analyse_data.append(
                VaccineMean(age_group.label, field.label, vac_percent_mean)
            )

    logging.info(
        "--- age distribution by field and vac status (averaged over the whole period) ---"
    )
    for field in Field:
        np_age_percent = get_distribution_age_by_field(np_data, field)
        (
            np_percent_age_vac,
            np_percent_age_unvac,
        ) = get_distribution_age_by_field_and_vac_status(np_data, field)
        for age_group in AgeGroup:
            percent_age_mean = np.round(
                np.nanmean(np_age_percent[:, age_group.value]), 2
            )
            print(f"age: {age_group.label} - field: {field.name} = {percent_age_mean}%")
            lst_analyse_data.append(
                AgeMean(age_group.label, field.label, percent_age_mean)
            )

            percent_age_vac_mean = np.round(
                np.nanmean(np_percent_age_vac[:, age_group.value]), 2
            )
            print(
                f"age: {age_group.label} - status: vac - field: {field.name} = {percent_age_vac_mean}%"
            )

            percent_age_unvac_mean = np.round(
                np.nanmean(np_percent_age_unvac[:, age_group.value]), 2
            )
            print(
                f"age: {age_group.label} - status: unvac - field: {field.name} = {percent_age_unvac_mean}%"
            )
    return lst_analyse_data


def plot_bar_age_distribution_by_field_and_vac_status(
    np_data: np.ndarray,
    np_date: np.ndarray,
    field: Field,
    is_vac: Optional[bool] = True,
) -> None:
    """
    plot age distribution distribution (percent) by field and vaccine status

    np_data: (date, age) array of percents, left untouched
    is_vac:  only used to build the title and the output file name
    """
    fig, ax = get_plot_fig(figsize=(22, 8), locator=md.WeekdayLocator())
    suffix = "vac" if is_vac else "unvac"
    title = "Vaccinés" if is_vac else "Non vaccinés"

    # running top of the stacked bars, kept in its own array: accumulating
    # into a view of `np_data` would overwrite the caller's first column
    bottom = np.zeros(len(np_data))
    for age_group in AgeGroup:
        percents_age = np_data[:, age_group.value]
        ax.bar(
            np_date,
            percents_age,
            label=age_group.label,
            bottom=bottom,
            color=AGE_COLORS[age_group.value],
        )
        bottom = bottom + percents_age

    ax.set_ylabel("%")
    ax.set_title(f"{field.label} - {title}")
    ax.legend(
        [age_group.label for age_group in AgeGroup], loc="upper right", frameon=True
    )
    save_and_close_fig(
        fig,
        os.path.join(OUTPUT_REPOSITORY, f"age_percent_{suffix}_{field.name.lower()}"),
        has_legend=False,
    )


def plot_bar_age_distribution_by_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """
    plot age distribution (percent) by field
    one plot for the vaccine data, one for the unvaccine data
    """
    (
        np_age_vac_percent,
        np_age_unvac_percent,
    ) = get_distribution_age_by_field_and_vac_status(np_data, field)
    plot_bar_age_distribution_by_field_and_vac_status(
        np_age_vac_percent, np_date, field
    )
    plot_bar_age_distribution_by_field_and_vac_status(
        np_age_unvac_percent, np_date, field, is_vac=False
    )


def plot_cumulative_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """
    plot cumulative field by age and vaccine status (raw number of cases,
    not normalised by the population size), one plot per age group
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    for age_group in AgeGroup:
        fig, _ = get_plot_fig(auto_date_fmt=False)
        np_cumulate_vac: np.ndarray = np.cumsum(
            np_data_vac[:, age_group.value, field.value], axis=0
        )
        np_cumulate_unvac: np.ndarray = np.cumsum(
            np_data_unvac[:, age_group.value, field.value], axis=0
        )
        plt.plot(np_date, np_cumulate_vac, label="Vaccinés")
        plt.plot(np_date, np_cumulate_unvac, label="Non vaccinés")

        plt.title(f"{age_group.label} - {field.label}")
        plt.xlabel("Date")
        plt.ylabel("Nombre de cas")
        plt.xticks(rotation=30)
        save_and_close_fig(
            fig,
            os.path.join(
                OUTPUT_REPOSITORY,
                f"cumulative_{age_group.name.lower()}_{field.name.lower()}",
            ),
        )


def plot_fields_by_age_vac(
    np_data: np.ndarray, np_date: np.ndarray, age_group: AgeGroup, vac_status: VacStatus
) -> None:
    """
    plot field data by age and vaccine status (cases per million)
    """
    fig, _ = get_plot_fig(auto_date_fmt=False)
    np_effectif = np_data[:, age_group.value, vac_status.value, Quota.EFFECTIF.value]
    for field in Field:
        # 1e6 and not 10e6 (= ten million): the y axis is "per million"
        # an empty group (effectif 0) gives NaN/inf, which is not drawn
        with np.errstate(divide="ignore", invalid="ignore"):
            np_result = (
                1e6
                * np_data[:, age_group.value, vac_status.value, field.value]
                / np_effectif
            )
        plt.plot(
            np_date, np_result, label=f"{field.label}", linestyle="dotted", linewidth=2
        )

    plt.xlabel("Date")
    plt.ylabel("Cas par million de personnes")
    plt.xticks(rotation=30)
    plt.title(f"{age_group.label} - {vac_status.label}")
    save_and_close_fig(
        fig,
        os.path.join(
            OUTPUT_REPOSITORY, f"all_{age_group.name.lower()}_{vac_status.name.lower()}"
        ),
    )


def plot_bar_vaccine_status_distribution_by_age_field(
    np_data: np.ndarray,
    np_date: np.ndarray,
    age_group: AgeGroup,
    field: Field,
) -> None:
    """
    display vaccine/unvaccine distribution (percent) over the whole period by age and field
    """
    np_vac_distri, np_unvac_distri = get_vaccine_status_distribution(np_data)
    np_vac_percent = np.round(np_vac_distri[:, age_group.value, field.value] * 100, 2)
    np_unvac_percent = np.round(
        np_unvac_distri[:, age_group.value, field.value] * 100, 2
    )

    # adjust the fig size to display correctly bars and labels
    fig, ax = get_plot_fig(figsize=(22, 8), locator=md.WeekdayLocator())
    # one `bar` call per date so that each bar can be labelled on its own
    for date, vac_percent, unvac_percent in zip(
        np_date, np_vac_percent, np_unvac_percent
    ):
        bar_vac = ax.bar(date, vac_percent, color="b", label="Vaccinés")
        ax.bar(date, unvac_percent, bottom=vac_percent, color="r", label="Non vaccinés")
        # a label on an empty or full bar brings nothing
        if vac_percent not in (0, 100):
            ax.bar_label(
                bar_vac, label_type="edge", color="black", fontsize="6.5", fmt="%.0f"
            )

    ax.set_ylabel("%")
    ax.set_title(f"{age_group.label} - {field.label}")
    ax.legend(["Vaccinés", "Non vaccinés"], loc="upper right", frameon=True)
    save_and_close_fig(
        fig,
        os.path.join(
            OUTPUT_REPOSITORY,
            f"vac_percent_{age_group.name.lower()}_{field.name.lower()}",
        ),
        has_legend=False,
    )


def check_timestep(np_date: np.ndarray) -> None:
    """
    check that the dates are evenly spaced

    raises `AssertionError` if two consecutive steps differ, nothing is
    checked when there are fewer than two dates
    """
    # get the difference between each element (return timedelta64 array)
    np_diff = np.diff(np_date)
    if np_diff.size == 0:
        return
    # check if all timestep are equals
    # explicit raise (and not `assert`) so the check survives `python -O`
    if not np.all(np_diff == np_diff[0]):
        raise AssertionError("some timesteps missing !")
def get_age_vac_args() -> List[Tuple[AgeGroup, VacStatus]]:
    """
    build pool age and vac status arguments
    (every `AgeGroup` x `VacStatus` pair, age groups first)
    """
    return [
        (age_group, vac_status) for age_group in AgeGroup for vac_status in VacStatus
    ]


def get_age_field_args() -> List[Tuple[AgeGroup, Field]]:
    """
    build pool age and field arguments
    (every `AgeGroup` x `Field` pair, age groups first)
    """
    return [(age_group, field) for age_group in AgeGroup for field in Field]


def get_field_args() -> List[Tuple[Field]]:
    """
    build pool field arguments
    (1-tuples, as `Pool.starmap` unpacks each element)
    """
    return [(field,) for field in Field]


def move_tmp_plots() -> None:
    """
    move .tmp.png plots into .png after generation
    an already existing .png with the same name is overwritten
    """
    logging.info(f"moving '{OUTPUT_SUFFIX}' file in {FORMAT_SUFFIX}...")
    for filename in os.listdir(OUTPUT_REPOSITORY):
        if not filename.endswith(OUTPUT_SUFFIX):
            continue
        # only swap the trailing suffix: a `str.replace` on the whole path
        # would also rewrite a ".tmp.png" found in a directory or mid-name
        final_name = f"{filename[: -len(OUTPUT_SUFFIX)]}{FORMAT_SUFFIX}"
        # `os.replace` overwrites the destination on every platform
        os.replace(
            os.path.join(OUTPUT_REPOSITORY, filename),
            os.path.join(OUTPUT_REPOSITORY, final_name),
        )
    logging.info("files moved")


def generate_html_page(
    np_date: np.ndarray, lst_analyse_data: List[Union[VaccineMean, AgeMean]]
) -> None:
    """
    render `templates/index.template.html` into `BUILD_REPOSITORY/index.html`

    np_date:          dates of the dataset, first and last ones give the period
    lst_analyse_data: stats to display, dispatched by type in the template
    """
    logging.info("generating html page with plots...")
    os.makedirs(BUILD_REPOSITORY, exist_ok=True)
    env = Environment(
        loader=FileSystemLoader("templates"), autoescape=select_autoescape()
    )
    template = env.get_template("index.template.html")

    date_start = np_date[0].astype(dt).strftime(DATE_FORMAT)
    date_end = np_date[-1].astype(dt).strftime(DATE_FORMAT)
    date_build = dt.strftime(dt.now(), "%Y%m%d")
    # image named after the build date, only referenced if the file exists
    owid_path = f"fra-{date_build}.png"
    has_owid = os.path.isfile(os.path.join(OUTPUT_REPOSITORY, owid_path))

    data = template.render(
        **{
            "fields": Field,
            "ages": AgeGroup,
            "status": VacStatus,
            "static": os.path.join(MAIN_URL, STATIC_REPOSITORY),
            "src": DATA_URL,
            "period": f"{date_start} - {date_end}",
            "vaccine_mean": [
                x for x in lst_analyse_data if isinstance(x, VaccineMean)
            ],
            "age_mean": [x for x in lst_analyse_data if isinstance(x, AgeMean)],
            "owid_path": owid_path if has_owid else "",
        }
    )
    # explicit encoding: the labels hold accented characters and the platform
    # default encoding is not always utf-8
    with open(
        os.path.join(BUILD_REPOSITORY, "index.html"), "w", encoding="utf-8"
    ) as f:
        f.write(data)
    logging.info("html page build")


def main() -> None:
    """
    This script aims to analyse and plot DREES data

    Stats availables:
        - Age distribution (percent) by field (vaccine and unvaccine)
        - Vaccine/unvaccine distribution (percent) by field and age

    Plots availables :
        - cumulative hc, sc, dc by age and vaccine status
        - hc, sc, dc by vaccine status and age (cases per million)
        - hc, sc, dc (vaccine/unvaccine percent distribution) by age
        - hc, sc, dc (age percent distribution) by field

    Main indicators are :
        - hospitalisations (hc)
        - criticals (sc)
        - deaths (dc)
    hc, sc, dc include positive PCR tests
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r",
        "--refresh",
        action="store_true",
        default=False,
        help="redownload data for updates",
    )
    parser.add_argument(
        "-np",
        "--no-plot",
        action="store_true",
        default=False,
        help="no plot data",
    )
    parser.add_argument(
        "-th",
        "--to-html",
        action="store_true",
        default=False,
        help="create an html with the plots",
    )
    args = parser.parse_args()

    dic_data_unstructured: Dict[str, Any] = get_data(
        file_path=os.path.join(DATA_REPOSITORY, "drees.json"), refresh=args.refresh
    )
    dic_data: Dict[dt, Any] = structure_data(dic_data_unstructured)
    np_data, np_date = get_np_data(dic_data)

    lst_analyse_data = analyse(np_data)
    check_timestep(np_date)

    if not args.no_plot:
        os.makedirs(OUTPUT_REPOSITORY, exist_ok=True)
        # the data arrays are bound once, the pool only provides the
        # remaining (age / vac status / field) arguments
        f_fields = partial(plot_fields_by_age_vac, np_data, np_date)
        f_bars_vaccine = partial(
            plot_bar_vaccine_status_distribution_by_age_field, np_data, np_date
        )
        f_bars_age = partial(plot_bar_age_distribution_by_field, np_data, np_date)
        f_cumulate = partial(plot_cumulative_field, np_data, np_date)
        with Pool(2) as pool:
            pool.starmap(f_fields, get_age_vac_args())
            pool.starmap(f_bars_vaccine, get_age_field_args())
            pool.starmap(f_bars_age, get_field_args())
            pool.starmap(f_cumulate, get_field_args())
        # plots are written as .tmp.png and renamed once all are generated
        move_tmp_plots()

    if args.to_html:
        generate_html_page(np_date, lst_analyse_data)


if __name__ == "__main__":
    main()