RKI-Style-Nowcasting

Thomas Viehmann [email protected]

Dieser Code und die Verfahren werden in https://lernapparat.de/rki-nowcasting/ dokumentiert.

Bitte beachten Sie die Hinweise dort zu den Bedingungen der Nutzung dieses Codes und der Berechnungsresultate.

In [1]:
import pandas
# Notebook export: detect IPython/Jupyter by the presence of get_ipython in
# the global namespace; the %matplotlib magic below only works there.
if 'get_ipython' in dir():
    INTERACTIVE = True
    %matplotlib inline
else:
    INTERACTIVE = False
from matplotlib import pyplot
import numpy
import datetime
import math
import matplotlib
import scipy.stats
#import seaborn
import pathlib
#seaborn.set()
import os
import shutil
import itertools

def reverse_cumprod(x):
    """Cumulative product taken from the last element towards the first."""
    flipped = x[::-1]
    return flipped.cumprod()[::-1]
def reverse_cumsum(x):
    """Cumulative sum taken from the last element towards the first."""
    flipped = x[::-1]
    return flipped.cumsum()[::-1]

# Teach matplotlib how to render pandas datetime values on plot axes.
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

def smooth(s, size=4):
    """Trailing moving average of series *s* over a window of *size* entries.

    Returns a series with the same length, index and name as *s*; the first
    size-1 entries average over fewer (implicitly zero-padded) values.
    """
    kernel = numpy.full((size,), 1 / size)
    # numpy.convolve (full mode) yields len(s) + size - 1 values; keep the first
    # len(s) so the result aligns with s. The previous slice [:-(size-1)]
    # produced an EMPTY array for size == 1 ([:-0] == [:0]); [:len(s)] is
    # identical for size > 1 and correct for size == 1.
    return pandas.Series(numpy.convolve(s, kernel)[:len(s)], index=s.index, name=s.name)

# Standard abbreviations for the German federal states (plus "D" for Germany
# as a whole); used to build short column prefixes and file names.
bl_abbr = {"Deutschland": "D", "Baden-Württemberg": "BW", "Bayern": "BY", "Berlin": "BE", "Brandenburg": "BB", "Bremen": "HB", "Hamburg": "HH",
           "Hessen": "HE", "Mecklenburg-Vorpommern": "MV", "Niedersachsen": "NI", "Nordrhein-Westfalen": "NW",
           "Rheinland-Pfalz": "RP", "Saarland": "SL", "Sachsen": "SN", "Sachsen-Anhalt": "ST", "Schleswig-Holstein": "SH",
           "Thüringen": "TH"}

# All generated figures and CSVs are written below this directory.
p_output = pathlib.Path('./outputs/')
In [2]:
def copyfile(src, dst):
    """Copy src to dst atomically: stage under a .tmp name, then rename over dst."""
    staging = f"{dst}.tmp"
    shutil.copy(src, staging)
    os.replace(staging, dst)
In [3]:
# Load the daily RKI COVID-19 snapshot CSVs and concatenate them into one
# frame; each file corresponds to one "Datenstand" (data-as-of) day.
dfs = []
for date in pandas.date_range('2020-04-29', '2020-05-21', freq='D'):
    dstr = date.date().isoformat()
    adf = pandas.read_csv(f'data/rki_covid19.{dstr}.csv')
    # Datenstand comes as e.g. "29.04.2020, ..." -> reorder into ISO "2020-04-29"
    adf['Datenstand'] = pandas.to_datetime(adf.Datenstand.apply(lambda s: '-'.join(s.split(',')[0].split('.')[::-1])))
    assert (date==adf['Datenstand'].min()) and (date==adf['Datenstand'].max())
    dfs.append(adf)
large_df = pandas.concat(dfs, axis=0, sort=True)
for k in ('Meldedatum', 'Datenstand', 'Refdatum'):
    large_df[k] = pandas.to_datetime(large_df[k])
large_df['delay'] = (large_df.Meldedatum - large_df.Refdatum).dt.days # only valid for IstErkrankungsbeginn==1
large_df['Eingangsdatum'] = large_df.Datenstand - pandas.Timedelta(days=1) # only valid for NeuerFall==1
large_df['delay_eingang'] = (large_df.Eingangsdatum - large_df.Refdatum).dt.days # only valid for NeuerFall==1


# df['Meldedatum'] = df.Meldedatum.dt.tz_localize(None)
In [ ]:
 
In [4]:
def plot_vergleich(eigene, vergleich, titel, fn=None):
    """Compare our (smoothed) series against the RKI reference series.

    Draws three panels: full overlap, last 20 days, and the difference over
    the last 20 days. If fn is given, the figure is also saved there.
    """
    lo, hi = vergleich.index.min(), vergleich.index.max()
    eigene_smooth = smooth(eigene)[lo:hi]
    pyplot.figure(figsize=(15, 5))
    pyplot.suptitle(titel)
    # panel 1: whole comparison range; panel 2: the most recent 20 days
    panels = [(eigene_smooth, vergleich), (eigene_smooth[-20:], vergleich[-20:])]
    for panel_no, (ours, theirs) in enumerate(panels, start=1):
        pyplot.subplot(1, 3, panel_no)
        pyplot.plot(ours, label='TV')
        pyplot.plot(theirs, label='RKI')
        pyplot.xticks(rotation=90)
        pyplot.legend()
    # panel 3: pointwise difference over the most recent 20 days
    pyplot.subplot(1, 3, 3)
    pyplot.plot(eigene_smooth[-20:] - vergleich[-20:], label='Differenz')
    pyplot.xticks(rotation=90)
    pyplot.legend()
    if fn is not None:
        pyplot.savefig(fn, bbox_inches="tight")
    pyplot.show()
    pyplot.close()
In [5]:
def faelle_mit_erkrankungsbeginn_nach_erkrankungsbeginn(large_df):
    """Cases with a known disease onset, summed per onset date (Refdatum).

    Uses only the newest Datenstand snapshot, restricted to plausible delays
    (0..30 days), and returns a series reindexed over the full date range
    from 2020-01-01 up to the latest Eingangsdatum (missing days are 0).
    """
    # actually the delay to the respective arrival would be needed;
    # in general we do not know it
    latest = large_df.Datenstand.max()
    mask = (
        (large_df.Datenstand == latest)
        & (large_df.IstErkrankungsbeginn == 1)
        & (large_df.NeuerFall >= 0)
        & (large_df.delay >= 0)
        & (large_df.delay <= 30)
    )
    per_onset = large_df[mask].groupby(['Refdatum']).AnzahlFall.sum()
    full_range = pandas.date_range('2020-01-01', large_df.Eingangsdatum.max())
    return per_onset.reindex(full_range, fill_value=0)
In [ ]:
 
In [6]:
def faelle_ohne_erkrankungsbeginn_nach_rekonstruiertem_eingangsdatum(large_df, missing_tol=2):
    """Cases WITHOUT a known disease onset, per reconstructed arrival date.

    Walks the Datenstand snapshots from newest to oldest: cases flagged
    NeuerFall == 1 in a snapshot, and cases that disappear from the "old"
    stock between snapshots, are attributed to that snapshot's arrival date
    (Datenstand - 1 day). Dates before the snapshot range fall back to the
    Meldedatum. Returns a DataFrame indexed by (Altersgruppe, Geschlecht)
    with one column per reconstructed arrival date.

    missing_tol: maximum total count of cases that remain unattributable
    (apparently never flagged as new) before the assert below fires.
    """
    unbekannte_roh = large_df[(large_df.IstErkrankungsbeginn == 0)
                              & (large_df.NeuerFall >= 0)].groupby(['Datenstand', 'NeuerFall',  'Meldedatum', 'Altersgruppe', 'Geschlecht']).AnzahlFall.sum()
    # complete the full (Datenstand, NeuerFall, Meldedatum, Altersgruppe,
    # Geschlecht) grid with zeros so later lookups always succeed
    unbekannte_roh = unbekannte_roh.reindex(pandas.MultiIndex.from_product(unbekannte_roh.index.levels, names=unbekannte_roh.index.names),
                                            fill_value=0)
    new_cases_melde = {}
    old_cases = None  # remaining "old" stock while walking backwards
    newer_sum = 0
    # iterate snapshots from newest to oldest
    for d in unbekannte_roh.index.levels[0][::-1]:
        if old_cases is None:
            # newest snapshot: take its old/new split at face value
            old_cases = unbekannte_roh[d, 0]
            new_cases = unbekannte_roh[d, 1]
        else:
            last_old_cases = old_cases
            # subtract this snapshot's newly-flagged cases from the remaining
            # stock; clip guards against negative counts from corrections
            old_cases = (old_cases - unbekannte_roh.loc[d, 1]).clip(lower=0)
            new_cases = last_old_cases - old_cases
        newer_sum += new_cases.sum()
        # Eingangsdatum (arrival date) = day before the Datenstand
        new_cases_melde[d - pandas.Timedelta(days=1)] = new_cases

    new_cases = pandas.DataFrame(new_cases_melde).sort_index(axis=1)
    new_new = new_cases.sum(level=[1,2])  # sum over Meldedatum -> (Altersgruppe, Geschlecht)
    # prepend the residual stock (by Meldedatum) for dates before the snapshot
    # range; d is the oldest Datenstand after the loop
    unknown_new_cases_per_day = pandas.concat([old_cases[:d-pandas.Timedelta(days=2)].unstack(level=0, fill_value=0), new_new], axis=1)

    missing = old_cases[d-pandas.Timedelta(days=1):]
    missing = missing[missing > 0]
    # For Germany by age/sex we lose at least 2 cases that apparently were
    # never flagged as a new case. For subsets of the data possibly more.
    print(missing.sum())
    assert missing.sum() <= missing_tol
    return unknown_new_cases_per_day
In [ ]:
 
In [ ]:
 
In [7]:
def verzug_von_erkrankung_bis_eingang(large_df):
    """Delay between disease onset and arrival at the RKI.

    Returns a DataFrame indexed by (Altersgruppe, Geschlecht, delay) with one
    column per day: columns inside the snapshot range count cases flagged
    NeuerFall == 1 by reconstructed arrival date (delay_eingang); earlier
    columns fall back to the oldest snapshot's stock keyed by Meldedatum,
    with the Meldedatum-based delay as a proxy.
    """
    # new cases within the snapshot range, by reconstructed arrival date
    bekannte_roh = large_df[(large_df.IstErkrankungsbeginn == 1) 
                             & (large_df.NeuerFall == 1)
                             & (large_df.delay_eingang >= 0)
                             & (large_df.delay_eingang <= 30)].groupby(['Eingangsdatum',  'Altersgruppe', 'Geschlecht', 'delay_eingang']).AnzahlFall.sum()
    bekannte_roh = bekannte_roh.unstack(level=0, fill_value=0)
    # complete the delay axis to the full 0..30 range
    bekannte_roh = bekannte_roh.reindex(pandas.MultiIndex.from_product(bekannte_roh.index.levels[:-1]+[pandas.RangeIndex(0, 31)], names=bekannte_roh.index.names),
                                            fill_value=0)
    # stock of older cases taken from the oldest snapshot, keyed by Meldedatum
    bekannte_roh_old = large_df[(large_df.IstErkrankungsbeginn == 1) 
                             & (large_df.Datenstand == large_df.Datenstand.min())
                             & (large_df.NeuerFall == 0)
                             & (large_df.delay >= 0)
                             & (large_df.delay <= 30)].groupby(['Meldedatum', 'Altersgruppe', 'Geschlecht', 'delay']).AnzahlFall.sum()
    bekannte_roh_old = bekannte_roh_old.reindex(pandas.MultiIndex.from_product(bekannte_roh_old.index.levels, names=bekannte_roh_old.index.names),
                                            fill_value=0).unstack(level=0)
    # we lose one case
    assert bekannte_roh_old.loc[:, bekannte_roh.columns.min():].sum().sum() <= 1
    # splice: old columns strictly before the first "new" column, then the new ones
    delay_known_new_cases = pandas.concat([bekannte_roh_old.loc[:, :bekannte_roh.columns.min()-pandas.Timedelta(days=1)], bekannte_roh], axis=1)
    delay_known_new_cases = delay_known_new_cases.reindex(pandas.date_range('2020-01-01',
                                                     delay_known_new_cases.columns.max()), axis=1, fill_value=0)
    delay_known_new_cases.index.names = ['Altersgruppe', 'Geschlecht', 'delay']
    delay_known_new_cases.columns.name = 'Eingangsdatum'
    return delay_known_new_cases
In [8]:
def verzug_von_erkrankung_bis_eingang_nach_erkrankung(large_df):
    """Like verzug_von_erkrankung_bis_eingang, but columns are keyed by the
    onset date (Refdatum) instead of the arrival date, and the new and old
    case tables are ADDED into a single table rather than spliced.
    """
    # new cases within the snapshot range, delay relative to reconstructed arrival
    bekannte_roh = large_df[(large_df.IstErkrankungsbeginn == 1) 
                             & (large_df.NeuerFall == 1)
                             & (large_df.delay_eingang >= 0)
                             & (large_df.delay_eingang <= 30)].groupby(['Refdatum',  'Altersgruppe', 'Geschlecht', 'delay_eingang']).AnzahlFall.sum()
    # older cases from the oldest snapshot, delay relative to the Meldedatum
    bekannte_roh_old = large_df[(large_df.IstErkrankungsbeginn == 1) 
                             & (large_df.Datenstand == large_df.Datenstand.min())
                             & (large_df.NeuerFall == 0)
                             & (large_df.delay >= 0)
                             & (large_df.delay <= 30)].groupby(['Refdatum', 'Altersgruppe', 'Geschlecht', 'delay']).AnzahlFall.sum()
    
    # rename 'delay_eingang' to 'delay' so both tables align for the add below
    bekannte_roh.index.names = bekannte_roh.index.names[:-1] + ['delay']
    bekannte_roh = bekannte_roh.unstack(level=0, fill_value=0)
    bekannte_roh_old = bekannte_roh_old.unstack(level=0, fill_value=0)

    # complete the delay axis to the full 0..30 range in both tables
    bekannte_roh = bekannte_roh.reindex(pandas.MultiIndex.from_product(bekannte_roh.index.levels[:-1]+[pandas.RangeIndex(0, 31)], 
                                                                       names=bekannte_roh.index.names),
                                            fill_value=0)
    bekannte_roh_old = bekannte_roh_old.reindex(pandas.MultiIndex.from_product(bekannte_roh_old.index.levels[:-1]+[pandas.RangeIndex(0, 31)], 
                                                                       names=bekannte_roh_old.index.names),
                                            fill_value=0)
    
    return bekannte_roh.add(bekannte_roh_old, fill_value=0)
In [9]:
def imputiere_erkrankungsdaten(unknown_new_cases_per_day, delay_known_new_cases, reuse_tol=200, completely_missing_tol=0):
    """Impute onset dates (Refdatum) for cases without a known disease onset.

    For every arrival day and (Altersgruppe, Geschlecht) category, the cases
    without onset are spread over earlier days according to the empirical
    onset-to-arrival delay distribution of the cases WITH known onset
    (delay_known_new_cases). If a category has no reference cases on a given
    day, the most recently seen distribution is reused (counted against
    reuse_tol); if no distribution has been seen yet at all, those cases are
    dropped (counted against completely_missing_tol).

    Returns a DataFrame indexed by (Eingangsdatum, Altersgruppe, Geschlecht)
    with one column per imputed Refdatum.
    """
    dist = None  # last usable delay distribution, carried over between days
    melde_range = pandas.date_range('2020-01-01', unknown_new_cases_per_day.columns.max(), name='Refdatum')
    reuse_count = 0
    completely_missing_count = 0
    imputed_by_day_and_cat = {}
    for date in unknown_new_cases_per_day.columns:
        for ag in unknown_new_cases_per_day.index.levels[0]:
            for g in unknown_new_cases_per_day.index.levels[1]:
                section = unknown_new_cases_per_day.loc[ag, g]
                section = section[date:date]  # note: this slice includes the upper boundary!
                section = section.reindex(melde_range, fill_value=0)
                count = section.sum()
                unnormed_dist = delay_known_new_cases.loc[ag, g].loc[:, date]
                unnormed_count = unnormed_dist.sum()
                if unnormed_count == 0 and count > 0:
                    # no reference cases for this category today: reuse the
                    # last seen distribution (dist keeps its previous value)
                    reuse_count += count
                elif unnormed_count > 0:
                    dist = unnormed_dist / unnormed_count
                if count > 0 and dist is not None:
                    # convolve with the reversed delay distribution, keeping
                    # only the temporal-padding side so mass moves backwards
                    imputed_section = numpy.convolve(section, dist[::-1])[-len(section):]
                    imputed_by_day_and_cat[(date, ag, g)] = pandas.Series(imputed_section, index=melde_range)
                    assert count - imputed_section.sum() < 2, "losing too many cases in imputation"
                elif count > 0:
                    completely_missing_count += count
    if INTERACTIVE:
        print("reused:", reuse_count, "completely missing:", completely_missing_count)
    assert completely_missing_count <= completely_missing_tol  # should not happen for Germany as a whole
    assert reuse_count <= reuse_tol  # as of May 17: 179 isolated cases with no peers in the same category in D
    imputed = pandas.DataFrame(imputed_by_day_and_cat).T
    imputed.index.names = ['Eingangsdatum', 'Altersgruppe', 'Geschlecht']
    # use the argument's own column range instead of the module-level large_df
    # (columns.max() is exactly the date melde_range was built from), so the
    # function no longer depends on a global variable
    imputed = imputed.reindex(pandas.date_range('2020-01-01', unknown_new_cases_per_day.columns.max(), name=imputed.columns.names[0]), axis=1)
    return imputed
In [ ]:
 
In [10]:
def nowcast_lawless(known_cases, imputed, delay_cases):
    """Nowcast: scale up the most recent days of (known + imputed) cases to
    account for not-yet-reported cases, using a Lawless-style estimate of the
    reporting-delay distribution from the recent weeks of delay_cases.

    known_cases: cases with known onset, indexed by Refdatum
    imputed: imputed cases, indexed by Refdatum
    delay_cases: DataFrame indexed by (Altersgruppe, Geschlecht, delay)
        with Refdatum columns
    """
    assert delay_cases.columns.name == "Refdatum"
    # last 38 onset days (30 delay values + 8-day estimation window), summed
    # over age/sex (grouped by the delay level); [::-1] puts the newest day first
    counts = delay_cases.iloc[:, -8-30:].sum(level=2).T[::-1].values

    cumulative_over_days_per_delay = numpy.concatenate([numpy.zeros_like(counts[:1]), counts.cumsum(0)], axis=0)
    counts8 = numpy.tril(cumulative_over_days_per_delay[8:]-cumulative_over_days_per_delay[:-8]) # sum of 8 preceding days
    n_x = numpy.diagonal(counts8)  # cases reported with delay exactly x within the window
    N_x = counts8.sum(1)  # cases reported with delay <= x within the window

    g_hat = n_x / N_x  # estimated probability of delay exactly x, given delay <= x

    # F_hat[i]: estimated fraction of cases reported within i days of onset
    F_hat = (1-g_hat[::-1]).cumprod()[::-1]
    imputed_total = known_cases.add(imputed, fill_value=0)
    nowcast = imputed_total.copy()
    # inflate the i-th most recent day by the estimated reporting completeness.
    # NOTE(review): the most recent day (iloc[-1]) is divided by F_hat[1] and
    # F_hat[0] is never used — confirm the intended index alignment.
    for i in range(1, len(F_hat)):
        nowcast.iloc[-i] /= F_hat[i]
    return nowcast
In [11]:
def qualitaetschecks(large_df):
    """Quality checks: compare the known/imputed/nowcast series against
    numbers extracted from the RKI's published graphs (if a comparison CSV
    for the current Datenstand exists) and save the figures under p_output.
    """
    with pyplot.style.context("seaborn"):
        dstr = large_df.Datenstand.max().date().isoformat()
        fn = f'./data/nowcast_imputed_known_from_graph_{dstr}.csv'
        if not os.path.exists(fn):
            print("Keine Vergleichsdaten vorhanden")
            return
        df_vergleich = pandas.read_csv(fn, index_col=0)
        df_vergleich.index = pandas.to_datetime(df_vergleich.index)
        faelle_angegeben = faelle_mit_erkrankungsbeginn_nach_erkrankungsbeginn(large_df)
        # NOTE(review): smooth_known is computed but never used below.
        smooth_known = smooth(faelle_angegeben[df_vergleich.known.index.min():df_vergleich.known.index.max()])
        plot_vergleich(faelle_angegeben, df_vergleich.known, 'angegebener Erkrankungsbeginn',
                       p_output / f'angegeben_vs_rki.{dstr}.svg')

        # cases without onset: compare counts by Meldedatum against the
        # reconstructed arrival-date attribution
        unknown_new_cases_per_day = faelle_ohne_erkrankungsbeginn_nach_rekonstruiertem_eingangsdatum(large_df)
        nach_meldedatum = large_df[(large_df.Datenstand == large_df.Datenstand.max()) 
                                   & (large_df.IstErkrankungsbeginn == 0)
                                   & (large_df.NeuerFall >= 0)].groupby('Meldedatum').AnzahlFall.sum()
        pyplot.figure()
        pyplot.title('Fälle ohne Erkrankungsbeginn')
        pyplot.plot(nach_meldedatum['2020-04-28':], label='nach Meldedatum')
        pyplot.plot(unknown_new_cases_per_day.sum()['2020-04-28':], label='nach vermutetem Übermittlungsdatum')
        pyplot.xticks(rotation=90)
        pyplot.legend()
        pyplot.savefig(p_output / f'melde_vs_uebermittlung.{dstr}.svg', bbox_inches = "tight")
        pyplot.show()
        pyplot.close()

        # onset-to-arrival delay distribution: newer cases (per Datenstand)
        # vs. the older cases that are only available per Meldedatum
        delay_known_new_cases = verzug_von_erkrankung_bis_eingang(large_df)
        pyplot.figure()
        pyplot.title('Verteilung der Zeit zwischen Erkrankung und Eingang am RKI')
        new_delays = delay_known_new_cases.sum(level=2).loc[:, '2020-04-29':]
        pyplot.bar(new_delays.index, new_delays.sum(1)/new_delays.sum().sum(), alpha=0.5, label='neuere per Datenstand')
        old_delays = delay_known_new_cases.sum(level=2).loc[:, :'2020-04-28']
        pyplot.bar(new_delays.index, old_delays.sum(1)/old_delays.sum().sum(), alpha=0.5, label='ältere per Meldedatum')
        pyplot.legend()
        pyplot.savefig(p_output / f'erkrankung_uebermittlung_verzoegerung.{dstr}.svg', bbox_inches = "tight")
        pyplot.show()
        pyplot.close()

        
        imputed_x = imputiere_erkrankungsdaten(unknown_new_cases_per_day, delay_known_new_cases,
                                          completely_missing_tol=1, reuse_tol=300)
        imputed_erkr = imputed_x.sum()
        plot_vergleich(imputed_erkr, df_vergleich.imputed - df_vergleich.known, '(nur) imputierte Fälle',
                       p_output / f'imputiert_vs_rki.{dstr}.svg')

        delay_cases_per_onset = verzug_von_erkrankung_bis_eingang_nach_erkrankung(large_df)

        # reweight delays by imputed cases.
        factors = (imputed_x.sum(level=(1,2)) / delay_cases_per_onset.sum(level=(0,1))).fillna(0)+1
        delay_imputed_cases = delay_cases_per_onset * factors
        nowcast = nowcast_lawless(faelle_angegeben, imputed_erkr, delay_imputed_cases)
        plot_vergleich(nowcast, df_vergleich.nowcast, 'Nowcast (gesamt)', p_output / f'nowcast_vs_rki.{dstr}.svg')
In [12]:
# Run the quality checks only in an interactive (notebook) session.
if INTERACTIVE:
    qualitaetschecks(large_df)
Keine Vergleichsdaten vorhanden
In [13]:
def plot_nowcast(known, imputed, nowcast, title, min_date='2020-03-02', chop_days=3, fn=None):
    """Stacked bar plot of nowcast, imputed+known and known cases per day.

    Returns (r_4, r_7): ratios of the smoothed nowcast to its value four days
    earlier, using a 4-day and a 7-day smoothing window respectively. The
    last chop_days days are dropped as still incomplete. If fn is given, the
    figure is also saved there.
    """
    def trim(series, size=4):
        # smooth, restrict to the plotting window and drop the newest days
        return smooth(series, size=size)[min_date:][:-chop_days]

    pyplot.figure(figsize=(15, 5))
    kno_s = trim(known)
    # nowcast is gross, imputed is net, so stack the known cases on top
    imp_s = trim(imputed.add(known, fill_value=0))
    now_s = trim(nowcast)
    now_s7 = trim(nowcast, size=7)
    r_4 = now_s[-1] / now_s[-5]
    r_7 = now_s7[-1] / now_s7[-5]
    pyplot.title(title + f" $R_4={r_4:.2f}$, $R_7={r_7:.2f}$")
    for series, label in ((now_s, 'nowcast'), (imp_s, 'imputiert'), (kno_s, 'angegeben')):
        pyplot.bar(series.index, series, label=label)
    pyplot.legend()
    pyplot.xticks(rotation=90)
    if fn is not None:
        pyplot.savefig(fn, bbox_inches="tight")
    pyplot.show()
    pyplot.close()
    return r_4, r_7
    
In [14]:
def plot_r(nowcast, num_days=15, chop_days=3):
    """Plot 4-day and 7-day R estimates (ratio to the value four days back)
    over the last num_days days, dropping the newest chop_days days.
    """
    for size, label in ((4, '4-Tages-R'), (7, '7-Tages-R')):
        sm = smooth(nowcast, size=size)[-num_days-chop_days:-chop_days]
        pyplot.plot(sm[4:] / sm[:-4].values, label=label)
    pyplot.xticks(rotation=90)
    pyplot.legend()
    pyplot.close()
    
#plot_r(nowcast)
In [15]:
# Delay tables computed once on the full (Germany-wide) data set; they are
# reused as global assumptions for all regional subsets below.
delay_known_new_cases = verzug_von_erkrankung_bis_eingang(large_df)
delay_known_new_cases_per_refdatum = verzug_von_erkrankung_bis_eingang_nach_erkrankung(large_df)

def compute_and_plot(df, title, delay_known_new_cases, min_date='2020-03-02', chop_days=3):
    """Run the full pipeline (known cases, imputation, nowcast) for one data
    subset, plot and save it, and return (result DataFrame, r_4, r_7).

    df: subset of large_df (one Bundesland, a combination, or everything)
    title: plot title; mapped through bl_abbr to a short column/file prefix
    delay_known_new_cases: delay table used for the imputation step

    NOTE(review): the delay reweighting below uses the module-level
    delay_known_new_cases_per_refdatum, NOT this parameter — confirm that
    this (Germany-wide assumption for all subsets) is intentional.
    """
    dstr = df.Datenstand.max().date().isoformat()
    known = faelle_mit_erkrankungsbeginn_nach_erkrankungsbeginn(df)
    short = bl_abbr.get(title, title).lower().replace(' ', '_')
    unknown_new_cases_per_day = faelle_ohne_erkrankungsbeginn_nach_rekonstruiertem_eingangsdatum(
        df, missing_tol=20)
    imputed_x = imputiere_erkrankungsdaten(unknown_new_cases_per_day, delay_known_new_cases,
                                               completely_missing_tol=1, reuse_tol=300)
    imputed = imputed_x.sum()
    # reweight delays by imputed cases.
    factors = (imputed_x.sum(level=(1,2)) / delay_known_new_cases_per_refdatum.sum(level=(0,1))).fillna(0)+1
    delay_imputed_cases = delay_known_new_cases_per_refdatum * factors

    nowcast = nowcast_lawless(known, imputed, delay_imputed_cases)
    fn = p_output / f'nowcast.{short}.{dstr}.svg'
    r_4, r_7 = plot_nowcast(known, imputed, nowcast, title+' '+dstr, fn=fn)
    # atomically refresh the "latest" copy of the figure
    copyfile(fn, p_output / f'nowcast.{short}.latest.svg')
    imputed_total = imputed.add(known, fill_value=0)
    kno_s = smooth(known)[min_date:][:-chop_days]
    imp_s = smooth(imputed_total)[min_date:][:-chop_days]  # nowcast is gross, imputed is net...
    now_s = smooth(nowcast)[min_date:][:-chop_days]
    return (pandas.DataFrame({
        short+ '_bekannt_unsmooth': known[min_date:][:-chop_days],
        short+ '_imputiert_unsmooth': imputed_total[min_date:][:-chop_days],
        short+ '_nowcast_unsmooth': nowcast[min_date:][:-chop_days],
        short+ '_bekannt_smooth': kno_s,
        short+ '_imputiert_smooth': imp_s,
        short+ '_nowcast_smooth': now_s,
    }).fillna(0), r_4, r_7)  # sometimes we don't have cases


# Compute and plot the nowcast for Germany, each federal state and
# "Der Norden" (the five northern states combined).
results = {}
for bl in list(bl_abbr.keys())+['Der Norden']:
    if bl == 'Deutschland':
        bl_df = large_df
    elif bl == 'Der Norden':
        bl_df = large_df[(large_df.Bundesland.isin({'Bremen', 'Hamburg', 'Mecklenburg-Vorpommern', 'Niedersachsen', 'Schleswig-Holstein'}))]
    else:
        bl_df = large_df[large_df.Bundesland == bl]
    # uses global assumptions (Germany-wide delay tables) for imputation/nowcasting
    with pyplot.style.context("seaborn"):
        results[bl] = compute_and_plot(bl_df, bl, delay_known_new_cases)
7
reused: 195 completely missing: 0
1
reused: 28 completely missing: 0
18
reused: 47 completely missing: 1
1
reused: 2 completely missing: 0
3
reused: 2 completely missing: 0
3
reused: 7 completely missing: 0
1
reused: 8 completely missing: 0
4
reused: 8 completely missing: 0
0
reused: 0 completely missing: 0
4
reused: 20 completely missing: 0
11
reused: 32 completely missing: 0
2
reused: 25 completely missing: 0
2
reused: 11 completely missing: 0
2
reused: 0 completely missing: 0
11
reused: 2 completely missing: 0
5
reused: 2 completely missing: 1
2
reused: 0 completely missing: 0
6
reused: 37 completely missing: 1
In [16]:
# Collect the per-region result frames side by side and persist them; the
# "latest" copy is refreshed atomically via copyfile.
results_df = pandas.concat([r[0] for r in results.values()], axis=1)
dstr = large_df.Datenstand.max().date().isoformat()
fn = p_output / f'nowcast_results.{dstr}.csv'
results_df.to_csv(fn)
copyfile(fn, p_output / 'nowcast_results.latest.csv')