The purpose of this notebook is to take the original PostgreSQL database and clean the data in it. The output is a set of tables in a new PostgreSQL schema that hold the cleaned data.
Cleaning occurs at several levels.
The structure of the data can be viewed at the ORM layer in the package.
!umd --version
urban-meal-delivery, version 0.2.0
from urban_meal_delivery import config, db
import collections
import datetime
import hashlib
import pandas as pd
import pytz as tz
import numpy as np
import sqlalchemy as sa
%load_ext lab_black
pd.set_option("display.max_columns", 999)
pd.set_option("display.max_rows", 999)
connection = db.connection
As a result of this notebook, a new PostgreSQL schema called "clean" is created that holds the tables with the cleaned data.
config.CLEAN_SCHEMA
'clean'
All tables with the original data are stored in the default PostgreSQL schema called "public".
config.ORIGINAL_SCHEMA
'public'
Use alembic to run the very first database migration script, which creates the new tables.
%cd -q ..
!alembic upgrade f11cd76d2f45
%cd -q research
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> f11cd76d2f45, Create the database from scratch.
While the original database consists of data concerning the UDP's operations in five cities in France, we only look at "Bordeaux", "Lyon", and "Paris" in this research project: the amount of data for "Lille" and "Nantes" is simply too small, due to the very short time horizons during which the UDP operated there.
The following target_cities data were manually obtained from Google Maps and mapped to the "database_id"s of the cities in the original database in which the UDP was operating.
target_cities = {
"Bordeaux": {
"database_id": 4,
"google_maps_data": {
"center_latitude": 44.837789,
"center_longitude": -0.57918,
"northeast_latitude": 44.91670389999999,
"northeast_longitude": -0.5333089999999999,
"southwest_latitude": 44.810752,
"southwest_longitude": -0.638973,
"initial_zoom": 13,
},
},
"Lyon": {
"database_id": 1,
"google_maps_data": {
"center_latitude": 45.764043,
"center_longitude": 4.835659,
"northeast_latitude": 45.808425,
"northeast_longitude": 4.898393,
"southwest_latitude": 45.707486,
"southwest_longitude": 4.7718489,
"initial_zoom": 13,
},
},
"Paris": {
"database_id": 2,
"google_maps_data": {
"center_latitude": 48.856614,
"center_longitude": 2.3522219,
"northeast_latitude": 48.9021449,
"northeast_longitude": 2.4699208,
"southwest_latitude": 48.815573,
"southwest_longitude": 2.225193,
"initial_zoom": 12,
},
},
}
city_ids = tuple(city["database_id"] for city in target_cities.values())
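The tuple form matters here: assuming the connection is backed by psycopg2 (the usual driver for PostgreSQL), a Python tuple passed as a query parameter is adapted to a parenthesized SQL list, which is what the IN %(city_ids)s placeholders in the queries below rely on. A minimal sketch of that adaptation:
import psycopg2.extensions
# A tuple is rendered as a parenthesized SQL list, e.g. (4, 1, 2),
# which is exactly the form the IN operator expects.
psycopg2.extensions.adapt((4, 1, 2)).getquoted()
# b'(4, 1, 2)'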
cities below holds the cleaned city-related data from the original database. Each city comes with KML data (i.e., its area), which is kept.
cities = pd.read_sql_query(
f"""
SELECT
cities.id,
cities.name,
geo_areas.kml
FROM
{config.ORIGINAL_SCHEMA}.cities
LEFT OUTER JOIN
{config.ORIGINAL_SCHEMA}.geo_areas ON cities.geo_area_id = geo_areas.id
WHERE
cities.id IN %(city_ids)s
ORDER BY
cities.id
""",
con=connection,
index_col="id",
params={"city_ids": city_ids},
)
Merge in the data from Google Maps.
for city in target_cities.values():
for col, val in city["google_maps_data"].items():
cities.loc[city["database_id"], col] = val
Cast the columns' types explicitly.
cities = cities.astype(
{
"name": "string",
"kml": "string",
"center_latitude": float,
"center_longitude": float,
"northeast_latitude": float,
"northeast_longitude": float,
"southwest_latitude": float,
"southwest_longitude": float,
"initial_zoom": int,
}
)
cities.head()
name | kml | center_latitude | center_longitude | northeast_latitude | northeast_longitude | southwest_latitude | southwest_longitude | initial_zoom | |
---|---|---|---|---|---|---|---|---|---|
id | |||||||||
1 | Lyon | <?xml version='1.0' encoding='UTF-8'?><kml xml... | 45.764043 | 4.835659 | 45.808425 | 4.898393 | 45.707486 | 4.771849 | 13 |
2 | Paris | <?xml version='1.0' encoding='UTF-8'?><kml xml... | 48.856614 | 2.352222 | 48.902145 | 2.469921 | 48.815573 | 2.225193 | 12 |
4 | Bordeaux | <?xml version='1.0' encoding='UTF-8'?><kml xml... | 44.837789 | -0.579180 | 44.916704 | -0.533309 | 44.810752 | -0.638973 | 13 |
cities.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 3 entries, 1 to 4
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype
---  ------               --------------  -----
 0   name                 3 non-null      string
 1   kml                  3 non-null      string
 2   center_latitude      3 non-null      float64
 3   center_longitude     3 non-null      float64
 4   northeast_latitude   3 non-null      float64
 5   northeast_longitude  3 non-null      float64
 6   southwest_latitude   3 non-null      float64
 7   southwest_longitude  3 non-null      float64
 8   initial_zoom         3 non-null      int64
dtypes: float64(6), int64(1), string(2)
memory usage: 320.0 bytes
As this notebook was developed iteratively, we validate that the cleaned data stays unchanged, using SHA256 checksums of the cleaned DataFrames and other asserts.
assert (
hashlib.sha256(cities.to_json().encode()).hexdigest()
== "800689a6ba5b6d03f583f258e058eca0b12e6df8e34c98bfe7aec246ed688c92"
)
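This checksum pattern recurs for every cleaned DataFrame below. A small helper (hypothetical, i.e., not part of the package) would express the idea once:
def frame_checksum(df):
    """SHA256 hex digest of a DataFrame's JSON representation."""
    return hashlib.sha256(df.to_json().encode()).hexdigest()

assert (
    frame_checksum(cities)
    == "800689a6ba5b6d03f583f258e058eca0b12e6df8e34c98bfe7aec246ed688c92"
)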
Only load addresses with orders in the target cities, excluding the cut-off day.
addresses = pd.read_sql_query(
f"""
SELECT
id,
created_at,
place_id,
latitude,
longitude,
city_id,
city_name AS city,
zip,
street_address AS street,
floor,
special_instructions
FROM
{config.ORIGINAL_SCHEMA}.addresses
WHERE
city_id IN %(city_ids)s
AND
id IN (
SELECT DISTINCT address_id
FROM (
SELECT DISTINCT
pickup_address_id AS address_id
FROM
{config.ORIGINAL_SCHEMA}.orders
WHERE
created_at < '{config.CUTOFF_DAY}'
UNION
SELECT DISTINCT
dropoff_address_id AS address_id
FROM
{config.ORIGINAL_SCHEMA}.orders
WHERE
created_at < '{config.CUTOFF_DAY}'
) AS orders
)
ORDER BY
id
""",
con=connection,
index_col="id",
params={"city_ids": city_ids},
parse_dates=["created_at"],
)
All columns are "strings", even zip.
addresses = addresses.astype(
{
"place_id": "string",
"city": "string",
"zip": "string",
"street": "string",
"floor": "string",
"special_instructions": "string",
}
)
addresses.head()
created_at | place_id | latitude | longitude | city_id | city | zip | street | floor | special_instructions | |
---|---|---|---|---|---|---|---|---|---|---|
id | ||||||||||
2 | 2016-02-22 09:42:10.204171 | ChIJSfxJmlXq9EcRX2ChkiPW9J8 | 45.763149 | 4.832660 | 1 | Lyon | 69002 | 31 rue Mercière | <NA> | |
3 | 2016-02-22 09:42:10.351150 | ChIJwwMvNPnq9EcRY7Qu-Tw2HL8 | 45.767227 | 4.835750 | 1 | Lyon | 69001 | Rue de la République 2 | <NA> | |
4 | 2016-02-22 09:43:57.077475 | ChIJr1RhGN7B9EcRv6XSHmmN6a8 | 45.743725 | 4.873138 | 1 | Lyon | 69008 | 123 avenue des frères lumières | <NA> | |
5 | 2016-02-22 09:43:57.220446 | ChIJEwQm9H7q9EcRTymMxQ71z84 | 45.759369 | 4.864087 | 1 | Lyon | 69003 | Avenue Georges Pompidou 17 | <NA> | Appel au 06 46 12 20 27 |
6 | 2016-02-22 10:06:08.762590 | ChIJbQz7p6vr9EcRr9L2cH5942I | 45.761181 | 4.826371 | 1 | Lyon | 69005 | 8 bis Place Saint Jean | <NA> |
addresses.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 663082 entries, 2 to 691914
Data columns (total 10 columns):
 #   Column                Non-Null Count   Dtype
---  ------                --------------   -----
 0   created_at            663082 non-null  datetime64[ns]
 1   place_id              662953 non-null  string
 2   latitude              663082 non-null  float64
 3   longitude             663082 non-null  float64
 4   city_id               663082 non-null  int64
 5   city                  663082 non-null  string
 6   zip                   663082 non-null  string
 7   street                663082 non-null  string
 8   floor                 337703 non-null  string
 9   special_instructions  585310 non-null  string
dtypes: datetime64[ns](1), float64(2), int64(1), string(6)
memory usage: 55.6 MB
assert len(addresses) == 663_082
Create a helper function that strips out the microseconds from datetime columns and converts their time zones from UTC to Europe/Paris.
def clean_datetime(col):
"""Strip Microseconds and convert timezone to Europe/Paris."""
return (
col.dt.tz_localize(tz.utc)
.dt.tz_convert(tz.timezone("Europe/Paris"))
.dt.tz_localize(None)
.map(
lambda x: datetime.datetime(
x.year, x.month, x.day, x.hour, x.minute, x.second
)
if x is not pd.NaT
else x
)
)
addresses["created_at"] = clean_datetime(addresses["created_at"])
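As a quick sanity check (using the first created_at value from the listing above): 09:42 UTC on February 22 is 10:42 in Europe/Paris (UTC+1, no DST in February), and the microseconds are gone.
clean_datetime(pd.Series([pd.Timestamp("2016-02-22 09:42:10.204171")]))
# 0   2016-02-22 10:42:10
# dtype: datetime64[ns]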
A tiny number of addresses have latitude / longitude pairs as place_ids.
addresses["place_id"] = (
addresses["place_id"].str.replace(r"^[\d\.,-]+$", "", regex=True).str.strip()
)
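The regex only wipes values consisting entirely of digits, dots, commas, and minus signs. A hedged illustration with one made-up coordinate pair and one real-looking place_id:
pd.Series(["45.7640,4.8357", "ChIJSfxJmlXq9EcRX2ChkiPW9J8"], dtype="string").str.replace(
    r"^[\d\.,-]+$", "", regex=True
).str.strip()
# 0
# 1    ChIJSfxJmlXq9EcRX2ChkiPW9J8
# dtype: string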
Discard addresses without a place_id from Google Maps: if even Google does not know where an address is geo-located, we do not even try.
msk = addresses["place_id"].isnull() | (addresses["place_id"] == "")
addresses = addresses[~msk]
assert msk.sum() == 139
Some customers entered too much data into the city part of the address. Unify this column by keeping only the city's name.
addresses["city"].unique()
<StringArray> [ 'Lyon', 'Lyon 2 E Arrondissement', 'Villeurbanne', 'Lyon 7 E Arrondissement', 'Lyon 6 E Arrondissement', 'Lyon 8 E Arrondissement', 'Lyon 9 E Arrondissement', 'Paris', 'Levallois-Perret', 'Courbevoie', 'Puteaux', 'Neuilly Sur Seine', 'Boulogne Billancourt', 'Levallois Perret', 'Malakoff', 'Saint-Mandé', 'Paris 14 E Arrondissement', 'Montrouge', 'Issy-les-moulineaux', 'Paris 10 E Arrondissement', 'Issy Les Moulineaux', 'Nanterre', 'Paris 12 E Arrondissement', 'La Garenne-colombes', 'Vincennes', 'Montreuil', 'Asnières-sur-Seine', 'Paris 15 E Arrondissement', 'Saint-Ouen', 'Paris 9 E Arrondissement', 'Pantin', 'Paris 17 E Arrondissement', 'Paris 1 Er Arrondissement', 'Bordeaux', 'Talence', 'Ivry-sur-Seine', 'Testas', 'Saint-Denis'] Length: 38, dtype: string
addresses["city"] = (
addresses["city"]
.str.replace(r"(E Arrondissement|Er Arrondissement)", "", regex=True)
.str.replace(r"(\d)", "", regex=True)
# Get rid of accents.
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf8")
.astype("string")
# Unify hyphens.
.str.strip()
.str.replace("-", " ")
.str.title()
.str.replace(" ", "-")
)
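A hedged spot check of the pipeline on two values from the listing above (accent handling omitted, as neither sample needs it):
sample = pd.Series(["Lyon 2 E Arrondissement", "Issy Les Moulineaux"], dtype="string")
(
    sample.str.replace(r"(E Arrondissement|Er Arrondissement)", "", regex=True)
    .str.replace(r"(\d)", "", regex=True)
    .str.strip()
    .str.replace("-", " ")
    .str.title()
    .str.replace(" ", "-")
)
# 0                   Lyon
# 1    Issy-Les-Moulineaux
# dtype: string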
Suburban city names surrounding the three big cities in this research project are kept.
addresses["city"].value_counts()
Paris                   362252
Lyon                    199872
Bordeaux                 64379
Villeurbanne             15555
Levallois-Perret          4620
Courbevoie                3510
Puteaux                   2323
Issy-Les-Moulineaux       1219
Malakoff                  1214
Montrouge                 1164
Pantin                    1119
Saint-Mande               1043
Boulogne-Billancourt       986
Montreuil                  909
Neuilly-Sur-Seine          835
Vincennes                  826
Ivry-Sur-Seine             438
Nanterre                   361
La-Garenne-Colombes        213
Asnieres-Sur-Seine          62
Saint-Ouen                  35
Saint-Denis                  4
Testas                       3
Talence                      1
Name: city, dtype: Int64
addresses["zip"].unique()
<StringArray> [ '69002', '69001', '69008', '69003', '69005', '69007', '69004', '69009', '69006', '69100', ... '92053', '92806', '69003 ', '75015 PARIS', '69', '33077', '33092', '33139', '75010z', '92040'] Length: 167, dtype: string
addresses["zip"] = (
addresses["zip"]
.str.replace(r".*(\d{5}).*", r"\1", regex=True)
.str.replace(r"\D+", "", regex=True)
.replace("", "NaN")
.astype(float)
)
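For example (values taken from the listing above), '75015 PARIS' and '75010z' reduce to their five-digit codes, while the bare '69' slips through as a number and is caught by the validity check below:
pd.Series(["75015 PARIS", "75010z", "69"], dtype="string").str.replace(
    r".*(\d{5}).*", r"\1", regex=True
).str.replace(r"\D+", "", regex=True).replace("", "NaN").astype(float)
# 0    75015.0
# 1    75010.0
# 2       69.0
# dtype: float64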
Zip codes with fewer than 5 digits are invalid. Paris has 75xxx zip codes (with 92xxx, 93xxx, and 94xxx being suburbs), Lyon 69xxx, and Bordeaux 33xxx (cf., source). Keep only valid zip codes in the target cities.
invalid = addresses["zip"].notnull() & (addresses["zip"] < 10000)
assert invalid.sum() == 9
not_in_target_cities = (
addresses["zip"].notnull()
& ~invalid
& ~(
(33000 <= addresses["zip"]) & (addresses["zip"] < 34000)
| (69000 <= addresses["zip"]) & (addresses["zip"] < 70000)
| (75000 <= addresses["zip"]) & (addresses["zip"] < 76000)
| (92000 <= addresses["zip"]) & (addresses["zip"] < 95000)
)
)
assert not_in_target_cities.sum() == 10
addresses.loc[invalid | not_in_target_cities, "zip"] = np.NaN
addresses["zip"].unique()
array([69002., 69001., 69008., 69003., 69005., 69007., 69004., 69009., 69006., 69100., 69200., 69300., 69350., nan, 75011., 69370., 75009., 75010., 75001., 75008., 75017., 75003., 75002., 75018., 92300., 75007., 75012., 75006., 75116., 75015., 92400., 92800., 92200., 92100., 75020., 75019., 75004., 75013., 75016., 75005., 75014., 92130., 92092., 92932., 92000., 92150., 92270., 92190., 92110., 92170., 93400., 92600., 94250., 69160., 92120., 92250., 92078., 92310., 92240., 75543., 92064., 92210., 75270., 75000., 92057., 93100., 92140., 92081., 75741., 69326., 69130., 69500., 94160., 94300., 93170., 94110., 94220., 93260., 69000., 93500., 75045., 94120., 94200., 92220., 93000., 92974., 92063., 69429., 33000., 33800., 33200., 33400., 33300., 33130., 33110., 33100., 33270., 33520., 92935., 33067., 33150., 93507., 33700., 69628., 33080., 33076., 92700., 93310., 92042., 92058., 92930., 69120., 92936., 93300., 93210., 92671., 93200., 92053., 92806., 33077., 33092., 33139., 92040.])
Discard addresses with missing zip codes because they are hard to geo-code.
msk = addresses["zip"].isnull()
addresses = addresses[~msk]
assert msk.sum() == 21
addresses = addresses.astype({"zip": int})
Remove extra whitespace, HTML encodings, and accents.
addresses["street"] = (
addresses["street"]
.str.replace(r"\s+", " ", regex=True)
# Decode the HTML-encoded apostrophe.
.str.replace("&#39;", "'")
# Get rid of accents.
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf8")
.astype("string")
.str.strip()
.str.title()
)
There are no addresses without a street name.
assert not addresses["street"].isnull().any()
Make floor an integer column.
addresses["floor"].unique()
<StringArray> [ <NA>, '', '3', '19', '2ème étage', '1', '1er', '2eme etage face escalier', '2', '2 eme droite', ... ' Premiere Bati interphone 16 2eme', 'Le Cargo - 4e étage', 'Rdc appart 11', 'Etage 3, appartement 11', '5e étage gauche ', '2eme etage a gauche bat b ', '4ème étage, chambre 41', 'Cinquiéme ', "Montez jusqu'à la grille ", '5ème étage à droite '] Length: 11990, dtype: string
Parse out floors from the floor text column.
addresses["floor"] = (
addresses["floor"]
# Get rid of accents and lower case everything.
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf8")
.astype("string")
.str.casefold()
# Replace common text that messes up the matching.
.str.replace(".", "")
.str.replace(":", "")
.str.replace(";", "")
.str.replace("&#39;", "'")  # decode the HTML-encoded apostrophe
.str.replace("36b25", "")
.str.replace("n°", "")
.str.replace("#", "")
.str.replace("face l'assanceur", "")
.str.replace(r"\(drt\)", "", regex=True)
.str.replace("floor", "")
.str.replace("et demi", "")
.str.replace("et droite", "")
.str.replace("droite", "")
.str.replace("droit", "")
.str.replace("a gauche", "")
.str.replace("e gauche", "")
.str.replace("gauche", "")
.str.replace("entrez", "")
.str.replace("serez", "")
.str.replace("dussol", "")
.str.replace("soler", "")
.str.replace("sonner", "")
.str.replace("code", "")
.str.replace("perez", "")
.str.replace("-", "")
.str.replace(r"\s+", " ", regex=True)
.str.strip()
# Abbreviations.
.str.replace(
r"^.*?((\d+)\s?(er|ere|em|eme|ele|ieme|bis|(e|g|st|nd|rd|th|z)($|,|\s+))).*",
r"\2",
regex=True,
)
# French written out.
.str.replace(r".*(rdc|rez|sol|ground).*", "0", regex=True)
.str.replace(r".*(premiere|premier).*", "1", regex=True)
.str.replace(r".*(deuxieme).*", "2", regex=True)
.str.replace(r".*(troisieme).*", "3", regex=True)
.str.replace(r".*(quatrieme).*", "4", regex=True)
.str.replace(r".*(cinquieme).*", "5", regex=True)
.str.replace(r".*(sixieme).*", "6", regex=True)
.str.replace(r".*(septieme).*", "7", regex=True)
.str.replace(r".*(huitieme).*", "8", regex=True)
.str.replace(r".*(neuvieme).*", "9", regex=True)
.str.replace(r".*(dixieme).*", "10", regex=True)
.str.replace(r"^.*?((etage|etg) (\d+))($|\D+.*)", r"\3", regex=True)
.str.replace(r"^.*?((\d+)(etage| etage|etg| etg)).*", r"\2", regex=True)
# Remove apartment info so as not to confuse it with the floor.
.str.replace(
r"(.*)(ap|apt|app|appt|appart|appartment|appartement|chambre|room)\s*\w?\d+(.*)",
r"\1 \3",
regex=True,
)
.str.replace(r"(.*)(code|digicode)\s*\w?\d+(.*)", r"\1 \3", regex=True)
# Take number at start.
.str.replace(r"^(\d+)(,|\s+).*", r"\1", regex=True)
# Ignore anything with non-numeric symbols entirely.
.str.replace(r".*\D+.*", "", regex=True)
.str.replace("^$", "NaN")
.fillna("NaN")
.astype(float)
)
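A hedged mini-check of the "French written out" rules above, on three made-up entries:
floors = pd.Series(["rez de chaussee", "premier etage", "deuxieme"], dtype="string")
(
    floors.str.replace(r".*(rdc|rez|sol|ground).*", "0", regex=True)
    .str.replace(r".*(premiere|premier).*", "1", regex=True)
    .str.replace(r".*(deuxieme).*", "2", regex=True)
    .astype(float)
)
# 0    0.0
# 1    1.0
# 2    2.0
# dtype: float64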
If the floor column is empty, parse out the floor info from the special_instructions column, which must have been used before the floor column was introduced (the parsing logic differs slightly from the above).
addresses["special_instructions"] = (
addresses["special_instructions"]
# Get rid of accents and lower case everything.
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf8")
.astype("string")
.str.casefold()
# Replace common text that messes up the matching.
.str.replace(".", "")
.str.replace(":", "")
.str.replace(";", "")
.str.replace("&#39;", "'")  # decode the HTML-encoded apostrophe
.str.replace("36b25", "")
.str.replace("n°", "")
.str.replace("#", "")
.str.replace("face l'assanceur", "")
.str.replace(r"\(drt\)", "", regex=True)
.str.replace("floor", "")
.str.replace("et demi", "")
.str.replace("et droite", "")
.str.replace("droite", "")
.str.replace("droit", "")
.str.replace("a gauche", "")
.str.replace("e gauche", "")
.str.replace("gauche", "")
.str.replace("entrez", "")
.str.replace("serez", "")
.str.replace("dussol", "")
.str.replace("soler", "")
.str.replace("sonner", "")
.str.replace("code", "")
.str.replace("perez", "")
.str.replace("-", "")
.str.replace(r"\s+", " ", regex=True)
.str.strip()
# Abbreviations.
.str.replace(
r"^.*?((\d+)\s?(er|ere|em|eme|ele|ieme|bis|(e|g|st|nd|rd|th|z)($|,|\s+))).*",
r"\2",
regex=True,
)
# French written out.
.str.replace(r".*(rdc|rez|sol|ground).*", "0", regex=True)
.str.replace(r".*(premiere|premier).*", "1", regex=True)
.str.replace(r".*(deuxieme).*", "2", regex=True)
.str.replace(r".*(troisieme).*", "3", regex=True)
.str.replace(r".*(quatrieme).*", "4", regex=True)
.str.replace(r".*(cinquieme).*", "5", regex=True)
.str.replace(r".*(sixieme).*", "6", regex=True)
.str.replace(r".*(septieme).*", "7", regex=True)
.str.replace(r".*(huitieme).*", "8", regex=True)
.str.replace(r".*(neuvieme).*", "9", regex=True)
.str.replace(r".*(dixieme).*", "10", regex=True)
.str.replace(r"^.*?((etage|etg) (\d+))($|\D+.*)", r"\3", regex=True)
.str.replace(r"^.*?((\d+)(etage| etage|etg| etg)).*", r"\2", regex=True)
# Remove apartment info to not confuse it with floor.
.str.replace(
r"(.*)(ap|apt|app|appt|appart|appartment|appartement|chambre|room)\s*\w?\d+(.*)",
r"\1 \3",
regex=True,
)
.str.replace(r"(.*)(code|digicode)\s*\w?\d+(.*)", r"\1 \3", regex=True)
# Ignore anything with non-numeric symbols entirely.
.str.replace(r".*\D+.*", "", regex=True)
.str.replace("^$", "NaN")
.fillna("NaN")
.astype(float)
)
Fill in floor from special_instructions and cast the type.
msk = addresses["floor"].isnull() & addresses["special_instructions"].notnull()
addresses.loc[msk, "floor"] = addresses.loc[msk, "special_instructions"].values
del addresses["special_instructions"]
addresses = addresses.astype({"floor": "Int64"})
Only keep realistic floor numbers.
addresses.loc[addresses["floor"].notnull() & (addresses["floor"] > 40), "floor"] = pd.NA
Nearly half of the addresses have no floor number given.
assert len(addresses.loc[addresses["floor"].isnull(), "floor"]) == 307_973
Most floors are near the ground floor, which is plausible.
addresses["floor"].value_counts().sort_index()
0     20977
1     69765
2     62439
3     59011
4     52226
5     42930
6     28396
7      8853
8      3349
9      1631
10     1384
11      764
12      657
13      448
14      322
15      206
16      202
17      109
18      135
19      105
20       77
21       51
22       99
23       39
24       85
25       76
26       56
27       30
28       49
29       51
30       33
31       55
32       22
33       11
34       37
35       55
36       19
37        8
38       32
39      136
40       19
Name: floor, dtype: Int64
The number of addresses (ca. 663,000) is inflated, probably due to some sort of automated re-entering.
assert len(addresses) == 662_922
First, merge all addresses with the same place_id, latitude / longitude, city, zip, street, and floor into one entry, namely their first occurrence.
addresses["floor"] = addresses["floor"].fillna(999)  # dummy value -> no grouping on NaNs
by = ["place_id", "latitude", "longitude", "city_id", "city", "zip", "street", "floor"]
addresses = (
addresses.reset_index()
.set_index(by)
.merge(
(
addresses.reset_index()
.groupby(by)[["id"]]
.min()
.rename(columns={"id": "merged_on_id"})
),
left_index=True,
right_index=True,
)
.reset_index()
.astype({"place_id": "string", "city": "string", "street": "string"})
)
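This merge pattern recurs several times below. A toy illustration (hypothetical data) of how each group is mapped onto the smallest ID in it:
toy = pd.DataFrame({"id": [1, 2, 3], "key": ["a", "a", "b"]})
toy.merge(
    toy.groupby("key")[["id"]].min().rename(columns={"id": "merged_on_id"}),
    left_on="key",
    right_index=True,
)
#    id key  merged_on_id
# 0   1   a             1
# 1   2   a             1
# 2   3   b             3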
Keep a dictionary address_merger to map the IDs that are merged away to the ones that are kept.
address_merger = collections.defaultdict(lambda: np.NaN)
address_merger.update(
{
id_: merged_on_id
for _, id_, merged_on_id in addresses[["id", "merged_on_id"]].itertuples()
}
)
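Because address_merger is a defaultdict, IDs discarded earlier (e.g., addresses without a place_id) map to NaN; the .map() calls further below thus turn references to discarded addresses into nulls that can be filtered out. Note that a plain lookup also inserts the key:
assert np.isnan(address_merger[999_999_999])  # hypothetical, unknown ID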
addresses = (
addresses[addresses["id"] == addresses["merged_on_id"]]
.set_index("id")
.sort_index()[
[
"created_at",
"place_id",
"latitude",
"longitude",
"city_id",
"city",
"zip",
"street",
"floor",
]
]
)
addresses["floor"] = addresses["floor"].replace(999, pd.NA).astype("Int64")
Only about 178,000 addresses remain!
assert len(addresses) == 178_101
Second, many addresses are still redundant: they refer to different floors in the same house, or their street name is written differently.
We create a primary_id column that holds the ID of the first occurrence of an address, independent of the exact spelling of the street name and the floor number.
That column is created by grouping the remaining addresses twice, first by their GPS location, and second by a simplified version of street. The latter accounts for slightly different latitude / longitude pairs of the same location, potentially due to an update in the Google Maps database.
by = ["place_id", "latitude", "longitude"]
addresses = (
addresses.reset_index()
.set_index(by)
.merge(
(
addresses.reset_index()
.groupby(by)[["id"]]
.min()
.rename(columns={"id": "unified1_id"})
),
left_index=True,
right_index=True,
)
.reset_index()
.set_index("id")
.sort_index()
.astype({"place_id": "string"})[
[
"unified1_id",
"created_at",
"place_id",
"latitude",
"longitude",
"city_id",
"city",
"zip",
"street",
"floor",
]
]
)
addresses["street_simple"] = (
addresses["street"]
.str.replace("Avenue", "Ave")
.str.replace("Place", "Pl")
.str.replace(".", "")
.str.replace("-", "")
.str.replace(" ", "")
.str.lower()
)
by = ["city_id", "street_simple"]
addresses = (
addresses.reset_index()
.set_index(by)
.merge(
(
addresses.reset_index()
.groupby(by)[["id"]]
.min()
.rename(columns={"id": "unified2_id"})
),
left_index=True,
right_index=True,
)
.reset_index()
.set_index("id")
.sort_index()[
[
"unified1_id",
"unified2_id",
"created_at",
"place_id",
"latitude",
"longitude",
"city_id",
"city",
"zip",
"street",
"floor",
]
]
)
So, an address may be a duplicate of two different earlier addresses, and we choose the earliest one.
addresses["primary_id"] = addresses[["unified1_id", "unified2_id"]].min(axis=1)
del addresses["unified1_id"]
del addresses["unified2_id"]
addresses = addresses[
[
"primary_id",
"created_at",
"place_id",
"latitude",
"longitude",
"city_id",
"city",
"zip",
"street",
"floor",
]
]
A tricky issue is that an address could be identified as a duplicate of an earlier one that itself is a duplicate of an even earlier one. The following loop does the trick and maps each address to its earliest version.
_address_unifier = {
id_: unified_id
for _, id_, unified_id in addresses.reset_index()[["id", "primary_id"]].itertuples()
}
while True:
if (addresses["primary_id"] != addresses["primary_id"].map(_address_unifier)).any():
addresses["primary_id"] = addresses["primary_id"].map(_address_unifier)
else:
break
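A toy illustration (hypothetical IDs) of why the loop must iterate: 3 was merged on 2, which itself was merged on 1.
mapping = {1: 1, 2: 1, 3: 2}
once = pd.Series([1, 2, 3]).map(mapping)  # -> 1, 1, 2 after one pass
once.map(mapping)  # -> 1, 1, 1 (stable after the second pass)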
Only about 87,000 of the remaining 178,000 addresses are unique locations, disregarding floors and different spellings of the street name.
_addresses = addresses.reset_index()
msk = _addresses["id"] == _addresses["primary_id"]
del _addresses
assert msk.sum() == 87_287
Rename zip so as not to shadow the Python built-in zip() in the ORM layer.
addresses = addresses.rename(columns={"zip": "zip_code"})
addresses.head()
primary_id | created_at | place_id | latitude | longitude | city_id | city | zip_code | street | floor | |
---|---|---|---|---|---|---|---|---|---|---|
id | ||||||||||
2 | 2 | 2016-02-22 10:42:10 | ChIJSfxJmlXq9EcRX2ChkiPW9J8 | 45.763149 | 4.832660 | 1 | Lyon | 69002 | 31 Rue Merciere | <NA> |
3 | 3 | 2016-02-22 10:42:10 | ChIJwwMvNPnq9EcRY7Qu-Tw2HL8 | 45.767227 | 4.835750 | 1 | Lyon | 69001 | Rue De La Republique 2 | <NA> |
4 | 4 | 2016-02-22 10:43:57 | ChIJr1RhGN7B9EcRv6XSHmmN6a8 | 45.743725 | 4.873138 | 1 | Lyon | 69008 | 123 Avenue Des Freres Lumieres | <NA> |
5 | 5 | 2016-02-22 10:43:57 | ChIJEwQm9H7q9EcRTymMxQ71z84 | 45.759369 | 4.864087 | 1 | Lyon | 69003 | Avenue Georges Pompidou 17 | <NA> |
6 | 6 | 2016-02-22 11:06:08 | ChIJbQz7p6vr9EcRr9L2cH5942I | 45.761181 | 4.826371 | 1 | Lyon | 69005 | 8 Bis Place Saint Jean | <NA> |
addresses.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 178101 entries, 2 to 691914
Data columns (total 10 columns):
 #   Column      Non-Null Count   Dtype
---  ------      --------------   -----
 0   primary_id  178101 non-null  int64
 1   created_at  178101 non-null  datetime64[ns]
 2   place_id    178101 non-null  string
 3   latitude    178101 non-null  float64
 4   longitude   178101 non-null  float64
 5   city_id     178101 non-null  int64
 6   city        178101 non-null  string
 7   zip_code    178101 non-null  int64
 8   street      178101 non-null  string
 9   floor       100540 non-null  Int64
dtypes: Int64(1), datetime64[ns](1), float64(2), int64(3), string(3)
memory usage: 15.1 MB
assert (
hashlib.sha256(addresses.to_json().encode()).hexdigest()
== "4f9f3b63a9b2472bf07207d0e06f4901619066121d6bb5fd3ad4ebf21b590410"
)
Load restaurants associated with all addresses in the target cities. Further below, all restaurants are shown to have a clean address.
restaurants = pd.read_sql_query(
f"""
SELECT
id,
created_at,
name,
address_id,
estimated_prep_duration
FROM
{config.ORIGINAL_SCHEMA}.businesses
WHERE
address_id IN (
SELECT id FROM {config.ORIGINAL_SCHEMA}.addresses WHERE city_id IN %(city_ids)s
)
AND
created_at < '{config.CUTOFF_DAY}'
ORDER BY
id
""",
con=connection,
index_col="id",
params={"city_ids": city_ids},
parse_dates=["created_at"],
)
restaurants["name"] = restaurants["name"].astype("string")
restaurants.head()
created_at | name | address_id | estimated_prep_duration | |
---|---|---|---|---|
id | ||||
1 | 2016-02-22 09:42:10.228854 | King Marcel Mercière | 2 | 1200 |
2 | 2016-02-22 09:43:57.103750 | Trotekala | 4 | 1200 |
3 | 2016-02-22 10:06:08.796042 | Soul Food & Jazz Café | 6 | 1200 |
4 | 2016-02-22 10:16:11.478981 | Gourmix Bellecour | 8 | 900 |
5 | 2016-02-22 10:36:09.150481 | Yabio Hôtel de Ville | 10 | 1200 |
restaurants.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1654 entries, 1 to 1787
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype
---  ------                   --------------  -----
 0   created_at               1654 non-null   datetime64[ns]
 1   name                     1654 non-null   string
 2   address_id               1654 non-null   int64
 3   estimated_prep_duration  1654 non-null   int64
dtypes: datetime64[ns](1), int64(2), string(1)
memory usage: 64.6 KB
assert len(restaurants) == 1_654
restaurants["created_at"] = clean_datetime(restaurants["created_at"])
restaurants["name"] = (
restaurants["name"]
.str.replace(r"\s+", " ", regex=True)
# Decode the HTML-encoded apostrophe.
.str.replace("&#39;", "'")
# Get rid of accents.
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf8")
.astype("string")
.str.title()
# To find duplicates further below.
.str.replace(" & ", " And ")
.str.replace("The ", "")
.str.replace("Pasta Pizza ", "")
.str.replace(" - Bar A Taboule", "")
.str.replace("- Cuisine Mediterraneenne", "")
.str.replace(" - Petit-Dejeuner", "")
.str.replace("Lyon", "")
.str.replace("La Burgeria Saint Mande", "La Fromagette Saint Mande")
.str.replace("Mansou'", "Mansouria")
.str.strip()
)
restaurants["address_id"] = restaurants["address_id"].map(address_merger)
assert not restaurants["address_id"].isnull().any()
Restaurants with the same name at the same (unified) address are merged.
restaurants = restaurants.merge(
addresses["primary_id"], left_on="address_id", right_index=True
)
restaurants = restaurants.rename(columns={"primary_id": "primary_address_id"})
by = ["name", "primary_address_id"]
restaurants = (
restaurants.reset_index()
.set_index(by)
.merge(
(
restaurants.reset_index()
.groupby(by)[["id"]]
.min()
.rename(columns={"id": "merged_on_id"})
),
left_index=True,
right_index=True,
)
.reset_index()
.astype({"name": "string"})
)
Keep a dictionary to map the IDs that are merged away to the ones that are kept.
restaurants_merger = collections.defaultdict(lambda: np.NaN)
restaurants_merger.update(
{
id_: merged_on_id
for _, id_, merged_on_id in restaurants[["id", "merged_on_id"]].itertuples()
}
)
restaurants = (
restaurants[restaurants["id"] == restaurants["merged_on_id"]]
.set_index("id")
.sort_index()[["created_at", "name", "address_id", "estimated_prep_duration"]]
)
assert len(restaurants) == 1_644
restaurants.head()
created_at | name | address_id | estimated_prep_duration | |
---|---|---|---|---|
id | ||||
1 | 2016-02-22 10:42:10 | King Marcel Merciere | 2 | 1200 |
2 | 2016-02-22 10:43:57 | Trotekala | 4 | 1200 |
3 | 2016-02-22 11:06:08 | Soul Food And Jazz Cafe | 6 | 1200 |
4 | 2016-02-22 11:16:11 | Gourmix Bellecour | 8 | 900 |
5 | 2016-02-22 11:36:09 | Yabio Hotel De Ville | 10 | 1200 |
restaurants.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1644 entries, 1 to 1787
Data columns (total 4 columns):
 #   Column                   Non-Null Count  Dtype
---  ------                   --------------  -----
 0   created_at               1644 non-null   datetime64[ns]
 1   name                     1644 non-null   string
 2   address_id               1644 non-null   int64
 3   estimated_prep_duration  1644 non-null   int64
dtypes: datetime64[ns](1), int64(2), string(1)
memory usage: 64.2 KB
assert (
hashlib.sha256(restaurants.to_json().encode()).hexdigest()
== "8eb852690c027e2fcc0d9cf988391741b1cd028e35c494766f8c21d4ea1722b7"
)
Only load couriers that worked in one of the target cities (i.e., had an order) and include the vehicle information.
couriers = pd.read_sql_query(
f"""
SELECT
couriers.id,
couriers.created_at,
MD5(couriers.name) AS name,
vehicle_types.icon as vehicle,
couriers.speed,
vehicle_bag_types.capacity,
couriers.pay_per_hour,
couriers.pay_per_order
FROM
{config.ORIGINAL_SCHEMA}.couriers
LEFT OUTER JOIN
{config.ORIGINAL_SCHEMA}.vehicles ON couriers.vehicle_id = vehicles.id
LEFT OUTER JOIN
{config.ORIGINAL_SCHEMA}.vehicle_types ON vehicles.vehicle_type_id = vehicle_types.id
LEFT OUTER JOIN
{config.ORIGINAL_SCHEMA}.vehicle_bag_types ON vehicles.vehicle_bag_type_id = vehicle_bag_types.id
WHERE
couriers.id in (
SELECT DISTINCT
deliveries.courier_id
FROM
{config.ORIGINAL_SCHEMA}.orders
INNER JOIN
{config.ORIGINAL_SCHEMA}.deliveries ON orders.id = deliveries.order_id
WHERE
orders.featured_business_id IN (
SELECT -- Subquery based off the restaurants query above!
id
FROM
{config.ORIGINAL_SCHEMA}.businesses
WHERE
address_id IN (
SELECT id FROM {config.ORIGINAL_SCHEMA}.addresses WHERE city_id IN %(city_ids)s
)
AND
created_at < '{config.CUTOFF_DAY}'
)
AND
deliveries.courier_id IS NOT NULL
)
ORDER BY
couriers.id
""",
con=connection,
index_col="id",
params={"city_ids": city_ids},
parse_dates=["created_at"],
)
couriers = couriers.astype({"name": "string", "vehicle": "string"})
couriers.head()
created_at | name | vehicle | speed | capacity | pay_per_hour | pay_per_order | |
---|---|---|---|---|---|---|---|
id | |||||||
1 | 2016-02-21 13:28:41.571046 | 9181c2be2b3746cbcd6abf86d77fb2aa | bicycle | 13.90 | 150 | 0 | 300 |
2 | 2016-02-21 13:28:41.673711 | 1b5d5709985e874e9c54e30127b3049f | bicycle | 18.47 | 100 | 0 | 300 |
3 | 2016-02-21 13:28:41.776893 | 0e385d898f81147d460a45d2943a4eed | bicycle | 19.18 | 100 | 0 | 300 |
4 | 2016-02-21 13:28:41.879876 | ca21f91ea528f1b6e596d0c43959062b | bicycle | 14.73 | 150 | 0 | 200 |
5 | 2016-02-21 13:28:41.982542 | 04af1119256f88ea3a25e79344dec96c | bicycle | 15.20 | 150 | 0 | 200 |
couriers.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 2471 entries, 1 to 53764
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype
---  ------         --------------  -----
 0   created_at     2471 non-null   datetime64[ns]
 1   name           2471 non-null   string
 2   vehicle        2471 non-null   string
 3   speed          2471 non-null   float64
 4   capacity       2471 non-null   int64
 5   pay_per_hour   2471 non-null   int64
 6   pay_per_order  2471 non-null   int64
dtypes: datetime64[ns](1), float64(1), int64(3), string(2)
memory usage: 154.4 KB
assert len(couriers) == 2_471
couriers["created_at"] = clean_datetime(couriers["created_at"])
Couriers with the same name either have the same phone number or signed up within a short time window; they are merged.
by = ["name"]
couriers = (
couriers.reset_index()
.set_index(by)
.merge(
(
couriers.reset_index()
.groupby(by)[["id"]]
.max() # merged on the latest courier!
.rename(columns={"id": "merged_on_id"})
),
left_index=True,
right_index=True,
)
.reset_index()
.astype({"name": "string"})
)
Keep a dictionary to map the IDs that are merged away to the ones that are kept.
couriers_merger = collections.defaultdict(lambda: np.NaN)
couriers_merger.update(
{
id_: merged_on_id
for _, id_, merged_on_id in couriers[["id", "merged_on_id"]].itertuples()
}
)
couriers = (
couriers[couriers["id"] == couriers["merged_on_id"]]
.set_index("id")
.sort_index()[
["created_at", "vehicle", "speed", "capacity", "pay_per_hour", "pay_per_order"]
]
)
assert len(couriers) == 2_469
The column pay_per_hour defaults to 0 in the database definition. The actual default value is EUR 7.50 (i.e., 750 cents), which is also the mode in the dataset.
couriers["pay_per_hour"].value_counts()
0       2398
750       70
1500       1
Name: pay_per_hour, dtype: int64
The column pay_per_order defaults to 0 in the database definition. A more realistic value is EUR 2 (i.e., 200 cents), which is the mode in the dataset.
couriers["pay_per_order"].value_counts()
200        2146
0           159
300          97
400          29
500          11
2             9
600           8
20000         3
250           3
650           2
1400000       1
1             1
Name: pay_per_order, dtype: int64
Whenever a 0 appears in pay_per_order, the corresponding pay_per_hour is 0 in all cases except one, which is the highest-paid courier.
assert ((couriers["pay_per_order"] == 0) & (couriers["pay_per_hour"] == 0)).sum() == 158
assert ((couriers["pay_per_order"] == 0) & (couriers["pay_per_hour"] > 0)).sum() == 1
couriers[(couriers["pay_per_order"] == 0) & (couriers["pay_per_hour"] > 0)]
created_at | vehicle | speed | capacity | pay_per_hour | pay_per_order | |
---|---|---|---|---|---|---|
id | ||||||
721 | 2016-03-18 16:56:39 | bicycle | 20.49 | 100 | 1500 | 0 |
Couriers with 0s in both columns receive the default payment scheme.
msk_0_pay = (couriers["pay_per_hour"] == 0) & (couriers["pay_per_order"] == 0)
couriers.loc[msk_0_pay, "pay_per_hour"] = 750
couriers.loc[msk_0_pay, "pay_per_order"] = 200
Couriers with a 0 in the pay_per_hour column receive the fixed hourly wage of EUR 7.50.
couriers.loc[couriers["pay_per_hour"] == 0, "pay_per_hour"] = 750
The column pay_per_order contains obvious typos that are corrected.
couriers.loc[
couriers["pay_per_order"].isin([1, 2, 20, 2000, 20000]), "pay_per_order"
] = 200
couriers.loc[couriers["pay_per_order"] == 1400000, "pay_per_order"] = 400
Distribution of the various pay_per_hour / pay_per_order combinations:
collections.Counter(
(y, z) for (x, y, z) in couriers[["pay_per_hour", "pay_per_order"]].itertuples()
)
Counter({(750, 300): 97, (750, 200): 2317, (750, 500): 11, (750, 400): 30, (750, 600): 8, (750, 650): 2, (1500, 0): 1, (750, 250): 3})
couriers.head()
created_at | vehicle | speed | capacity | pay_per_hour | pay_per_order | |
---|---|---|---|---|---|---|
id | ||||||
1 | 2016-02-21 14:28:41 | bicycle | 13.90 | 150 | 750 | 300 |
2 | 2016-02-21 14:28:41 | bicycle | 18.47 | 100 | 750 | 300 |
3 | 2016-02-21 14:28:41 | bicycle | 19.18 | 100 | 750 | 300 |
4 | 2016-02-21 14:28:41 | bicycle | 14.73 | 150 | 750 | 200 |
5 | 2016-02-21 14:28:41 | bicycle | 15.20 | 150 | 750 | 200 |
couriers.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 2469 entries, 1 to 53764
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype
---  ------         --------------  -----
 0   created_at     2469 non-null   datetime64[ns]
 1   vehicle        2469 non-null   string
 2   speed          2469 non-null   float64
 3   capacity       2469 non-null   int64
 4   pay_per_hour   2469 non-null   int64
 5   pay_per_order  2469 non-null   int64
dtypes: datetime64[ns](1), float64(1), int64(3), string(1)
memory usage: 135.0 KB
assert (
hashlib.sha256(couriers.to_json().encode()).hexdigest()
== "a1059e93095842120a58c4f74145bb6a14aeb94f47a0d77043896c70f4772afe"
)
The order-related data is spread over many different tables in the original database. Also, some data is not even normalized. The following SQL query puts all the data into one big relation that is cleaned further below.
orders = pd.read_sql_query(
f"""
SELECT
orders.id,
deliveries.id AS delivery_id,
MD5(CONCAT(orders.email, orders.phone_number)) as customer_id, -- anonymize the customer data
orders.order_placed_at AS placed_at,
CASE
WHEN orders.preorder IS FALSE
THEN TRUE
ELSE FALSE
END AS ad_hoc,
CASE
WHEN orders.preorder is TRUE
THEN orders.scheduled_dropoff_at
ELSE NULL
END AS scheduled_delivery_at,
deliveries.status,
cancellations.cancelled_at,
orders.featured_business_id as restaurant_id,
orders.order_sent_at AS restaurant_notified_at,
orders.order_received_at AS restaurant_confirmed_at,
orders.estimated_prep_duration,
orders.estimated_prep_buffer,
deliveries.courier_id,
deliveries.courier_dispatched_at AS dispatch_at,
deliveries.courier_notified_at,
deliveries.courier_accepted_at,
courier_no_accept_confirmed.issue AS courier_no_accept_confirmed_issue,
orders.pickup_address_id,
orders.scheduled_pickup_at,
deliveries.courier_picked_up_at AS pickup_at,
left_pickups.left_pickup_at,
courier_late_at_pickup.issue AS courier_late_at_pickup_issue,
courier_waited_at_pickup.issue AS courier_waited_at_pickup_issue,
courier_no_pickup_confirmed.issue AS courier_no_pickup_confirmed_issue,
orders.dropoff_address_id AS delivery_address_id,
deliveries.first_estimated_dropoff_at AS first_estimated_delivery_at,
deliveries.courier_dropped_off_at AS delivery_at,
courier_waited_at_delivery.issue AS courier_waited_at_delivery_issue,
courier_no_delivery_confirmed.issue AS courier_no_delivery_confirmed_issue,
orders.utilization,
items_totals.sub_total,
orders.delivery_fee,
orders.total,
deliveries.delivery_distance AS logged_delivery_distance,
deliveries.courier_avg_speed AS logged_avg_courier_speed,
CAST(deliveries.courier_avg_speed_distance AS INTEGER) AS logged_avg_courier_speed_distance,
delivery_timings.accepting_time AS logged_accepting_time,
delivery_timings.courier_reaction_time AS logged_reaction_time,
delivery_timings.to_pickup_time AS logged_to_pickup_time,
delivery_timings.expected_wait_pickup_time AS expected_wait_pickup_time,
delivery_timings.wait_pickup_time AS logged_wait_pickup_time,
delivery_timings.pickup_time AS logged_pickup_time,
delivery_timings.courier_late AS logged_courier_late_time,
delivery_timings.vendor_late AS logged_restaurant_late_time,
delivery_timings.to_dropoff_time AS logged_to_delivery_time,
delivery_timings.expected_dropoff_time AS expected_delivery_time,
delivery_timings.dropoff_time AS logged_delivery_time,
delivery_timings.delivery_late AS logged_delivery_late_time,
delivery_timings.total_time AS logged_total_time,
delivery_timings.confirmed_total_time AS logged_confirmed_total_time
FROM
{config.ORIGINAL_SCHEMA}.orders
LEFT OUTER JOIN
{config.ORIGINAL_SCHEMA}.deliveries ON orders.id = deliveries.order_id
LEFT OUTER JOIN
(
SELECT
order_id,
CAST(100 * SUM(price) AS INTEGER) AS sub_total
FROM
{config.ORIGINAL_SCHEMA}.order_records
GROUP BY
order_id
) AS items_totals ON orders.id = items_totals.order_id
LEFT OUTER JOIN
{config.ORIGINAL_SCHEMA}.delivery_timings ON deliveries.id = delivery_timings.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'DispatchIssue'
AND
category = 'no_courier_interaction'
GROUP BY
delivery_id
) AS courier_no_accept_confirmed ON deliveries.id = courier_no_accept_confirmed.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'PickupIssue'
AND
category = 'waiting'
GROUP BY
delivery_id
) AS courier_waited_at_pickup ON deliveries.id = courier_waited_at_pickup.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'PickupIssue'
AND
category = 'late'
GROUP BY
delivery_id
) AS courier_late_at_pickup ON deliveries.id = courier_late_at_pickup.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'PickupIssue'
AND
category = 'no_courier_interaction'
GROUP BY
delivery_id
) AS courier_no_pickup_confirmed ON deliveries.id = courier_no_pickup_confirmed.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'DropoffIssue'
AND
category = 'waiting'
GROUP BY
delivery_id
) AS courier_waited_at_delivery ON deliveries.id = courier_waited_at_delivery.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'DropoffIssue'
AND
category = 'late'
GROUP BY
delivery_id
) AS courier_late_at_delivery ON deliveries.id = courier_late_at_delivery.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(notes) AS issue
FROM
{config.ORIGINAL_SCHEMA}.issues
WHERE
type = 'DropoffIssue'
AND
category = 'no_courier_interaction'
GROUP BY
delivery_id
) AS courier_no_delivery_confirmed ON deliveries.id = courier_no_delivery_confirmed.delivery_id
LEFT OUTER JOIN (
SELECT
delivery_id,
courier_id,
MAX(created_at) AS left_pickup_at
FROM (
SELECT
delivery_id,
(metadata -> 'courier_id')::TEXT::INTEGER AS courier_id,
created_at
FROM
{config.ORIGINAL_SCHEMA}.delivery_transitions
WHERE
to_state = 'left_pickup'
) AS left_pickups
GROUP BY
delivery_id,
courier_id
) AS left_pickups ON deliveries.id = left_pickups.delivery_id AND deliveries.courier_id = left_pickups.courier_id
LEFT OUTER JOIN (
SELECT
delivery_id,
MAX(created_at) AS cancelled_at
FROM
{config.ORIGINAL_SCHEMA}.delivery_transitions
WHERE
to_state = 'cancelled'
GROUP BY
delivery_id
) AS cancellations ON deliveries.id = cancellations.delivery_id
WHERE
orders.featured_business_id IN (
SELECT -- Subquery based off the restaurants query above!
id
FROM
{config.ORIGINAL_SCHEMA}.businesses
WHERE
address_id IN (
SELECT id FROM {config.ORIGINAL_SCHEMA}.addresses WHERE city_id IN %(city_ids)s
)
AND
created_at < '{config.CUTOFF_DAY}'
)
AND
scheduled_dropoff_at < '{config.CUTOFF_DAY}'
AND
deliveries.is_primary IS TRUE
ORDER BY
orders.id
""",
con=connection,
index_col="id",
params={"city_ids": city_ids},
parse_dates=[
"placed_at",
"scheduled_delivery_at",
"cancelled_at",
"restaurant_notified_at",
"restaurant_confirmed_at",
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"pickup_at",
"left_pickup_at",
"first_estimated_delivery_at",
"delivery_at",
],
)
orders = orders.astype(
{
"customer_id": "string",
"status": "string",
"estimated_prep_duration": "Int64",
"courier_id": "Int64",
"courier_no_accept_confirmed_issue": "string",
"courier_late_at_pickup_issue": "string",
"courier_waited_at_pickup_issue": "string",
"courier_no_pickup_confirmed_issue": "string",
"courier_waited_at_delivery_issue": "string",
"courier_no_delivery_confirmed_issue": "string",
"logged_avg_courier_speed_distance": "Int64",
"logged_accepting_time": "Int64",
"logged_reaction_time": "Int64",
"logged_to_pickup_time": "Int64",
"expected_wait_pickup_time": "Int64",
"logged_wait_pickup_time": "Int64",
"logged_pickup_time": "Int64",
"logged_courier_late_time": "Int64",
"logged_restaurant_late_time": "Int64",
"logged_to_delivery_time": "Int64",
"expected_delivery_time": "Int64",
"logged_delivery_time": "Int64",
"logged_delivery_late_time": "Int64",
"logged_total_time": "Int64",
"logged_confirmed_total_time": "Int64",
}
)
orders.head()
delivery_id | customer_id | placed_at | ad_hoc | scheduled_delivery_at | status | cancelled_at | restaurant_id | restaurant_notified_at | restaurant_confirmed_at | estimated_prep_duration | estimated_prep_buffer | courier_id | dispatch_at | courier_notified_at | courier_accepted_at | courier_no_accept_confirmed_issue | pickup_address_id | scheduled_pickup_at | pickup_at | left_pickup_at | courier_late_at_pickup_issue | courier_waited_at_pickup_issue | courier_no_pickup_confirmed_issue | delivery_address_id | first_estimated_delivery_at | delivery_at | courier_waited_at_delivery_issue | courier_no_delivery_confirmed_issue | utilization | sub_total | delivery_fee | total | logged_delivery_distance | logged_avg_courier_speed | logged_avg_courier_speed_distance | logged_accepting_time | logged_reaction_time | logged_to_pickup_time | expected_wait_pickup_time | logged_wait_pickup_time | logged_pickup_time | logged_courier_late_time | logged_restaurant_late_time | logged_to_delivery_time | expected_delivery_time | logged_delivery_time | logged_delivery_late_time | logged_total_time | logged_confirmed_total_time | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
id | ||||||||||||||||||||||||||||||||||||||||||||||||||
1 | 487377 | f172baefbdf8550eb2c6bcf667e43090 | 2016-02-22 09:42:01 | False | 2016-02-22 11:30:00 | cancelled | 2016-10-18 07:52:45.136243 | 1 | NaT | NaT | <NA> | 0 | <NA> | NaT | NaT | NaT | <NA> | 2 | 2016-02-22 11:15:00.000000 | NaT | NaT | <NA> | <NA> | <NA> | 3 | NaT | NaT | <NA> | <NA> | 15 | 1250 | 250 | 1500 | 671 | NaN | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
2 | 487433 | cc2f0563675b4b2e9b985bfa4d7a157f | 2016-02-22 09:40:09 | False | 2016-02-22 11:00:00 | completed | NaT | 2 | 2016-02-22 10:31:03.664857 | 2016-02-22 10:46:43.348671 | <NA> | 0 | 96 | 2016-02-22 10:37:00.683271 | 2016-02-22 10:36:04.532254 | 2016-02-22 10:37:00.683188 | Courier did not hit "Accept" | 4 | 2016-02-22 10:41:03.664793 | 2016-02-22 10:54:00.886417 | NaT | <NA> | <NA> | Courier did not hit "Picked up" | 5 | 2016-02-23 22:55:23 | 2016-02-22 11:13:23.772580 | <NA> | <NA> | 19 | 1550 | 250 | 1800 | 2281 | NaN | <NA> | 2916 | <NA> | 187 | <NA> | 69 | 125 | 67 | 69 | 1745 | 4791 | 264 | 942 | 5734 | 5595 |
3 | 487444 | c09807c923d356a43fac084543de664b | 2016-02-22 09:56:16 | False | 2016-02-22 11:00:00 | completed | NaT | 3 | 2016-02-22 10:31:02.531350 | 2016-02-22 10:42:00.334529 | <NA> | 0 | 57 | 2016-02-22 10:31:30.963423 | 2016-02-22 10:31:03.587634 | 2016-02-22 10:31:30.963342 | <NA> | 6 | 2016-02-22 10:41:02.531284 | 2016-02-22 10:54:32.122854 | NaT | <NA> | <NA> | <NA> | 7 | 2016-02-23 22:25:47 | 2016-02-22 11:06:07.677724 | <NA> | <NA> | 25 | 2000 | 250 | 2250 | 2449 | NaN | <NA> | 28 | <NA> | 1381 | <NA> | <NA> | 0 | 0 | 0 | 522 | 3824 | 270 | 464 | 4288 | 4192 |
4 | 470503 | 440d641745a8707a7afd5565adc99be2 | 2016-02-22 10:11:46 | False | 2016-02-22 11:30:00 | completed | NaT | 4 | 2016-02-22 10:32:04.810633 | 2016-02-22 10:48:01.402101 | <NA> | 0 | 39 | 2016-02-22 10:50:29.141811 | 2016-02-22 10:50:03.610169 | 2016-02-22 10:50:29.141722 | <NA> | 8 | 2016-02-22 11:17:04.810526 | 2016-02-22 11:11:26.166161 | NaT | <NA> | <NA> | <NA> | 9 | 2016-02-22 11:25:50 | 2016-02-22 11:18:43.289947 | <NA> | <NA> | 100 | 9800 | 250 | 10050 | 196 | NaN | <NA> | 26 | <NA> | 439 | <NA> | -180 | 818 | -1061 | -338 | 87 | 4694 | 350 | -676 | 4017 | 4017 |
5 | 487439 | 0e7b3e4012a00a56b5bf1dee1bc6c1c7 | 2016-02-22 10:35:58 | False | 2016-02-22 11:30:00 | completed | NaT | 5 | 2016-02-22 10:50:03.866327 | 2016-02-22 10:50:19.473584 | <NA> | 0 | 128 | 2016-02-22 10:50:28.153663 | 2016-02-22 10:50:04.830855 | 2016-02-22 10:50:28.153559 | <NA> | 10 | 2016-02-22 11:10:03.866256 | 2016-02-22 11:05:19.667624 | NaT | <NA> | <NA> | <NA> | 11 | 2016-02-22 11:28:19 | 2016-02-22 11:20:44.147345 | <NA> | <NA> | 45 | 3600 | 250 | 3850 | 2584 | NaN | <NA> | 24 | <NA> | 284 | <NA> | -440 | 607 | -735 | -284 | 694 | 3242 | 231 | -555 | 2686 | 2686 |
orders.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 661314 entries, 1 to 688690
Data columns (total 50 columns):
 #   Column                               Non-Null Count   Dtype
---  ------                               --------------   -----
 0   delivery_id                          661314 non-null  int64
 1   customer_id                          661314 non-null  string
 2   placed_at                            661314 non-null  datetime64[ns]
 3   ad_hoc                               661314 non-null  bool
 4   scheduled_delivery_at                83579 non-null   datetime64[ns]
 5   status                               661314 non-null  string
 6   cancelled_at                         23665 non-null   datetime64[ns]
 7   restaurant_id                        661314 non-null  int64
 8   restaurant_notified_at               656311 non-null  datetime64[ns]
 9   restaurant_confirmed_at              639636 non-null  datetime64[ns]
 10  estimated_prep_duration              552317 non-null  Int64
 11  estimated_prep_buffer                661314 non-null  int64
 12  courier_id                           648850 non-null  Int64
 13  dispatch_at                          656923 non-null  datetime64[ns]
 14  courier_notified_at                  430500 non-null  datetime64[ns]
 15  courier_accepted_at                  650964 non-null  datetime64[ns]
 16  courier_no_accept_confirmed_issue    40105 non-null   string
 17  pickup_address_id                    661314 non-null  int64
 18  scheduled_pickup_at                  656727 non-null  datetime64[ns]
 19  pickup_at                            638249 non-null  datetime64[ns]
 20  left_pickup_at                       312171 non-null  datetime64[ns]
 21  courier_late_at_pickup_issue         67499 non-null   string
 22  courier_waited_at_pickup_issue       29469 non-null   string
 23  courier_no_pickup_confirmed_issue    35394 non-null   string
 24  delivery_address_id                  661314 non-null  int64
 25  first_estimated_delivery_at          657796 non-null  datetime64[ns]
 26  delivery_at                          637808 non-null  datetime64[ns]
 27  courier_waited_at_delivery_issue     32415 non-null   string
 28  courier_no_delivery_confirmed_issue  13884 non-null   string
 29  utilization                          661314 non-null  int64
 30  sub_total                            661314 non-null  int64
 31  delivery_fee                         661314 non-null  int64
 32  total                                661314 non-null  int64
 33  logged_delivery_distance             661314 non-null  int64
 34  logged_avg_courier_speed             213841 non-null  float64
 35  logged_avg_courier_speed_distance    213841 non-null  Int64
 36  logged_accepting_time                637808 non-null  Int64
 37  logged_reaction_time                 410349 non-null  Int64
 38  logged_to_pickup_time                637808 non-null  Int64
 39  expected_wait_pickup_time            532387 non-null  Int64
 40  logged_wait_pickup_time              497916 non-null  Int64
 41  logged_pickup_time                   637808 non-null  Int64
 42  logged_courier_late_time             503505 non-null  Int64
 43  logged_restaurant_late_time          503505 non-null  Int64
 44  logged_to_delivery_time              637808 non-null  Int64
 45  expected_delivery_time               637808 non-null  Int64
 46  logged_delivery_time                 637808 non-null  Int64
 47  logged_delivery_late_time            637808 non-null  Int64
 48  logged_total_time                    637808 non-null  Int64
 49  logged_confirmed_total_time          637808 non-null  Int64
dtypes: Int64(17), bool(1), datetime64[ns](13), float64(1), int64(10), string(8)
memory usage: 263.6 MB
assert len(orders) == 661_314
for column in [
"placed_at",
"scheduled_delivery_at",
"cancelled_at",
"restaurant_notified_at",
"restaurant_confirmed_at",
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"scheduled_pickup_at",
"pickup_at",
"left_pickup_at",
"first_estimated_delivery_at",
"delivery_at",
]:
orders[column] = clean_datetime(orders[column])
About 0.02% of the orders belong to discarded addresses and are discarded as well.
orders["pickup_address_id"] = orders["pickup_address_id"].map(address_merger)
orders["delivery_address_id"] = orders["delivery_address_id"].map(address_merger)
msk = orders["pickup_address_id"].isnull() | orders["delivery_address_id"].isnull()
orders = orders[~msk].astype({"pickup_address_id": int, "delivery_address_id": int,})
assert msk.sum() == 160
orders["restaurant_id"] = orders["restaurant_id"].map(restaurants_merger)
assert not orders["restaurant_id"].isnull().any()
orders["courier_id"] = orders["courier_id"].map(couriers_merger).astype("Int64")
Verify that the couriers' IDs are the same in couriers and orders.
assert set(couriers.index) == set(
orders.loc[orders["courier_id"].notnull(), "courier_id"].unique()
)
Convert the MD5-hashed emails and phone numbers into integer IDs.
orders["customer_id"] = (
orders["customer_id"]
.map({y: x for (x, y) in enumerate(orders["customer_id"].unique(), start=1)})
.astype({"customer_id": int})
)
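The reversed comprehension maps each unique hash to its 1-based position of first appearance, so repeat customers keep a stable ID. With made-up hashes:
hashes = pd.Series(["ab12", "cd34", "ab12"], dtype="string")
hashes.map({y: x for (x, y) in enumerate(hashes.unique(), start=1)})
# 0    1
# 1    2
# 2    1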
Ad-hoc orders never have a scheduled_delivery_at value set, and scheduled orders always have it set.
assert not (
(orders["ad_hoc"] == True) & orders["scheduled_delivery_at"].notnull()
).any()
assert not (
(orders["ad_hoc"] == False) & orders["scheduled_delivery_at"].isnull()
).any()
For all adjusted timestamps, we add *_corrected columns in the following, indicating whether a correction was made.
for column in [
"scheduled_delivery_at",
"cancelled_at",
"restaurant_notified_at",
"restaurant_confirmed_at",
"estimated_prep_duration",
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"pickup_at",
"left_pickup_at",
"delivery_at",
]:
orders[column + "_corrected"] = False
orders.loc[orders[column].isnull(), column + "_corrected"] = pd.NA
Some customers managed to place scheduled orders for the past. These are converted into ad-hoc orders.
msk = orders["scheduled_delivery_at"] < orders["placed_at"]
orders.loc[msk, "ad_hoc"] = True
orders.loc[msk, "scheduled_delivery_at"] = pd.NaT
orders.loc[msk, "scheduled_delivery_at_corrected"] = True
assert msk.sum() == 11
Orders scheduled within the next 30 minutes are treated as ad-hoc orders: with the median fulfillment time of ad-hoc orders being 34 minutes, it is unrealistic to fulfill such a scheduled order on time, so this should not distort the KPIs.
msk = (orders["ad_hoc"] == False) & (
orders["scheduled_delivery_at"] - orders["placed_at"]
< datetime.timedelta(minutes=30)
)
orders.loc[msk, "ad_hoc"] = True
orders.loc[msk, "scheduled_delivery_at"] = pd.NaT
orders.loc[msk, "scheduled_delivery_at_corrected"] = True
assert msk.sum() == 3_267
For scheduled orders, scheduled_delivery_at is mostly set to quarters of an hour. The seconds part is always 0.
assert not (
(orders["ad_hoc"] == False) & (orders["scheduled_delivery_at"].dt.second != 0)
).any()
If a customer managed to enter something other than a quarter of an hour as scheduled_delivery_at, we adjust that.
msk = (orders["ad_hoc"] == False) & (
orders["scheduled_delivery_at"].dt.minute % 15 != 0
)
round_down = msk & (orders["scheduled_delivery_at"].dt.minute % 15 < 8)
orders.loc[round_down, "scheduled_delivery_at"] = orders.loc[
round_down, "scheduled_delivery_at"
] - (orders.loc[round_down, "scheduled_delivery_at"].dt.minute % 15).map(
lambda m: datetime.timedelta(minutes=m)
)
round_up = msk & (orders["scheduled_delivery_at"].dt.minute % 15 >= 8)
orders.loc[round_up, "scheduled_delivery_at"] = orders.loc[
round_up, "scheduled_delivery_at"
] + (orders.loc[round_up, "scheduled_delivery_at"].dt.minute % 15).map(
lambda m: datetime.timedelta(minutes=(15 - m))
)
orders.loc[msk, "scheduled_delivery_at_corrected"] = True
assert msk.sum() == 6
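In effect, minutes 1-7 past a quarter are rounded down and minutes 8-14 are rounded up. A worked example with two made-up timestamps:
ts = pd.Series(pd.to_datetime(["2016-05-01 11:37:00", "2016-05-01 11:38:00"]))
m = ts.dt.minute % 15
ts + m.map(lambda x: datetime.timedelta(minutes=(15 - x) if x >= 8 else -x))
# 0   2016-05-01 11:30:00
# 1   2016-05-01 11:45:00
# dtype: datetime64[ns]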
All timestamps in orders must occur in a strict sequence (i.e., order) according to the delivery process. A tiny fraction of the orders have timestamps that do not comply with that and are adjusted in the following.
placed_at must always be the earliest of all timestamps.
for column in [
"scheduled_delivery_at",
"cancelled_at",
"restaurant_notified_at",
"restaurant_confirmed_at",
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"pickup_at",
"left_pickup_at",
"first_estimated_delivery_at",
"delivery_at",
]:
assert not (orders["placed_at"] >= orders[column]).any()
Rarely, a restaurant confirmed an order before it was notified about it. We keep restaurant_confirmed_at in these cases.
msk = orders["restaurant_notified_at"] >= orders["restaurant_confirmed_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True
assert msk.sum() == 47
Whenever restaurant_notified_at or restaurant_confirmed_at is later than pickup_at, we discard the values.
msk = orders["restaurant_notified_at"] >= orders["pickup_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True
assert msk.sum() == 73
msk = orders["restaurant_confirmed_at"] >= orders["pickup_at"]
orders.loc[msk, "restaurant_confirmed_at"] = pd.NaT
orders.loc[msk, "restaurant_confirmed_at_corrected"] = True
assert msk.sum() == 2_001
If a courier forgot to confirm the pickup, pickup_at and delivery_at are the same.
msk = orders["delivery_at"] == orders["pickup_at"]
orders.loc[msk, "pickup_at"] = pd.NaT
orders.loc[msk, "pickup_at_corrected"] = True
assert msk.sum() == 16
msk = orders["delivery_at"] == orders["left_pickup_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True
assert msk.sum() == 15
delivery_at must be the latest of all dispatch-related timestamps.
for column in [
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"pickup_at",
"left_pickup_at",
]:
assert not (orders["delivery_at"] <= orders[column]).any()
In about 14,500 cases, left_pickup_at lies at or before pickup_at. This only affects orders between September 6 and October 17. We discard these timestamps.
msk = orders["left_pickup_at"] < orders["pickup_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True
assert msk.sum() == 14_013
assert orders.loc[msk, "placed_at"].min().date() == datetime.date(2016, 9, 6)
assert orders.loc[msk, "placed_at"].max().date() == datetime.date(2016, 10, 17)
msk = orders["left_pickup_at"] == orders["pickup_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True
assert msk.sum() == 496
for column in [
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"pickup_at",
]:
assert not (orders["left_pickup_at"] <= orders[column]).any()
Rarely, pickup_at is earlier than or equal to dispatch_at, courier_notified_at, or courier_accepted_at. In these cases, the dispatch-related timestamps are discarded.
msk = orders["pickup_at"] <= orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] = pd.NaT
orders.loc[msk, "dispatch_at_corrected"] = True
assert msk.sum() == 15
msk = orders["pickup_at"] <= orders["courier_notified_at"]
orders.loc[msk, "courier_notified_at"] = pd.NaT
orders.loc[msk, "courier_notified_at_corrected"] = True
assert msk.sum() == 8
assert set(orders.loc[msk, "status"].unique()) == set(["cancelled"])
msk = orders["pickup_at"] <= orders["courier_accepted_at"]
orders.loc[msk, "courier_accepted_at"] = pd.NaT
orders.loc[msk, "courier_accepted_at_corrected"] = True
assert msk.sum() == 15
for column in ["dispatch_at", "courier_notified_at", "courier_accepted_at"]:
assert not (orders["pickup_at"] <= orders[column]).any()
For about 66,000 orders, courier_accepted_at equals dispatch_at or lies before it. We assume the former is correct and discard the latter.
msk = orders["courier_accepted_at"] <= orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] = pd.NaT
orders.loc[msk, "dispatch_at_corrected"] = True
assert msk.sum() == 65_848
If courier_accepted_at is equal to or before courier_notified_at, we discard the latter.
msk = orders["courier_accepted_at"] <= orders["courier_notified_at"]
orders.loc[msk, "courier_notified_at"] = pd.NaT
orders.loc[msk, "courier_notified_at_corrected"] = True
assert msk.sum() == 165_585
for column in ["dispatch_at", "courier_notified_at"]:
assert not (orders["courier_accepted_at"] <= orders[column]).any()
For some more orders, courier_notified_at lies at or before dispatch_at. Manual analysis reveals that in most of these cases, the courier did not hit "accept". We discard dispatch_at, as the timings between courier_notified_at and courier_accepted_at fit the issue messages.
msk = orders["courier_notified_at"] <= orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] = pd.NaT
orders.loc[msk, "dispatch_at_corrected"] = True
assert msk.sum() == 3_397
Ad-hoc orders that were placed before 11 in the morning or after 23 in the evening are discarded. Most of them were cancelled anyway.
msk = (orders["ad_hoc"] == True) & (
(orders["placed_at"].dt.hour <= 10) | (orders["placed_at"].dt.hour >= 23)
)
orders = orders[~msk]
assert msk.sum() == 337
The orders scheduled for 11:15 (1 order) and 11:30 (37 orders) are moved to 11:45. Most of them were not delivered until 12 anyway. This is in line with the 30-minute minimum horizon above.
msk = (orders["scheduled_delivery_at"].dt.hour == 11) & (
orders["scheduled_delivery_at"].dt.minute == 30
)
orders.loc[msk, "scheduled_delivery_at"] += datetime.timedelta(minutes=15)
orders.loc[msk, "scheduled_delivery_at_corrected"] = True
assert msk.sum() == 37
msk = (orders["scheduled_delivery_at"].dt.hour == 11) & (
orders["scheduled_delivery_at"].dt.minute == 15
)
orders.loc[msk, "scheduled_delivery_at"] += datetime.timedelta(minutes=30)
orders.loc[msk, "scheduled_delivery_at_corrected"] = True
assert msk.sum() == 1
assert not (
(orders["scheduled_delivery_at"].dt.hour == 11)
& (orders["scheduled_delivery_at"].dt.minute == 0)
).any()
Orders with a scheduled delivery before 11 in the morning or after 23 in the evening are discarded.
msk = (orders["ad_hoc"] == False) & (
(orders["scheduled_delivery_at"].dt.hour <= 10)
| (orders["scheduled_delivery_at"].dt.hour >= 23)
)
orders = orders[~msk]
assert msk.sum() == 159
ad_hoc = orders["ad_hoc"] == True
scheduled = ~ad_hoc
There are only cancelled and completed orders. We replace the status column with a boolean cancelled column.
assert set(orders["status"].unique()) == set(["cancelled", "completed"])
orders["cancelled"] = False
msk = orders["status"] == "cancelled"
orders.loc[msk, "cancelled"] = True
del orders["status"]
assert msk.sum() == 23_552
Some cancelled orders still have a delivery_at value. All of them have a dummy value for cancelled_at (cf. below). For roughly two thirds of them, the time between pickup and delivery is so small that it seems unrealistic that they were actually delivered. In these cases, we take delivery_at as the realistic cancelled_at value. The ones that could realistically have been delivered are treated as completed orders.
claimed_to_be_delivered = (orders["cancelled"] == True) & orders[
"delivery_at"
].notnull()
assert (
orders.loc[claimed_to_be_delivered, "cancelled_at"].min()
== orders.loc[claimed_to_be_delivered, "cancelled_at"].max()
== datetime.datetime(2016, 10, 18, 9, 52, 45)
)
realistically_delivered = (
orders["delivery_at"] - orders["pickup_at"]
).dt.total_seconds() > 120
msk = claimed_to_be_delivered & realistically_delivered
orders.loc[msk, "cancelled"] = False
orders.loc[msk, "cancelled_at"] = pd.NaT
orders.loc[msk, "cancelled_at_corrected"] = True
msk = claimed_to_be_delivered & ~realistically_delivered
orders.loc[msk, "cancelled_at"] = orders.loc[msk, "delivery_at"]
orders.loc[msk, "cancelled_at_corrected"] = True
orders.loc[msk, "delivery_at"] = pd.NaT
orders.loc[msk, "delivery_at_corrected"] = pd.NA
assert claimed_to_be_delivered.sum() == 159
assert (claimed_to_be_delivered & realistically_delivered).sum() == 61
assert (claimed_to_be_delivered & ~realistically_delivered).sum() == 98
Only cancelled orders have a cancelled_at value.
cancelled = orders["cancelled"] == True
completed = orders["cancelled"] == False
assert not orders.loc[cancelled, "cancelled_at"].isnull().any()
assert not orders.loc[completed, "cancelled_at"].notnull().any()
For about 40% of the cancelled orders, the cancelled_at field was only filled in after a system change on October 18 (i.e., in a batch). For these orders, the field is not meaningful, so we discard it.
batch = orders["cancelled_at"] == datetime.datetime(2016, 10, 18, 9, 52, 45)
orders.loc[cancelled & batch, "cancelled_at"] = pd.NaT
orders.loc[cancelled & batch, "cancelled_at_corrected"] = True
assert (cancelled & batch).sum() == 9_410
When a restaurant was notified about an order after the order was cancelled, we discard restaurant_notified_at and restaurant_confirmed_at.
msk = orders["cancelled_at"] <= orders["restaurant_notified_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True
assert msk.sum() == 6
msk = orders["cancelled_at"] <= orders["restaurant_confirmed_at"]
orders.loc[msk, "restaurant_confirmed_at"] = pd.NaT
orders.loc[msk, "restaurant_confirmed_at_corrected"] = True
assert msk.sum() == 1_253
When an order was dispatched at the moment it was cancelled, we shift dispatch_at back by one second.
msk = orders["cancelled_at"] == orders["dispatch_at"]
orders.loc[msk, "dispatch_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "dispatch_at_corrected"] = True
assert msk.sum() == 3
When a courier was notified about or accepted an order at the moment it was cancelled, we shift those timestamps back by one second as well.
msk = orders["cancelled_at"] == orders["courier_notified_at"]
orders.loc[msk, "courier_notified_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "courier_notified_at_corrected"] = True
assert msk.sum() == 1
msk = orders["cancelled_at"] == orders["courier_accepted_at"]
orders.loc[msk, "courier_accepted_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "courier_accepted_at_corrected"] = True
assert msk.sum() == 8
When a courier picked up an order at the moment it was cancelled, we adjust that likewise.
msk = orders["cancelled_at"] == orders["pickup_at"]
orders.loc[msk, "pickup_at"] -= datetime.timedelta(seconds=1)
orders.loc[msk, "pickup_at_corrected"] = True
assert msk.sum() == 1
Verify that cancelled_at is indeed the latest timestamp in every row.
orders["_max_datetime"] = pd.NaT
orders["_max_datetime"] = orders[
[
"restaurant_notified_at",
"restaurant_confirmed_at",
"dispatch_at",
"courier_notified_at",
"courier_accepted_at",
"pickup_at",
"left_pickup_at",
"delivery_at",
"cancelled_at",
]
].max(axis=1)
assert not (
cancelled & ~batch & (orders["cancelled_at"] != orders["_max_datetime"])
).any()
del orders["_max_datetime"]
The times in between the timestamps can be used to obtain timings of individual steps in the delivery process. In the original database, such timings were already logged. In the following, we validate the timestamps against the logged timings and keep only the timestamps, as the timings are then calculated as @property methods in the ORM layer.
confirmed_total_time is the difference between placed_at and delivery_at. It is set only for completed orders and useful only for ad-hoc orders. The Order class has a total_time property that computes this value.
all_data_available = (
orders["logged_confirmed_total_time"].notnull()
& orders["delivery_at"].notnull()
& orders["placed_at"].notnull()
)
good_data = (
orders["logged_confirmed_total_time"]
- ((orders["delivery_at"] - orders["placed_at"]).dt.total_seconds().round())
).abs() <= 5
del orders["logged_confirmed_total_time"]
assert (all_data_available & good_data).sum() == 635_768
assert (all_data_available & good_data & completed).sum() == 635_768
assert (all_data_available & good_data & ad_hoc).sum() == 561_340
round(
(all_data_available & good_data & ad_hoc).sum()
/ (all_data_available & ad_hoc).sum(),
3,
)
0.998
The best guess for accepting_time is the difference between dispatch_at and courier_accepted_at. Order.time_to_accept models that.
all_data_available = (
orders["logged_accepting_time"].notnull()
& orders["courier_accepted_at"].notnull()
& orders["dispatch_at"].notnull()
)
good_data = (
orders["logged_accepting_time"]
- (
(orders["courier_accepted_at"] - orders["dispatch_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
assert (all_data_available & good_data).sum() == 345_803
assert (all_data_available & good_data & completed).sum() == 345_803
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
0.607
We use accepting_time to extrapolate missing values for dispatch_at.
extrapolate = (
orders["dispatch_at"].isnull()
& orders["courier_accepted_at"].notnull()
& orders["logged_accepting_time"].notnull()
)
# logged_accepting_time is in seconds; missing values become NaT.
accept_time = orders["logged_accepting_time"].map(
    lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
extrapolated_dispatch_at = orders["courier_accepted_at"] - accept_time
still_wrong = extrapolated_dispatch_at >= orders["courier_notified_at"]
msk = extrapolate & ~still_wrong
orders.loc[msk, "dispatch_at"] = extrapolated_dispatch_at.loc[msk]
orders.loc[msk, "dispatch_at_corrected"] = True
del orders["logged_accepting_time"]
assert extrapolate.sum() == 67_372
assert (extrapolate & ~still_wrong).sum() == 61_545
The best guess for reaction_time is the difference between courier_notified_at and courier_accepted_at. Order.time_to_react models that in the ORM.
all_data_available = (
orders["logged_reaction_time"].notnull()
& orders["courier_accepted_at"].notnull()
& orders["courier_notified_at"].notnull()
)
good_data = (
orders["logged_reaction_time"]
- (
(orders["courier_accepted_at"] - orders["courier_notified_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
assert (all_data_available & good_data).sum() == 165_355
assert (all_data_available & good_data & completed).sum() == 165_355
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
0.843
We use reaction_time to extrapolate missing values for courier_notified_at.
extrapolate = (
orders["courier_notified_at"].isnull()
& orders["courier_accepted_at"].notnull()
& orders["logged_reaction_time"].notnull()
)
extrapolated_courier_notified_at = (
orders["courier_accepted_at"]
# Some values for logged_reaction_time are <= 0.
- orders["logged_reaction_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA and x > 0 else pd.NaT
)
)
still_wrong = extrapolated_courier_notified_at <= orders["dispatch_at"]
msk = extrapolate & ~still_wrong
orders.loc[msk, "courier_notified_at"] = extrapolated_courier_notified_at.loc[msk]
orders.loc[msk, "courier_notified_at_corrected"] = True
assert extrapolate.sum() == 214_043
assert (extrapolate & ~still_wrong).sum() == 213_290
There is no need to extrapolate courier_accepted_at from courier_notified_at.
assert not (
orders["courier_notified_at"].notnull()
& orders["courier_accepted_at"].isnull()
& orders["logged_reaction_time"].notnull()
).any()
del orders["logged_reaction_time"]
estimated_prep_duration equals expected_wait_pickup_time. As the latter is not filled in for cancelled orders, we keep the former. Also, estimated_prep_duration is only filled in starting with May 24. It is always a multiple of 60 seconds, i.e., it encodes full minutes.
all_data_available = (
orders["estimated_prep_duration"].notnull()
& orders["expected_wait_pickup_time"].notnull()
)
good_data = (
orders["estimated_prep_duration"] - orders["expected_wait_pickup_time"]
).abs() <= 5
no_duration = orders["estimated_prep_duration"].isnull()
assert not (no_duration & orders["expected_wait_pickup_time"].notnull()).any()
assert (~no_duration & orders["expected_wait_pickup_time"].isnull()).sum() == 19_865
assert not (
(orders["placed_at"].dt.date > datetime.date(2016, 5, 24))
& orders["expected_wait_pickup_time"].isnull()
& (orders["cancelled"] == False)
).any()
del orders["expected_wait_pickup_time"]
assert orders.loc[no_duration, "placed_at"].min().date() == datetime.date(2016, 2, 21)
assert orders.loc[no_duration, "placed_at"].max().date() == datetime.date(2016, 5, 24)
assert orders.loc[~no_duration, "placed_at"].min().date() == datetime.date(2016, 5, 24)
assert orders.loc[~no_duration, "placed_at"].max().date() == datetime.date(2017, 1, 31)
assert not (~no_duration & (orders["estimated_prep_duration"] % 60 != 0)).any()
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
1.0
estimated_prep_duration is the difference between restaurant_notified_at and scheduled_pickup_at when allowing up to half a minute of clock skew. Using restaurant_confirmed_at instead only explains about 40% of the cases. So, whether and when a restaurant confirms an order does not affect the dispatching process.
all_data_available = (
orders["estimated_prep_duration"].notnull()
& orders["restaurant_notified_at"].notnull()
& orders["scheduled_pickup_at"].notnull()
)
good_data = (
orders["estimated_prep_duration"]
- (
(orders["scheduled_pickup_at"] - orders["restaurant_notified_at"])
.dt.total_seconds()
.round()
)
).abs() <= 35
assert (all_data_available & good_data).sum() == 539_668
assert (all_data_available & good_data & completed).sum() == 524_709
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
0.986
We use estimated_prep_duration to correct about a third of the 1.5% of restaurant_notified_at values that are off for orders after May 24.
duration = orders["estimated_prep_duration"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
calc_restaurant_notified_at = orders["scheduled_pickup_at"] - duration
not_wrong = (
completed
& (orders["placed_at"] < calc_restaurant_notified_at)
& (calc_restaurant_notified_at < orders["restaurant_confirmed_at"])
)
msk = all_data_available & ~good_data & not_wrong
orders.loc[msk, "restaurant_notified_at"] = calc_restaurant_notified_at.loc[msk]
orders.loc[msk, "restaurant_notified_at_corrected"] = True
assert (all_data_available & ~good_data).sum() == 7_514
assert msk.sum() == 2_425
assert orders.loc[msk, "placed_at"].min().date() == datetime.date(2016, 5, 24)
assert orders.loc[msk, "placed_at"].max().date() == datetime.date(2017, 1, 31)
Also, we use estimated_prep_duration to extrapolate missing restaurant_notified_at values for orders after May 24.
duration = orders["estimated_prep_duration"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
extrapolated = orders["scheduled_pickup_at"] - duration
extrapolate = (
orders["restaurant_notified_at"].isnull()
& orders["scheduled_pickup_at"].notnull()
& orders["estimated_prep_duration"].notnull()
)
still_wrong = (
(extrapolated <= orders["placed_at"])
| (extrapolated >= orders["restaurant_confirmed_at"])
| (extrapolated >= orders["cancelled_at"])
)
msk = extrapolate & ~still_wrong
orders.loc[msk, "restaurant_notified_at"] = extrapolated.loc[msk]
orders.loc[msk, "restaurant_notified_at_corrected"] = True
assert extrapolate.sum() == 469
assert msk.sum() == 374
assert orders.loc[msk, "placed_at"].min().date() == datetime.date(2016, 5, 29)
Vice versa, we extrapolate estimated_prep_duration as the difference between scheduled_pickup_at and restaurant_notified_at for orders before May 24.
extrapolated = orders["scheduled_pickup_at"] - orders["restaurant_notified_at"]
# Truncate to full minutes (e.g., 147 seconds -> 120) to match the field's format.
extrapolated = (extrapolated.dt.total_seconds() // 60 * 60).astype("Int64")
extrapolate = (
orders["restaurant_notified_at"].notnull()
& orders["scheduled_pickup_at"].notnull()
& orders["estimated_prep_duration"].isnull()
)
orders.loc[extrapolate, "estimated_prep_duration"] = extrapolated.loc[extrapolate]
orders.loc[extrapolate, "estimated_prep_duration_corrected"] = True
assert extrapolate.sum() == 108_398
assert orders.loc[extrapolate, "placed_at"].min().date() == datetime.date(2016, 2, 21)
assert orders.loc[extrapolate, "placed_at"].max().date() == datetime.date(2016, 5, 24)
More than 99.9% of the orders with estimated_prep_duration set have a value under 45 minutes. We view the remaining ones as outliers and cap them at 45 minutes.
more_than_45_mins = orders["estimated_prep_duration"].notnull()
more_than_45_mins &= orders["estimated_prep_duration"] > 45 * 60
orders.loc[more_than_45_mins, "estimated_prep_duration"] = 45 * 60
orders.loc[more_than_45_mins, "estimated_prep_duration_corrected"] = True
assert more_than_45_mins.sum() == 449
round((~more_than_45_mins).sum() / orders["estimated_prep_duration"].notnull().sum(), 5)
0.99973
We create a boolean column pickup_not_confirmed out of the text column courier_no_pickup_confirmed_issue. It is True if the courier did not confirm the pickup, False if he did, and missing wherever pickup_at itself is missing.
orders["courier_no_pickup_confirmed_issue"].value_counts()
Courier did not hit "Picked up"    35345
Name: courier_no_pickup_confirmed_issue, dtype: Int64
orders["pickup_not_confirmed"] = False
msk = orders["courier_no_pickup_confirmed_issue"].notnull()
orders.loc[msk, "pickup_not_confirmed"] = True
msk = orders["pickup_at"].isnull()
orders.loc[msk, "pickup_not_confirmed"] = pd.NA
del orders["courier_no_pickup_confirmed_issue"]
assert orders["pickup_not_confirmed"].sum() == 34_966
logged_to_pickup_time and logged_pickup_time together constitute the difference between courier_accepted_at and pickup_at. logged_pickup_time is negative in rare cases.
assert not (orders["logged_to_pickup_time"] < 0).any()
assert (orders["logged_pickup_time"] < 0).sum() == 30
all_data_available = (
orders["logged_to_pickup_time"].notnull()
& orders["logged_pickup_time"].notnull()
& (orders["logged_pickup_time"] >= 0)
& orders["pickup_at"].notnull()
& orders["courier_accepted_at"].notnull()
)
good_data = (
orders["logged_to_pickup_time"]
+ orders["logged_pickup_time"]
- ((orders["pickup_at"] - orders["courier_accepted_at"]).dt.total_seconds().round())
).abs() <= 5
pickup_not_confirmed = orders["pickup_not_confirmed"] == True
assert (all_data_available & good_data).sum() == 599_195
assert (all_data_available & good_data & completed).sum() == 599_111
assert (all_data_available & (good_data | pickup_not_confirmed)).sum() == 604_483
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
0.94
For the 6% where pickup_at does not relate back to courier_accepted_at, we correct the former. Unconfirmed pickups do not seem to be the cause of these inconsistencies.
calc_pickup_at = (
orders["courier_accepted_at"]
+ orders["logged_to_pickup_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
+ orders["logged_pickup_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
)
msk = all_data_available & ~good_data
orders.loc[msk, "pickup_at"] = calc_pickup_at.loc[msk]
orders.loc[msk, "pickup_at_corrected"] = True
assert (all_data_available & ~good_data).sum() == 38_015
assert (all_data_available & ~good_data & pickup_not_confirmed).sum() == 5_288
Keep other timestamps consistent after the correction.
msk = orders["pickup_at"] <= orders["restaurant_notified_at"]
orders.loc[msk, "restaurant_notified_at"] = pd.NaT
orders.loc[msk, "restaurant_notified_at_corrected"] = True
assert msk.sum() == 107
msk = orders["pickup_at"] <= orders["restaurant_confirmed_at"]
orders.loc[msk, "restaurant_confirmed_at"] = pd.NaT
orders.loc[msk, "restaurant_confirmed_at_corrected"] = True
assert msk.sum() == 892
With logged_to_pickup_time, we calculate a new timestamp reached_pickup_at.
to_pickup_time = orders["logged_to_pickup_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA and x > 0 else pd.NaT
)
reached_pickup_at = orders["courier_accepted_at"] + to_pickup_time
orders["reached_pickup_at"] = pd.NaT
msk = (
completed & reached_pickup_at.notnull() & (reached_pickup_at < orders["pickup_at"])
)
orders.loc[msk, "reached_pickup_at"] = reached_pickup_at.loc[msk]
assert msk.sum() == 530_724
logged_courier_late_time and logged_restaurant_late_time are always set together. The ca. 110,000 missing values are spread over the entire horizon.
assert not (
(
orders["logged_courier_late_time"].notnull()
& orders["logged_restaurant_late_time"].isnull()
)
| (
orders["logged_courier_late_time"].isnull()
& orders["logged_restaurant_late_time"].notnull()
)
).any()
assert orders.loc[
orders["logged_courier_late_time"].isnull(), "placed_at"
].min().date() == datetime.date(2016, 2, 22)
assert orders.loc[
orders["logged_courier_late_time"].isnull(), "placed_at"
].max().date() == datetime.date(2017, 1, 31)
assert orders.loc[
orders["logged_courier_late_time"].notnull(), "placed_at"
].min().date() == datetime.date(2016, 2, 21)
assert orders.loc[
orders["logged_courier_late_time"].notnull(), "placed_at"
].max().date() == datetime.date(2017, 1, 31)
logged_courier_late_time is mostly explained by reached_pickup_at and scheduled_pickup_at. Order.courier_early and Order.courier_late model that in the ORM.
all_data_available = (
orders["logged_courier_late_time"].notnull()
& orders["reached_pickup_at"].notnull()
& orders["scheduled_pickup_at"].notnull()
)
good_data = (
orders["logged_courier_late_time"]
- (
(orders["reached_pickup_at"] - orders["scheduled_pickup_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
assert (all_data_available & good_data).sum() == 471_553
assert (all_data_available & good_data & completed).sum() == 471_553
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
0.964
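A minimal sketch (an assumption for illustration, not the package's actual code) of how such an early/late pair can be derived from the two timestamps:
def courier_early(scheduled_pickup_at, reached_pickup_at):
    # Zero whenever the courier arrived late.
    return max(scheduled_pickup_at - reached_pickup_at, datetime.timedelta(0))


def courier_late(scheduled_pickup_at, reached_pickup_at):
    # Zero whenever the courier arrived early.
    return max(reached_pickup_at - scheduled_pickup_at, datetime.timedelta(0))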
logged_restaurant_late_time is mostly explained by pickup_at and scheduled_pickup_at. It is also 0 quite often, indicating that no timing was taken. Order.restaurant_early and Order.restaurant_late model that in the ORM.
all_data_available = (
orders["logged_restaurant_late_time"].notnull()
& orders["pickup_at"].notnull()
& orders["scheduled_pickup_at"].notnull()
)
good_data = (
orders["logged_restaurant_late_time"]
- ((orders["pickup_at"] - orders["scheduled_pickup_at"]).dt.total_seconds().round())
).abs() <= 5
restaurant_not_timed = orders["logged_restaurant_late_time"] == 0
assert (all_data_available).sum() == 503_179
assert (all_data_available & good_data).sum() == 245_714
assert (all_data_available & restaurant_not_timed).sum() == 246_362
assert (all_data_available & (good_data | restaurant_not_timed)).sum() == 488_512
restaurant_timed = orders["logged_restaurant_late_time"] != 0
round(
(all_data_available & restaurant_timed & good_data).sum()
/ (all_data_available & restaurant_timed).sum(),
3,
)
0.943
logged_wait_pickup_time is unfortunately not a good timing to extrapolate when a meal was ready to be picked up by the courier. It only explains the difference between reached_pickup_at and left_pickup_at, which is not really the time the courier had to wait. Also, the field seems to be tracked correctly only if the courier was late.
all_data_available = (
(orders["logged_wait_pickup_time"]).notnull()
& orders["reached_pickup_at"].notnull()
& orders["left_pickup_at"].notnull()
)
good_data = (
orders["logged_wait_pickup_time"]
- (
(orders["left_pickup_at"] - orders["reached_pickup_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
round(
(all_data_available & good_data).sum() / (all_data_available).sum(), 3,
)
0.396
all_data_available = (
(orders["logged_wait_pickup_time"]).notnull()
& (orders["logged_courier_late_time"] >= 0)
& orders["reached_pickup_at"].notnull()
& orders["left_pickup_at"].notnull()
)
good_data = (
orders["logged_wait_pickup_time"]
- (
(orders["left_pickup_at"] - orders["reached_pickup_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
round(
(all_data_available & good_data).sum() / (all_data_available).sum(), 3,
)
1.0
del orders["logged_courier_late_time"]
del orders["logged_restaurant_late_time"]
del orders["logged_wait_pickup_time"]
We create a boolean column delivery_not_confirmed out of the text column courier_no_delivery_confirmed_issue.
orders["courier_no_delivery_confirmed_issue"].value_counts()
Courier did not hit "Dropped off"    13862
Name: courier_no_delivery_confirmed_issue, dtype: Int64
orders["delivery_not_confirmed"] = False
msk = orders["courier_no_delivery_confirmed_issue"].notnull()
orders.loc[msk, "delivery_not_confirmed"] = True
msk = orders["delivery_at"].isnull()
orders.loc[msk, "delivery_not_confirmed"] = pd.NA
del orders["courier_no_delivery_confirmed_issue"]
assert orders["delivery_not_confirmed"].sum() == 13_817
logged_to_delivery_time and logged_delivery_time together constitute the difference between pickup_at and delivery_at. Without the pickup_at corrections above, not 91% but only 86% of the differences would work out. Order.time_to_delivery and Order.time_at_delivery model that in the ORM.
assert not (orders["logged_to_delivery_time"] < 0).any()
assert not (orders["logged_delivery_time"] < 0).any()
all_data_available = (
orders["logged_to_delivery_time"].notnull()
& orders["logged_delivery_time"].notnull()
& orders["delivery_at"].notnull()
& orders["pickup_at"].notnull()
)
good_data = (
orders["logged_to_delivery_time"]
+ orders["logged_delivery_time"]
- ((orders["delivery_at"] - orders["pickup_at"]).dt.total_seconds().round())
).abs() <= 5
delivery_not_confirmed = orders["delivery_not_confirmed"] == True
assert (all_data_available & good_data).sum() == 572_609
assert (all_data_available & good_data & completed).sum() == 572_609
assert (all_data_available & (good_data | delivery_not_confirmed)).sum() == 581_700
round(
(all_data_available & (good_data | delivery_not_confirmed)).sum()
/ all_data_available.sum(),
3,
)
0.913
courier_waited_at_delivery_issue is filled in whenever the courier needed to wait for the customer at delivery. It is also filled in if the courier forgot to confirm the delivery, which mostly happened at the end of a shift. If a courier needed to wait for more than 45 minutes, that is only summarized as waiting "about 1 hour", "about 2 hours", or "about 3 hours".
orders["courier_waited_at_delivery_issue"].value_counts().sort_index()
Waiting at Dropoff                   1597
Waiting at Dropoff: 10 minutes       4132
Waiting at Dropoff: 11 minutes       3151
Waiting at Dropoff: 12 minutes       2348
Waiting at Dropoff: 13 minutes       1775
Waiting at Dropoff: 14 minutes       1432
Waiting at Dropoff: 15 minutes       1199
Waiting at Dropoff: 16 minutes        941
Waiting at Dropoff: 17 minutes        826
Waiting at Dropoff: 18 minutes        720
Waiting at Dropoff: 19 minutes        594
Waiting at Dropoff: 20 minutes        522
Waiting at Dropoff: 21 minutes        431
Waiting at Dropoff: 22 minutes        415
Waiting at Dropoff: 23 minutes        284
Waiting at Dropoff: 24 minutes        281
Waiting at Dropoff: 25 minutes        215
Waiting at Dropoff: 26 minutes        193
Waiting at Dropoff: 27 minutes        167
Waiting at Dropoff: 28 minutes        162
Waiting at Dropoff: 29 minutes        118
Waiting at Dropoff: 30 minutes        109
Waiting at Dropoff: 31 minutes         97
Waiting at Dropoff: 32 minutes         86
Waiting at Dropoff: 33 minutes         67
Waiting at Dropoff: 34 minutes         69
Waiting at Dropoff: 35 minutes         59
Waiting at Dropoff: 36 minutes         50
Waiting at Dropoff: 37 minutes         62
Waiting at Dropoff: 38 minutes         38
Waiting at Dropoff: 39 minutes         41
Waiting at Dropoff: 40 minutes         41
Waiting at Dropoff: 41 minutes         32
Waiting at Dropoff: 42 minutes         26
Waiting at Dropoff: 43 minutes         22
Waiting at Dropoff: 44 minutes         35
Waiting at Dropoff: 8 minutes        3958
Waiting at Dropoff: 9 minutes        5814
Waiting at Dropoff: about 1 hour      244
Waiting at Dropoff: about 2 hours       7
Waiting at Dropoff: about 3 hours       3
Name: courier_waited_at_delivery_issue, dtype: Int64
We convert courier_waited_at_delivery_issue into courier_waited_at_delivery to validate it further below.
waited_at_delivery = (
    orders["courier_waited_at_delivery_issue"]
    .str.replace(r"\D+", "", regex=True)  # keep only the digits, if any
    .fillna("NaN")
    .replace("", "NaN")  # labels without a number (e.g., "Waiting at Dropoff")
    .astype(float)
    .astype("Int64")
)
orders["courier_waited_at_delivery"] = pd.NA
orders["courier_waited_at_delivery"] = orders["courier_waited_at_delivery"].astype(
"Int64"
)
hours = orders["courier_waited_at_delivery_issue"].str.contains("hour").fillna(0)
orders.loc[hours, "courier_waited_at_delivery"] = (
60 * 60 * waited_at_delivery.loc[hours]
)
mins = orders["courier_waited_at_delivery_issue"].str.contains("minutes").fillna(0)
orders.loc[mins, "courier_waited_at_delivery"] = 60 * waited_at_delivery.loc[mins]
customer_late = orders["courier_waited_at_delivery_issue"].notnull()
del orders["courier_waited_at_delivery_issue"]
assert hours.sum() == 254
assert mins.sum() == 30_512
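As a quick illustration with made-up labels, the digit extraction above behaves like this:
sample = pd.Series(
    ["Waiting at Dropoff: 10 minutes", "Waiting at Dropoff"], dtype="string"
)
# All non-digit runs are stripped; labels without a number become empty strings.
assert list(sample.str.replace(r"\D+", "", regex=True)) == ["10", ""]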
For the roughly 9% of orders where logged_to_delivery_time and logged_delivery_time do not explain delivery_at, the latter is corrected. However, this is only done if the courier did not have to wait for the customer, or if he merely forgot to confirm the delivery, which wrongly shows up as waiting for the customer as well.
calc_delivery_at = (
orders["pickup_at"]
+ orders["logged_to_delivery_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
+ orders["logged_delivery_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA else pd.NaT
)
)
del orders["logged_delivery_time"]
orders["delivery_at_orig"] = orders["delivery_at"]
msk = (
all_data_available
& ~good_data
& (~customer_late | (customer_late & delivery_not_confirmed))
)
orders.loc[msk, "delivery_at"] = calc_delivery_at.loc[msk]
orders.loc[msk, "delivery_at_corrected"] = True
assert (all_data_available & ~good_data).sum() == 64_543
assert (all_data_available & ~good_data & ~customer_late).sum() == 49_122
assert (
all_data_available & ~good_data & customer_late & delivery_not_confirmed
).sum() == 5_241
assert (all_data_available & ~good_data & delivery_not_confirmed).sum() == 9_091
With logged_to_delivery_time, we calculate a new timestamp reached_delivery_at.
to_delivery_time = orders["logged_to_delivery_time"].map(
lambda x: datetime.timedelta(seconds=x) if x is not pd.NA and x > 0 else pd.NaT
)
reached_delivery_at = orders["pickup_at"] + to_delivery_time
del orders["logged_to_delivery_time"]
orders["reached_delivery_at"] = pd.NaT
msk = (
completed
& reached_delivery_at.notnull()
& (reached_delivery_at < orders["delivery_at"])
)
orders.loc[msk, "reached_delivery_at"] = reached_delivery_at.loc[msk]
assert msk.sum() == 608_160
Some left_pickup_at values conflict with that and are discarded.
msk = orders["left_pickup_at"] >= orders["reached_delivery_at"]
orders.loc[msk, "left_pickup_at"] = pd.NaT
orders.loc[msk, "left_pickup_at_corrected"] = True
assert msk.sum() == 4_215
logged_delivery_late_time is the difference between scheduled_delivery_at and delivery_at for pre-orders. Order.delivery_early and Order.delivery_late model that in the ORM.
all_data_available = (
scheduled
& orders["logged_delivery_late_time"].notnull()
& orders["delivery_at"].notnull()
& orders["scheduled_delivery_at"].notnull()
)
good_data = (
orders["logged_delivery_late_time"]
- (
(orders["delivery_at"] - orders["scheduled_delivery_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
assert (all_data_available & good_data).sum() == 73_658
assert (all_data_available & good_data & completed).sum() == 73_658
assert (all_data_available & (good_data | delivery_not_confirmed)).sum() == 73_659
round(
(all_data_available & (good_data | delivery_not_confirmed)).sum()
/ all_data_available.sum(),
3,
)
0.986
expected_delivery_time is simply the difference between placed_at and scheduled_delivery_at, for both ad-hoc and pre-orders. So, the field provides no new information.
all_data_available = (
orders["expected_delivery_time"].notnull()
& orders["placed_at"].notnull()
& orders["scheduled_delivery_at"].notnull()
)
good_data = (
orders["expected_delivery_time"]
- (
(orders["scheduled_delivery_at"] - orders["placed_at"])
.dt.total_seconds()
.round()
)
).abs() <= 5
del orders["expected_delivery_time"]
assert (all_data_available & good_data).sum() == 74_386
round((all_data_available & good_data).sum() / all_data_available.sum(), 3)
0.996
courier_waited_at_delivery can mostly be explained as the difference between reached_delivery_at and delivery_at. Order.courier_waited_at_delivery models that in the ORM.
all_data_available = (
orders["courier_waited_at_delivery"].notnull()
& orders["delivery_at"].notnull()
& orders["reached_delivery_at"].notnull()
)
good_data = (
orders["courier_waited_at_delivery"]
- (
(orders["delivery_at"] - orders["reached_delivery_at"])
.dt.total_seconds()
.round()
)
).abs() <= 90
# Wait times of 45+ minutes were only logged as "about X hours" and are imprecise.
imprecise_wait_times = orders["courier_waited_at_delivery"].fillna(0) >= 45 * 60
assert (all_data_available & good_data).sum() == 26_268
assert (all_data_available & good_data & completed).sum() == 26_268
assert (all_data_available & (good_data | imprecise_wait_times)).sum() == 26_499
round(
(all_data_available & (good_data | imprecise_wait_times)).sum()
/ all_data_available.sum(),
3,
)
0.871
We keep courier_waited_at_delivery as a boolean field here, to be used by Order.courier_waited_at_delivery.
msk = orders["delivery_at"].notnull() & orders["courier_waited_at_delivery"].notnull()
orders["courier_waited_at_delivery"] = pd.NA
orders.loc[orders["delivery_at"].notnull(), "courier_waited_at_delivery"] = False
orders.loc[msk, "courier_waited_at_delivery"] = True
assert orders["courier_waited_at_delivery"].sum() == msk.sum() == 30_658
Keep the columns that log the courier's speed, renaming them for brevity. Delivery distances above 12 kilometers are unrealistic outliers and discarded.
orders = orders.rename(
columns={
"logged_avg_courier_speed": "logged_avg_speed",
"logged_avg_courier_speed_distance": "logged_avg_speed_distance",
}
)
unrealistic = orders["logged_delivery_distance"] > 12_000
orders.loc[unrealistic, "logged_delivery_distance"] = pd.NA
assert unrealistic.sum() == 17
orders = orders[
[
# Generic columns
"delivery_id",
"customer_id",
"placed_at",
"ad_hoc",
"scheduled_delivery_at",
"scheduled_delivery_at_corrected",
"first_estimated_delivery_at",
"cancelled",
"cancelled_at",
"cancelled_at_corrected",
# Price related columns
"sub_total",
"delivery_fee",
"total",
# Restaurant related columns
"restaurant_id",
"restaurant_notified_at",
"restaurant_notified_at_corrected",
"restaurant_confirmed_at",
"restaurant_confirmed_at_corrected",
"estimated_prep_duration",
"estimated_prep_duration_corrected",
"estimated_prep_buffer",
# Dispatch related columns
"courier_id",
"dispatch_at",
"dispatch_at_corrected",
"courier_notified_at",
"courier_notified_at_corrected",
"courier_accepted_at",
"courier_accepted_at_corrected",
"utilization",
# Pickup related columns
"pickup_address_id",
"reached_pickup_at",
"pickup_at",
"pickup_at_corrected",
"pickup_not_confirmed",
"left_pickup_at",
"left_pickup_at_corrected",
# Delivery related columns
"delivery_address_id",
"reached_delivery_at",
"delivery_at",
"delivery_at_corrected",
"delivery_not_confirmed",
"courier_waited_at_delivery",
# Statistical columns
"logged_delivery_distance",
"logged_avg_speed",
"logged_avg_speed_distance",
]
].sort_index()
orders.head()
delivery_id | customer_id | placed_at | ad_hoc | scheduled_delivery_at | scheduled_delivery_at_corrected | first_estimated_delivery_at | cancelled | cancelled_at | cancelled_at_corrected | sub_total | delivery_fee | total | restaurant_id | restaurant_notified_at | restaurant_notified_at_corrected | restaurant_confirmed_at | restaurant_confirmed_at_corrected | estimated_prep_duration | estimated_prep_duration_corrected | estimated_prep_buffer | courier_id | dispatch_at | dispatch_at_corrected | courier_notified_at | courier_notified_at_corrected | courier_accepted_at | courier_accepted_at_corrected | utilization | pickup_address_id | reached_pickup_at | pickup_at | pickup_at_corrected | pickup_not_confirmed | left_pickup_at | left_pickup_at_corrected | delivery_address_id | reached_delivery_at | delivery_at | delivery_at_corrected | delivery_not_confirmed | courier_waited_at_delivery | logged_delivery_distance | logged_avg_speed | logged_avg_speed_distance | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
id | |||||||||||||||||||||||||||||||||||||||||||||
1 | 487377 | 1 | 2016-02-22 10:42:01 | False | 2016-02-22 12:30:00 | False | NaT | True | NaT | True | 1250 | 250 | 1500 | 1 | NaT | <NA> | NaT | <NA> | <NA> | <NA> | 0 | <NA> | NaT | <NA> | NaT | <NA> | NaT | <NA> | 15 | 2 | NaT | NaT | <NA> | <NA> | NaT | <NA> | 3 | NaT | NaT | <NA> | <NA> | <NA> | 671 | NaN | <NA> |
2 | 487433 | 2 | 2016-02-22 10:40:09 | False | 2016-02-22 12:00:00 | False | 2016-02-23 23:55:23 | False | NaT | <NA> | 1550 | 250 | 1800 | 2 | 2016-02-22 11:31:03 | False | NaT | True | 600 | True | 0 | 96 | 2016-02-22 10:48:24 | True | 2016-02-22 11:36:04 | False | 2016-02-22 11:37:00 | False | 19 | 4 | 2016-02-22 11:40:07 | 2016-02-22 11:42:12 | True | True | NaT | <NA> | 5 | 2016-02-22 12:11:17 | 2016-02-22 12:15:41 | True | False | False | 2281 | NaN | <NA> |
3 | 487444 | 3 | 2016-02-22 10:56:16 | False | 2016-02-22 12:00:00 | False | 2016-02-23 23:25:47 | False | NaT | <NA> | 2000 | 250 | 2250 | 3 | 2016-02-22 11:31:02 | False | 2016-02-22 11:42:00 | False | 600 | True | 0 | 57 | 2016-02-22 11:31:02 | True | 2016-02-22 11:31:03 | False | 2016-02-22 11:31:30 | False | 25 | 6 | 2016-02-22 11:54:31 | 2016-02-22 11:54:32 | False | False | NaT | <NA> | 7 | 2016-02-22 12:03:14 | 2016-02-22 12:07:44 | True | False | False | 2449 | NaN | <NA> |
4 | 470503 | 4 | 2016-02-22 11:11:46 | False | 2016-02-22 12:30:00 | False | 2016-02-22 12:25:50 | False | NaT | <NA> | 9800 | 250 | 10050 | 4 | 2016-02-22 11:32:04 | False | 2016-02-22 11:48:01 | False | 2700 | True | 0 | 39 | NaT | True | 2016-02-22 11:50:03 | False | 2016-02-22 11:50:29 | False | 100 | 8 | 2016-02-22 11:57:48 | 2016-02-22 12:11:26 | False | False | NaT | <NA> | 9 | 2016-02-22 12:12:53 | 2016-02-22 12:18:43 | False | False | False | 196 | NaN | <NA> |
5 | 487439 | 5 | 2016-02-22 11:35:58 | False | 2016-02-22 12:30:00 | False | 2016-02-22 12:28:19 | False | NaT | <NA> | 3600 | 250 | 3850 | 5 | 2016-02-22 11:50:03 | False | 2016-02-22 11:50:19 | False | 1200 | True | 0 | 128 | NaT | True | 2016-02-22 11:50:04 | False | 2016-02-22 11:50:28 | False | 45 | 10 | 2016-02-22 11:55:12 | 2016-02-22 12:05:19 | False | False | NaT | <NA> | 11 | 2016-02-22 12:16:53 | 2016-02-22 12:20:44 | False | False | False | 2584 | NaN | <NA> |
orders.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 660658 entries, 1 to 688690
Data columns (total 45 columns):
 #   Column                             Non-Null Count   Dtype
---  ------                             --------------   -----
 0   delivery_id                        660658 non-null  int64
 1   customer_id                        660658 non-null  int64
 2   placed_at                          660658 non-null  datetime64[ns]
 3   ad_hoc                             660658 non-null  bool
 4   scheduled_delivery_at              80121 non-null   datetime64[ns]
 5   scheduled_delivery_at_corrected    83398 non-null   object
 6   first_estimated_delivery_at        657168 non-null  datetime64[ns]
 7   cancelled                          660658 non-null  bool
 8   cancelled_at                       14081 non-null   datetime64[ns]
 9   cancelled_at_corrected             23552 non-null   object
 10  sub_total                          660658 non-null  int64
 11  delivery_fee                       660658 non-null  int64
 12  total                              660658 non-null  int64
 13  restaurant_id                      660658 non-null  int64
 14  restaurant_notified_at             655847 non-null  datetime64[ns]
 15  restaurant_notified_at_corrected   655982 non-null  object
 16  restaurant_confirmed_at            634974 non-null  datetime64[ns]
 17  restaurant_confirmed_at_corrected  639102 non-null  object
 18  estimated_prep_duration            660385 non-null  Int64
 19  estimated_prep_duration_corrected  660385 non-null  object
 20  estimated_prep_buffer              660658 non-null  int64
 21  courier_id                         648239 non-null  Int64
 22  dispatch_at                        648608 non-null  datetime64[ns]
 23  dispatch_at_corrected              656301 non-null  object
 24  courier_notified_at                314281 non-null  datetime64[ns]
 25  courier_notified_at_corrected      481307 non-null  object
 26  courier_accepted_at                650348 non-null  datetime64[ns]
 27  courier_accepted_at_corrected      650363 non-null  object
 28  utilization                        660658 non-null  int64
 29  pickup_address_id                  660658 non-null  int64
 30  reached_pickup_at                  530724 non-null  datetime64[ns]
 31  pickup_at                          637684 non-null  datetime64[ns]
 32  pickup_at_corrected                637700 non-null  object
 33  pickup_not_confirmed               637684 non-null  object
 34  left_pickup_at                     293397 non-null  datetime64[ns]
 35  left_pickup_at_corrected           312136 non-null  object
 36  delivery_address_id                660658 non-null  int64
 37  reached_delivery_at                608160 non-null  datetime64[ns]
 38  delivery_at                        637167 non-null  datetime64[ns]
 39  delivery_at_corrected              637167 non-null  object
 40  delivery_not_confirmed             637167 non-null  object
 41  courier_waited_at_delivery         637167 non-null  object
 42  logged_delivery_distance           660641 non-null  object
 43  logged_avg_speed                   213812 non-null  float64
 44  logged_avg_speed_distance          213812 non-null  Int64
dtypes: Int64(3), bool(2), datetime64[ns](14), float64(1), int64(10), object(15)
memory usage: 224.9+ MB
for column in orders.columns:
if column.endswith("corrected"):
print(column, (orders[column] == True).sum())
scheduled_delivery_at_corrected 3321
cancelled_at_corrected 9569
restaurant_notified_at_corrected 2862
restaurant_confirmed_at_corrected 4128
estimated_prep_duration_corrected 108591
dispatch_at_corrected 69241
courier_notified_at_corrected 216869
courier_accepted_at_corrected 23
pickup_at_corrected 38032
left_pickup_at_corrected 18739
delivery_at_corrected 54363
# Reproducibility check: the digest pins down the exact contents of the cleaned orders.
assert (
    hashlib.sha256(orders.to_json().encode()).hexdigest()
    == "c548084f094bd220f3aff7e9b7072a4964127f6962dffd54f21c8d1f5b846a7f"
)
All couriers had at least one order.
assert set(couriers.reset_index()["id"]) == set(
orders.loc[orders["courier_id"].notnull(), "courier_id"].unique()
)
Only keep restaurants that had at least one order.
restaurants = restaurants.reset_index()
msk = restaurants["id"].isin(orders["restaurant_id"].unique())
restaurants = restaurants[msk].set_index("id")
assert (~msk).sum() == 6
Only keep addresses with pickups or deliveries.
addresses = addresses.reset_index()
msk = addresses["id"].isin(
set(restaurants["address_id"])
| set(orders["pickup_address_id"])
| set(orders["delivery_address_id"])
)
addresses = addresses[msk].set_index("id")
assert (~msk).sum() == 100
# Some addresses' primary_id now references an address that was just discarded.
# Re-map each such group to the smallest id that is still kept.
discarded_addresses = set(addresses["primary_id"]) - set(addresses.reset_index()["id"])
for old_primary_id in discarded_addresses:
    msk = addresses["primary_id"] == old_primary_id
    new_primary_id = addresses[msk].index.min()
    addresses.loc[msk, "primary_id"] = new_primary_id
config.CLEAN_SCHEMA
'clean'
cities.to_sql(
"cities",
con=connection,
schema=config.CLEAN_SCHEMA,
if_exists="append",
index=True,
)
addresses.to_sql(
"addresses",
con=connection,
schema=config.CLEAN_SCHEMA,
if_exists="append",
index=True,
)
restaurants.to_sql(
"restaurants",
con=connection,
schema=config.CLEAN_SCHEMA,
if_exists="append",
index=True,
)
couriers.to_sql(
"couriers",
con=connection,
schema=config.CLEAN_SCHEMA,
if_exists="append",
index=True,
)
customers = pd.DataFrame({"id": orders["customer_id"].unique()}).set_index("id")
customers.to_sql(
"customers",
con=connection,
schema=config.CLEAN_SCHEMA,
if_exists="append",
index=True,
)
orders.to_sql(
"orders",
con=connection,
schema=config.CLEAN_SCHEMA,
if_exists="append",
index=True,
)