covid-plotter/drees.py

704 lines
23 KiB
Python

import argparse
import json
import logging
import os
import re
from collections import namedtuple
from datetime import datetime as dt
from enum import Enum
from functools import partial
from multiprocessing import Pool
from typing import Any, Dict, List, Optional, OrderedDict, Tuple, Union
import numpy as np
import requests
from jinja2 import Environment, FileSystemLoader, select_autoescape
from matplotlib import dates as md
from matplotlib import pyplot as plt
# logging layout: "<time> - <level> - <message>", INFO and above
FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)

# date layout used to parse the dataset dates and to format the plot axis
DATE_FORMAT = "%Y-%m-%d"
DATA_URL = "https://data.drees.solidarites-sante.gouv.fr/api/records/1.0/search/?dataset=covid-19-resultats-par-age-issus-des-appariements-entre-si-vic-si-dep-et-vac-si&q=&rows=-1&facet=date&facet=vac_statut&facet=age"

# directories (relative to the working directory)
DATA_REPOSITORY = "data"
STATIC_REPOSITORY = "static"
OUTPUT_REPOSITORY = os.path.join(STATIC_REPOSITORY, "plots")
BUILD_REPOSITORY = "build"

# plots are first written as "<name>.tmp.png" and renamed to "<name>.png" afterwards
TMP_SUFFIX = ".tmp"
FORMAT_SUFFIX = ".png"
OUTPUT_SUFFIX = f"{TMP_SUFFIX}{FORMAT_SUFFIX}"
# re.escape builds the same pattern as before ("^.*\.tmp\.png$") without relying
# on "\." inside a non-raw string, which is an invalid escape sequence
TMP_FILE_REGEX = re.compile(r"^.*{}$".format(re.escape(OUTPUT_SUFFIX)))

MAIN_URL = "https://covid.thegux.fr/"
# MAIN_URL = "/home/romain/code/covid-plotter/" # to debug (adjust with your local path)

# cycler could be better, but for ages plots it's ok
# keys are the integer values of the age groups
AGE_COLORS = {
    0: "pink",
    1: "green",
    2: "blue",
    3: "red",
    4: "gray",
}
class DreesEnum(bytes, Enum):
    """
    base enum whose members are declared as `(value, label)` pairs
    the member itself is the single byte `bytes([value])`, so `value` must be
    an int in range(256); `label` is stored as an extra attribute
    """

    def __new__(cls, value, label):
        member = bytes.__new__(cls, [value])
        # `_value_` is what `member.value` returns
        member._value_ = value
        member.label = label
        return member
class Field(DreesEnum):
    """
    indicators read from the dataset, declared as `(value, label)` pairs
    the lower-cased member name (`hc`, `sc`, `dc`) is the key looked up in the
    dataset rows; the label is only used for display
    """

    HC = (0, "Hospitalisations")
    SC = (1, "Soins critiques")
    # spelling fixed: the French word is "Décès" (was "Décés")
    DC = (2, "Décès")
class Quota(DreesEnum):
    """
    non-indicator columns of the dataset
    values continue the numbering right after the `Field` members
    """

    EFFECTIF = (len(Field), "Effectif")
class VacStatus(DreesEnum):
    """
    vaccine statuses, declared as `(value, label)` pairs

    WARN: the labels are compared with the `vac_statut` values of the dataset,
    so they must match them exactly; they may change when the dataset is refreshed
    """

    # not vaccinated
    NC = (0, "Non-vaccinés")
    # first dose only
    PDR = (1, "Primo dose récente")
    PDE = (2, "Primo dose efficace")
    # complete scheme, with or without one booster
    CM3MSR = (3, "Complet de moins de 3 mois - sans rappel")
    CM3MAR = (4, "Complet - avec 1 rappel de moins de 3 mois")
    CM36MSR = (5, "Complet entre 3 mois et 6 mois - sans rappel")
    CM36MAR = (6, "Complet - avec 1 rappel entre 3 mois et 6 mois")
    C6MAR = (7, "Complet - avec 1 rappel de 6 mois ou plus")
    C6MSR = (8, "Complet de 6 mois ou plus - sans rappel")
    # complete scheme with two boosters
    CM3MAR2 = (9, "Complet - avec 2 rappel de moins de 3 mois")
    CM36MAR2 = (10, "Complet - avec 2 rappel entre 3 mois et 6 mois")
    C6MAR2 = (11, "Complet - avec 2 rappel de 6 mois ou plus")
class AgeGroup(DreesEnum):
    """
    age groups, declared as `(value, label)` pairs
    the labels are compared with the `age` values of the dataset
    """

    VERY_YOUNG = (0, "[0,19]")
    YOUNG = (1, "[20,39]")
    MID_OLD = (2, "[40,59]")
    OLD = (3, "[60,79]")
    VERY_OLD = (4, "[80;+]")
# lightweight records used to store the computed stats (could be better...)
# both carry the same three fields: age label, field label, mean percent
VaccineMean = namedtuple("VaccineMean", "age field percent")
AgeMean = namedtuple("AgeMean", "age field percent")
def get_data(
    file_path: Optional[str] = None,
    extension: Optional[str] = "json",
    refresh=False,
) -> Dict[str, Any]:
    """
    collect covid data by age from DREES
    src: DATA_URL

    the payload is cached on disk: it is downloaded when the cache file is
    missing or when `refresh` is set, otherwise the cache file is read

    :param file_path: cache file; defaults to a file in DATA_REPOSITORY named
        after the last "/"-separated segment of the url
    :param extension: passed to `DATA_URL.format(extension=...)`
    :param refresh: download again even if the cache file exists
    :return: the decoded JSON document
    :raises requests.HTTPError: if the server answers with an error status
    :raises ValueError: if the response is empty or is not valid JSON
    """
    os.makedirs(DATA_REPOSITORY, exist_ok=True)
    data_url = DATA_URL.format(extension=extension)
    if data_url.endswith("/"):
        data_url = data_url[:-1]
    if file_path is None:
        file_path = os.path.join(DATA_REPOSITORY, data_url.split("/")[-1])

    if os.path.isfile(file_path) and not refresh:
        logging.info(f"opening {file_path}...")
        # context manager: the file handle is closed as soon as it is parsed
        with open(file_path, "rb") as f:
            return json.load(f)

    logging.info("fetching data...")
    # timeout: do not hang forever on an unresponsive server
    r = requests.get(data_url, timeout=60)
    # fail on HTTP errors instead of caching an error page
    r.raise_for_status()
    if not r.content:
        raise ValueError("no data provided from the url : {}".format(data_url))
    # decode before writing, so that an invalid payload never ends up in the cache
    data = json.loads(r.content)
    with open(file_path, "wb") as f:
        f.write(r.content)
    return data
def get_enum_vac_status(value):
    """
    return the integer value of the `VacStatus` member whose label equals `value`

    :raises ValueError: if no member has this label
    """
    for vac_status in VacStatus:
        if vac_status.label == value:
            return vac_status.value
    # ValueError is still an `Exception`, so existing handlers keep working
    raise ValueError(f"vac status : {value} does not exist in enum 'VacStatus'")
def get_enum_age(value):
    """
    return the integer value of the `AgeGroup` member whose label equals `value`

    :raises ValueError: if no member has this label
    """
    for age_group in AgeGroup:
        if age_group.label == value:
            return age_group.value
    # ValueError is still an `Exception`, so existing handlers keep working
    raise ValueError(f"age : {value} does not exist in enum 'AgeGroup'")
def get_enum_field(value):
    """
    return the integer value of the `Field` or `Quota` member whose lower-cased
    name equals `value` (`Field` members are checked first)

    :raises ValueError: if neither enum has such a member
    """
    for column in (*Field, *Quota):
        if column.name.lower() == value:
            return column.value
    # ValueError is still an `Exception`, so existing handlers keep working
    raise ValueError(
        f"field : {value} does not exist in enum 'Field' or 'Quota'"
    )
def structure_data(data: Dict[str, Any]) -> Dict[dt, Any]:
    """
    regroup the flat records of the original dictionary into nested dictionaries
    'date': {
        'age' : {
            'vac_status' : {
                'hc',
                'sc',
                'dc',
                ...
            }
        }
    }
    dates are parsed with DATE_FORMAT and returned in ascending order
    """
    logging.info("restructuring the data...")
    # columns copied from every record: `Field` names first, then `Quota` names
    column_names = [column.name.lower() for column in (*Field, *Quota)]
    by_date: Dict[dt, Any] = OrderedDict()
    for record in data["records"]:
        record_fields = record["fields"]
        day = dt.strptime(record_fields["date"], DATE_FORMAT)
        age_label = record_fields["age"]
        status_label = record_fields["vac_statut"]
        # create the missing levels on the fly
        by_age = by_date.setdefault(day, OrderedDict())
        by_status = by_age.setdefault(age_label, OrderedDict())
        values = by_status.setdefault(status_label, OrderedDict())
        for column_name in column_names:
            values[column_name] = record_fields[column_name]
    # rebuild the mapping with its date keys in ascending order
    by_date = OrderedDict(sorted(by_date.items(), key=lambda item: item[0]))
    logging.info("data restructured")
    return by_date
def get_np_data(dic_data: Dict[dt, Any]) -> Tuple[np.ndarray, np.ndarray]:
    """
    store the data in numpy data structure

    :param dic_data: nested mapping date -> age label -> vac status label -> column name -> value
    :return: `np_data` indexed as [date, age, vac status, column] and `np_date`
        (datetime64[s]) holding the date of each first-axis index
    """
    logging.info("storing data in numpy data structure...")
    # zeros rather than np.empty: a combination absent from `dic_data` must
    # read as 0, not as uninitialised memory
    np_data = np.zeros(
        (len(dic_data), len(AgeGroup), len(VacStatus), len(Field) + len(Quota))
    )
    np_date = np.empty((len(dic_data)), dtype="datetime64[s]")
    for idx_date, (date, dic_age) in enumerate(dic_data.items()):
        np_date[idx_date] = date
        for age, dic_vac in dic_age.items():
            idx_age = get_enum_age(age)
            for vac, dic_field in dic_vac.items():
                idx_vac = get_enum_vac_status(vac)
                for field, value in dic_field.items():
                    idx_field = get_enum_field(field)
                    np_data[idx_date, idx_age, idx_vac, idx_field] = value
    logging.info("date and data generated")
    # nothing to report for an empty dataset (np_date[0] would raise IndexError)
    if len(np_date) > 0:
        date_start = np_date[0]
        date_end = np_date[-1]
        logging.info(f"range period : {date_start} - {date_end}")
    # an 'effectif' below 1 is meaningless (0.04 means nothing...)
    # the mask has 3 dimensions, so the assignment resets every column
    # (indicators and effectif) of the selected records to 0
    quota_mask = np_data[:, :, :, Quota.EFFECTIF.value] < 1
    np_data[quota_mask] = 0
    return np_data, np_date
def split_by_vac_status(np_data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    split the data in two arrays without the vaccine status axis:
    - vaccine data: sum of every status stored from index 1 onwards
    - unvaccine data: the `VacStatus.NC` slice
    """
    np_data_vac = np.sum(np_data[:, :, 1:, :], axis=2)
    np_data_unvac = np_data[:, :, VacStatus.NC.value, :]
    return np_data_vac, np_data_unvac
def get_vaccine_status_distribution(
    np_data: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    get the share (ratio, not percent) of vaccine and unvaccine data in the
    total of all vaccine statuses, for every date, age and column
    the vaccine data holds all the vaccine status except unvaccine
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    # total over the vaccine status axis, shared by both ratios
    np_total = np.sum(np_data, axis=2)
    return np_data_vac / np_total, np_data_unvac / np_total
def _age_percent(np_counts: np.ndarray, field: Field) -> np.ndarray:
    """
    share (percent, rounded to 2 decimals) of each age group in the per-date
    total of `field`; `np_counts` is indexed as [date, age, column]
    """
    np_field = np_counts[:, :, field.value]
    # NaN are ignored in the per-date total; keepdims makes it broadcast over ages
    np_total = np.nansum(np_field, axis=1, keepdims=True)
    return np.round((np_field / np_total) * 100, 2)


def get_distribution_age_by_field_and_vac_status(
    np_data: np.ndarray, field: Field
) -> Tuple[np.ndarray, np.ndarray]:
    """
    get distribution age (percent) by field grouped by vaccine status

    :param np_data: data indexed as [date, age, vac status, column]
    :param field: indicator to compute the distribution for
    :return: two [date, age] arrays of percents: vaccine data, then unvaccine data
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    # same computation for both groups, done by one vectorised helper
    return _age_percent(np_data_vac, field), _age_percent(np_data_unvac, field)
def get_distribution_age_by_field(np_data: np.ndarray, field: Field) -> np.ndarray:
    """
    get age distribution (percent, rounded to 2 decimals) by field for each date
    returns a [date, age] array; `np_data` is indexed as [date, age, vac status, column]
    """
    column = field.value
    np_percent_age = np.empty((len(np_data), len(AgeGroup)))
    for idx_date, np_day in enumerate(np_data):
        # total of the day over ages and vaccine statuses, NaN ignored
        day_total = np.nansum(np.nansum(np_day[:, :, column], axis=1))
        for age_group in AgeGroup:
            # total of the age group over vaccine statuses (NaN propagate here)
            age_total = np.sum(np_day[age_group.value, :, column], axis=0)
            np_percent_age[idx_date, age_group.value] = np.round(
                (age_total / day_total) * 100, 2
            )
    return np_percent_age
def get_plot_fig(
    grid: Optional[bool] = True,
    date_format: Optional[str] = DATE_FORMAT,
    figsize: Optional[Tuple[int, int]] = None,
    locator: Optional[Any] = None,
    auto_date_fmt: Optional[bool] = True,
) -> Tuple[plt.Figure, plt.Axes]:
    """
    return pyplot fig, ax to plot data over range period with date formatting

    :param grid: passed to `ax.grid`
    :param date_format: format of the major ticks of the x axis
    :param figsize: passed to `plt.subplots`
    :param locator: major tick locator of the x axis; a new `MonthLocator`
        is created when omitted
    :param auto_date_fmt: call `fig.autofmt_xdate()` on the figure
    :return: the `(fig, ax)` pair
    """
    # build the default here: a default argument is evaluated once, so the same
    # locator instance would otherwise be attached to the axis of every figure
    if locator is None:
        locator = md.MonthLocator()
    fig, ax = plt.subplots(figsize=figsize)
    ax.grid(grid)
    date_formatter = md.DateFormatter(date_format)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(date_formatter)
    if auto_date_fmt:
        fig.autofmt_xdate()
    return fig, ax
def save_and_close_fig(
    fig: plt.Figure,
    output_path: str,
    has_legend: Optional[bool] = True,
    is_tight: Optional[bool] = True,
) -> None:
    """
    save `fig` as `<output_path><OUTPUT_SUFFIX>` and close it

    :param fig: figure to save
    :param output_path: destination path without suffix
    :param has_legend: add a legend to the current axes of `fig` before saving
    :param is_tight: apply a tight layout before saving
    """
    logging.info(f"plotting : {output_path}...")
    # work on the figure received, not on pyplot's implicit current figure,
    # so that the right figure is saved whatever figure is current
    if has_legend:
        fig.gca().legend()
    if is_tight:
        fig.tight_layout()
    fig.savefig(f"{output_path}{OUTPUT_SUFFIX}")
    plt.close(fig)
    logging.info(f"{output_path} plotted")
def analyse(np_data: np.ndarray) -> List[Union[VaccineMean, AgeMean]]:
    """
    analyse DREES dataset
    useful stats can be compute here if no plots needed

    every stat is printed; the vaccine means and the overall age means are
    also returned as `VaccineMean` / `AgeMean` records
    """

    def rounded_mean(np_values: np.ndarray):
        # mean ignoring NaN, rounded to 2 decimals
        return np.round(np.nanmean(np_values), 2)

    logging.info("analysing data...")
    stats: List[Union[VaccineMean, AgeMean]] = []

    np_vac_distri, _ = get_vaccine_status_distribution(np_data)
    logging.info(
        "--- field distribution by age and only vaccine status (averaged over the whole period) ---"
    )
    for age_group in AgeGroup:
        for field in Field:
            np_ratio = np_vac_distri[:, age_group.value, field.value]
            # ratio -> percent before rounding
            vac_percent_mean = np.round(np.nanmean(np_ratio) * 100, 2)
            print(f"{field.name} - {age_group.label} - vac : {vac_percent_mean}%")
            stats.append(VaccineMean(age_group.label, field.label, vac_percent_mean))

    logging.info(
        "--- age distribution by field and vac status (averaged over the whole period) ---"
    )
    for field in Field:
        np_all = get_distribution_age_by_field(np_data, field)
        np_vac, np_unvac = get_distribution_age_by_field_and_vac_status(
            np_data, field
        )
        for age_group in AgeGroup:
            idx_age = age_group.value

            mean_all = rounded_mean(np_all[:, idx_age])
            print(f"age: {age_group.label} - field: {field.name} = {mean_all}%")
            stats.append(AgeMean(age_group.label, field.label, mean_all))

            mean_vac = rounded_mean(np_vac[:, idx_age])
            print(
                f"age: {age_group.label} - status: vac - field: {field.name} = {mean_vac}%"
            )

            mean_unvac = rounded_mean(np_unvac[:, idx_age])
            print(
                f"age: {age_group.label} - status: unvac - field: {field.name} = {mean_unvac}%"
            )
    return stats
def plot_bar_age_distribution_by_field_and_vac_status(
    np_data: np.ndarray,
    np_date: np.ndarray,
    field: Field,
    is_vac: Optional[bool] = True,
) -> None:
    """
    plot age distribution (percent) by field and vaccine status as stacked bars

    :param np_data: percents indexed as [date, age]; left unmodified
    :param np_date: dates of the x axis
    :param field: indicator plotted, used for the title and the file name
    :param is_vac: selects the "vac" / "unvac" title and file name suffix
    """
    fig, ax = get_plot_fig(figsize=(22, 8), locator=md.WeekdayLocator())
    suffix = "vac" if is_vac else "unvac"
    title = "Vaccinés" if is_vac else "Non vaccinés"
    # running top of the stack; a fresh array, because a view of `np_data[:, 0]`
    # updated in place would silently modify the caller's data
    bottom = np.zeros(len(np_data))
    for age_group in AgeGroup:
        percents_age = np_data[:, age_group.value]
        ax.bar(
            np_date,
            percents_age,
            label=age_group.label,
            bottom=bottom,
            color=AGE_COLORS[age_group.value],
        )
        # not `+=`: keep every array passed to a previous `ax.bar` call untouched
        bottom = bottom + percents_age
    ax.set_ylabel("%")
    ax.set_title(f"{field.label} - {title}")
    ax.legend(
        [age_group.label for age_group in AgeGroup], loc="upper right", frameon=True
    )
    save_and_close_fig(
        fig,
        os.path.join(OUTPUT_REPOSITORY, f"age_percent_{suffix}_{field.name.lower()}"),
        has_legend=False,
    )
def plot_bar_age_distribution_by_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """
    plot age distribution (percent) by field
    two plots are produced: vaccine data first, then unvaccine data
    """
    np_percents = get_distribution_age_by_field_and_vac_status(np_data, field)
    # the distributions come back in (vaccine, unvaccine) order
    for np_age_percent, is_vac in zip(np_percents, (True, False)):
        plot_bar_age_distribution_by_field_and_vac_status(
            np_age_percent, np_date, field, is_vac=is_vac
        )
def plot_cumulative_field(
    np_data: np.ndarray, np_date: np.ndarray, field: Field
) -> None:
    """
    plot, for each age group, the cumulative sum over time of `field`
    one curve for vaccine data and one for unvaccine data (raw counts)
    """
    np_data_vac, np_data_unvac = split_by_vac_status(np_data)
    curves = (("Vaccinés", np_data_vac), ("Non vaccinés", np_data_unvac))
    for age_group in AgeGroup:
        fig, _ = get_plot_fig(auto_date_fmt=False)
        for label, np_counts in curves:
            np_cumulate: np.ndarray = np.cumsum(
                np_counts[:, age_group.value, field.value], axis=0
            )
            plt.plot(np_date, np_cumulate, label=label)
        plt.title(f"{age_group.label} - {field.label}")
        plt.xlabel("Date")
        plt.ylabel("Nombre de cas")
        plt.xticks(rotation=30)
        file_name = f"cumulative_{age_group.name.lower()}_{field.name.lower()}"
        save_and_close_fig(fig, os.path.join(OUTPUT_REPOSITORY, file_name))
def plot_fields_by_age_vac(
    np_data: np.ndarray, np_date: np.ndarray, age_group: AgeGroup, vac_status: VacStatus
) -> None:
    """
    plot field data by age and vaccine status (cases per million)

    :param np_data: data indexed as [date, age, vac status, column]
    :param np_date: dates of the x axis
    :param age_group: age group plotted
    :param vac_status: vaccine status plotted
    """
    fig, _ = get_plot_fig(auto_date_fmt=False)
    np_selection = np_data[:, age_group.value, vac_status.value, :]
    np_effectif = np_selection[:, Quota.EFFECTIF.value]
    for field in Field:
        # 1e6, not 10e6 (which is ten million): the y axis is labelled
        # "Cas par million de personnes"
        np_result = 1e6 * np_selection[:, field.value] / np_effectif
        plt.plot(
            np_date, np_result, label=f"{field.label}", linestyle="dotted", linewidth=2
        )
    plt.xlabel("Date")
    plt.ylabel("Cas par million de personnes")
    plt.xticks(rotation=30)
    plt.title(f"{age_group.label} - {vac_status.label}")
    save_and_close_fig(
        fig,
        os.path.join(
            OUTPUT_REPOSITORY, f"all_{age_group.name.lower()}_{vac_status.name.lower()}"
        ),
    )
def plot_bar_vaccine_status_distribution_by_age_field(
    np_data: np.ndarray,
    np_date: np.ndarray,
    age_group: AgeGroup,
    field: Field,
) -> None:
    """
    plot one stacked bar per date with the vaccine share (bottom, blue) and the
    unvaccine share (top, red), in percent, for the given age group and field
    """
    np_vac_distri, np_unvac_distri = get_vaccine_status_distribution(np_data)
    np_vac_ratio = np_vac_distri[:, age_group.value, field.value]
    np_unvac_ratio = np_unvac_distri[:, age_group.value, field.value]
    # wide figure so that bars and labels are displayed correctly
    fig, ax = get_plot_fig(figsize=(22, 8), locator=md.WeekdayLocator())
    for date, vac_ratio, unvac_ratio in zip(np_date, np_vac_ratio, np_unvac_ratio):
        vac_percent = np.round(vac_ratio * 100, 2)
        unvac_percent = np.round(unvac_ratio * 100, 2)
        bar_vac = ax.bar(date, vac_percent, color="b", label="Vaccinés")
        ax.bar(date, unvac_percent, bottom=vac_percent, color="r", label="Non vaccinés")
        # the value is written on the bar unless it is exactly 0 or 100
        if vac_percent != 0 and vac_percent != 100:
            ax.bar_label(
                bar_vac, label_type="edge", color="black", fontsize="6.5", fmt="%.0f"
            )
    ax.set_ylabel("%")
    ax.set_title(f"{age_group.label} - {field.label}")
    plt.legend(["Vaccinés", "Non vaccinés"], loc="upper right", frameon=True)
    file_name = f"vac_percent_{age_group.name.lower()}_{field.name.lower()}"
    save_and_close_fig(
        fig,
        os.path.join(OUTPUT_REPOSITORY, file_name),
        has_legend=False,
    )
def check_timestep(np_date: np.ndarray) -> None:
    """
    check that the dates are evenly spaced

    :param np_date: array of dates
    :raises AssertionError: if the gaps between consecutive dates are not all equal
    """
    # get the difference between each element (return timedelta64 array)
    np_diff = np.diff(np_date)
    # fewer than two dates: there is no gap to compare
    # (np_diff[0] would raise IndexError)
    if np_diff.size == 0:
        return
    # check if all timestep are equals
    # explicit raise rather than `assert`, which is removed by `python -O`
    if not np.all(np_diff == np_diff[0]):
        raise AssertionError("some timesteps missing !")
def get_age_vac_args() -> List[Tuple[AgeGroup, VacStatus]]:
    """
    build pool arguments: every (age group, vac status) pair, age group varying slowest
    """
    return [
        (age_group, vac_status)
        for age_group in AgeGroup
        for vac_status in VacStatus
    ]
def get_age_field_args() -> List[Tuple[AgeGroup, Field]]:
    """
    build pool arguments: every (age group, field) pair, age group varying slowest
    """
    return [(age_group, field) for age_group in AgeGroup for field in Field]
def get_field_args() -> List[Tuple[Field]]:
    """
    build pool arguments: one 1-tuple per field
    """
    return [(field,) for field in Field]
def move_tmp_plots() -> None:
    """
    move .tmp.png plots into .png after generation
    an already existing .png plot is overwritten
    """
    logging.info(f"moving '{OUTPUT_SUFFIX}' file in {FORMAT_SUFFIX}...")
    for filename in os.listdir(OUTPUT_REPOSITORY):
        if not filename.endswith(OUTPUT_SUFFIX):
            continue
        # swap the trailing suffix only: `str.replace` on the whole path would
        # also rewrite an earlier occurrence in the file or directory name
        final_name = filename[: -len(OUTPUT_SUFFIX)] + FORMAT_SUFFIX
        # os.replace overwrites the plot of a previous run on every platform
        # (os.rename raises on Windows when the destination exists)
        os.replace(
            os.path.join(OUTPUT_REPOSITORY, filename),
            os.path.join(OUTPUT_REPOSITORY, final_name),
        )
    logging.info("files moved")
def generate_html_page(
    np_date: np.ndarray, lst_analyse_data: List[Union[VaccineMean, AgeMean]]
) -> None:
    """
    render `templates/index.template.html` into `<BUILD_REPOSITORY>/index.html`

    :param np_date: dates of the dataset; the first and last ones give the
        period displayed
    :param lst_analyse_data: stats records, dispatched to the template by type
    """
    logging.info("generating html page with plots...")
    os.makedirs(BUILD_REPOSITORY, exist_ok=True)
    env = Environment(
        loader=FileSystemLoader("templates"), autoescape=select_autoescape()
    )
    template = env.get_template("index.template.html")
    date_start = np_date[0].astype(dt).strftime(DATE_FORMAT)
    date_end = np_date[-1].astype(dt).strftime(DATE_FORMAT)
    date_build = dt.strftime(dt.now(), "%Y%m%d")
    # plot named after the build date; only referenced if the file exists
    owid_path = f"fra-{date_build}.png"
    if not os.path.isfile(os.path.join(OUTPUT_REPOSITORY, owid_path)):
        owid_path = ""
    data = template.render(
        fields=Field,
        ages=AgeGroup,
        status=VacStatus,
        static=os.path.join(MAIN_URL, STATIC_REPOSITORY),
        src=DATA_URL,
        period=f"{date_start} - {date_end}",
        vaccine_mean=[x for x in lst_analyse_data if isinstance(x, VaccineMean)],
        age_mean=[x for x in lst_analyse_data if isinstance(x, AgeMean)],
        owid_path=owid_path,
    )
    # explicit encoding: the labels contain accented characters and the
    # platform default encoding is not always utf-8
    with open(
        os.path.join(BUILD_REPOSITORY, "index.html"), "w", encoding="utf-8"
    ) as f:
        f.write(data)
    logging.info("html page build")
if __name__ == "__main__":
    # This script analyses and plots DREES data
    #
    # Stats available:
    # - Age distribution (percent) by field (vaccine and unvaccine)
    # - Vaccine/unvaccine distribution (percent) by field and age
    #
    # Plots available:
    # - cumulative hc, sc, dc by age and vaccine status
    # - hc, sc, dc by vaccine status and age (cases per million)
    # - hc, sc, dc (vaccine/unvaccine percent distribution) by age
    # - hc, sc, dc (age percent distribution) by field
    #
    # Main indicators are:
    # - hospitalisations (hc)
    # - criticals (sc)
    # - deaths (dc)
    # hc, sc, dc include positive PCR tests
    parser = argparse.ArgumentParser()
    # every option is a boolean switch, off by default
    for short_flag, long_flag, help_text in (
        ("-r", "--refresh", "redownload data for updates"),
        ("-np", "--no-plot", "no plot data"),
        ("-th", "--to-html", "create an html with the plots"),
    ):
        parser.add_argument(
            short_flag,
            long_flag,
            action="store_true",
            default=False,
            help=help_text,
        )
    args = parser.parse_args()

    # load (or download), restructure and convert the dataset
    dic_data_unstructured: Dict[str, Any] = get_data(
        file_path=os.path.join(DATA_REPOSITORY, "drees.json"), refresh=args.refresh
    )
    dic_data: Dict[dt, Any] = structure_data(dic_data_unstructured)
    np_data, np_date = get_np_data(dic_data)

    lst_analyse_data = analyse(np_data)
    check_timestep(np_date)

    if not args.no_plot:
        os.makedirs(OUTPUT_REPOSITORY, exist_ok=True)
        # (plot function, builder of its per-call arguments), run in this order
        plot_jobs = (
            (plot_fields_by_age_vac, get_age_vac_args),
            (plot_bar_vaccine_status_distribution_by_age_field, get_age_field_args),
            (plot_bar_age_distribution_by_field, get_field_args),
            (plot_cumulative_field, get_field_args),
        )
        # the data arrays are bound once as the two leading arguments
        plot_tasks = [
            (partial(plot_func, np_data, np_date), build_args)
            for plot_func, build_args in plot_jobs
        ]
        with Pool(2) as pool:
            for plot_task, build_args in plot_tasks:
                pool.starmap(plot_task, build_args())
        move_tmp_plots()

    if args.to_html:
        generate_html_page(np_date, lst_analyse_data)