The International Hurricane Watchgroup (IHW) has asked us to update their analysis tools. Because of increased public awareness of hurricanes, they must be more diligent about how they analyze and share historical hurricane data across the organization. They have been having trouble sharing data across teams and keeping it consistent.
From what we've been told, their method of sharing data with analysts has been to save a CSV file on their local servers and have every analyst pull it down. Each analyst then loads the CSV into a local SQLite engine, runs their queries, and sends their results around. We have been given a copy of the CSV file.
Data Dictionary:
fid - ID for the row
year - Recorded year
month - Recorded month
day - Recorded day
ad_time - Recorded time in UTC
btid - Hurricane ID
name - Name of the hurricane
lat - Latitude of the recorded location
long - Longitude of the recorded location
wind_kts - Wind speed in knots
pressure - Atmospheric pressure of the hurricane
cat - Hurricane category
basin - The basin in which the hurricane is located
shape_leng - Hurricane shape length
*We want to productionize their services and replace the current inefficient way of sharing and querying data. We will create a database that accomplishes the following:*
- A database for the IHW to store their tables.
- A table in that database containing the fields detailed in the CSV file, using storage-efficient datatypes.
- A user that can update, read, and insert into the table.
- A user that can only read from the table.
- The CSV data inserted into the table.
import io
import csv
from urllib import request

import pandas as pd

# Download the CSV and load it straight into a DataFrame.
response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
df = pd.read_csv(io.TextIOWrapper(response))
df.head()
| | FID | YEAR | MONTH | DAY | AD_TIME | BTID | NAME | LAT | LONG | WIND_KTS | PRESSURE | CAT | BASIN | Shape_Leng |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2001 | 1957 | 8 | 8 | 1800Z | 63 | NOTNAMED | 22.5 | -140.0 | 50 | 0 | TS | Eastern Pacific | 1.140175 |
| 1 | 2002 | 1961 | 10 | 3 | 1200Z | 116 | PAULINE | 22.1 | -140.2 | 45 | 0 | TS | Eastern Pacific | 1.166190 |
| 2 | 2003 | 1962 | 8 | 29 | 0600Z | 124 | C | 18.0 | -140.0 | 45 | 0 | TS | Eastern Pacific | 2.102380 |
| 3 | 2004 | 1967 | 7 | 14 | 0600Z | 168 | DENISE | 16.6 | -139.5 | 45 | 0 | TS | Eastern Pacific | 2.121320 |
| 4 | 2005 | 1972 | 8 | 16 | 1200Z | 251 | DIANA | 18.5 | -139.8 | 70 | 0 | H1 | Eastern Pacific | 1.702939 |
Let's look at the datatypes of the columns and how much storage each one will need.
df.dtypes
FID             int64
YEAR            int64
MONTH           int64
DAY             int64
AD_TIME        object
BTID            int64
NAME           object
LAT           float64
LONG          float64
WIND_KTS        int64
PRESSURE        int64
CAT            object
BASIN          object
Shape_Leng    float64
dtype: object
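As a quick storage sanity check, we can ask pandas for each column's in-memory footprint. This is pandas' cost, not PostgreSQL's on-disk cost, but it illustrates why 64-bit integers are wasteful for columns whose values never exceed a few thousand.

# Per-column in-memory footprint in bytes; deep=True also measures
# the contents of the string (object) columns.
df.memory_usage(deep=True)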
Getting Max length of Numerical Columns
columns = list(df.columns)
# Every column except NAME, CAT, and BASIN (note AD_TIME is still a string).
num_columns = columns[:6] + columns[7:-3] + columns[-1:]
for item in num_columns:
    # Print the column name, its maximum value, and that value's width in characters.
    print([item, max(df[item].value_counts().index), len(str(max(df[item].value_counts().index)))])
['FID', 59228, 5]
['YEAR', 2008, 4]
['MONTH', 12, 2]
['DAY', 31, 2]
['AD_TIME', '1800Z', 5]
['BTID', 1410, 4]
['LAT', 69.0, 4]
['LONG', 180.0, 5]
['WIND_KTS', 165, 3]
['PRESSURE', 1024, 4]
['Shape_Leng', 11.18034, 8]
- FID: We will use the INTEGER datatype, since its largest value is 59228.
- YEAR, MONTH, and DAY represent the date, and AD_TIME records the time in UTC (Coordinated Universal Time). We will combine all four into a single column and use the TIMESTAMP datatype for it.
- BTID, WIND_KTS, and PRESSURE: We will use SMALLINT, since their maximum values are 1410, 165, and 1024 respectively.
- LAT and LONG: We will use the DECIMAL datatype with precision 4 and scale 1, since they have at most 3 digits before the decimal point and 1 digit after.
- Shape_Leng: We will use the DECIMAL datatype with precision 8 and scale 6, since it has at most 2 digits before the decimal point and 6 digits after.
Getting Maximum Length of String Columns
str_columns = [columns[x] for x in (6, 11, 12)]
print(str_columns)
for item in str_columns:
    # Longest value in each string column, as a (length, value) pair.
    print(max([(len(x), x) for x in df[item].unique()]))
['NAME', 'CAT', 'BASIN']
(9, 'SEBASTIEN')
(2, 'TS')
(15, 'Eastern Pacific')
- NAME: We will use VARCHAR(10), since its longest value is 9 characters.
- CAT: We will use VARCHAR(2), since its longest value is 2 characters.
- BASIN: We will use VARCHAR(16), since its longest value is 15 characters.

Now we will create the IHW database and a hurricanes table to store the values from the CSV in PostgreSQL.
# Create the database first
import psycopg2
from datetime import datetime

conn = psycopg2.connect("dbname=postgres user=postgres password=postgres host=localhost")
# CREATE DATABASE cannot run inside a transaction block, so enable autocommit.
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS ihw")
cur.execute("CREATE DATABASE ihw")
conn.close()
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS hurricanes")
cur.execute("""
CREATE TABLE hurricanes (
fid INTEGER PRIMARY KEY,
date TIMESTAMP,
btid SMALLINT,
name VARCHAR(10),
lat DECIMAL(4, 1),
long DECIMAL(4, 1),
wind_kts SMALLINT,
pressure SMALLINT,
category VARCHAR(2),
basin VARCHAR(16),
shape_length DECIMAL(8, 6)
)
""")
conn.close()
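As an optional check (a sketch, not part of the original workflow), we can confirm the column types PostgreSQL actually created by querying the information_schema:

conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
query = """
SELECT column_name, data_type, numeric_precision, numeric_scale
FROM information_schema.columns
WHERE table_name = 'hurricanes';
"""
# Shows each column with its type, precision, and scale.
print(pd.read_sql_query(query, conn))
conn.close()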
A quick preview confirms the hurricanes table exists and is still empty:
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
query = "SELECT * FROM hurricanes LIMIT 5;"
pd.read_sql_query(query, conn)
| fid | date | btid | name | lat | long | wind_kts | pressure | category | basin | shape_length |
|---|---|---|---|---|---|---|---|---|---|---|
With the table set up, we will now create two users on the database:
- One that can insert, update, and read the data, but not delete it. This is a "data production" user whose job is to write new and updated data to the table.
- A second for the IHW team's analysts, who only need to run read queries on the data.
conn.close()
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("Drop USER IF EXISTS ihw_production")
cur.execute("CREATE USER ihw_production WITH PASSWORD 'ihw.production.whi'")
cur.execute("Drop USER IF EXISTS ihw_analyst")
cur.execute("CREATE USER ihw_analyst WITH PASSWORD 'ihw.analyst.whi'")
cur.execute("REVOKE ALL ON hurricanes FROM ihw_production")
cur.execute("REVOKE ALL ON hurricanes FROM ihw_analyst")
cur.execute("GRANT INSERT, UPDATE, SELECT ON hurricanes TO ihw_production")
cur.execute("GRANT SELECT ON hurricanes TO ihw_analyst")
conn.close()
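As a quick sanity check (a sketch, not part of the original workflow), we can confirm the privilege boundary holds: connecting as ihw_production, a DELETE should be rejected.

conn = psycopg2.connect("dbname=ihw user=ihw_production password=ihw.production.whi host=localhost")
conn.autocommit = True
cur = conn.cursor()
try:
    cur.execute("DELETE FROM hurricanes")
except psycopg2.ProgrammingError as err:
    print(err)  # expected: a "permission denied" error for table hurricanes
conn.close()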
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP GROUP IF EXISTS analysts")
cur.execute("CREATE GROUP analysts NOLOGIN")
conn.close()
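The group is empty so far. As a sketch of how it would be used (an assumption, since the original workflow stops at creating it), read access could be granted to the group once, with each analyst account then added as a member:

conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
# Grant read access to the group once; members inherit it.
cur.execute("GRANT SELECT ON hurricanes TO analysts")
# Add the existing analyst account to the group.
cur.execute("GRANT analysts TO ihw_analyst")
conn.close()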
Now we will insert the data into the table using a single bulk INSERT statement built with psycopg2's mogrify() method.
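For context, mogrify() returns the SQL fragment as bytes with the parameters escaped and bound in, which makes it safe to join many value tuples into one statement. A minimal illustration (connecting as the superuser purely for demonstration):

import psycopg2

conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
cur = conn.cursor()
# mogrify binds and escapes the parameters, returning ready-to-embed bytes.
print(cur.mogrify("(%s, %s)", (1, "O'BRIEN")))  # b"(1, 'O''BRIEN')"
conn.close()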
conn = psycopg2.connect("dbname=ihw user=ihw_production password=ihw.production.whi host=localhost")
conn.autocommit = True
cur = conn.cursor()
response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
reader = csv.reader(io.TextIOWrapper(response))
next(reader)  # skip the header row
mogrified_values = []
for row in reader:
    # Combine YEAR, MONTH, DAY, and AD_TIME (e.g. "1800Z") into one timestamp.
    date = datetime(int(row[1]), int(row[2]), int(row[3]), hour=int(row[4][:2]), minute=int(row[4][2:-1]))
    updated_row = [row[0], date] + row[5:]
    # Escape and bind each row into a "(val, val, ...)" SQL fragment.
    mogrified = cur.mogrify("(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", updated_row).decode('utf-8')
    mogrified_values.append(mogrified)
# One bulk INSERT is much faster than one statement per row.
cur.execute("INSERT INTO hurricanes VALUES " + ",".join(mogrified_values))
conn.close()
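As an aside (not part of the original workflow), psycopg2 ships a helper that does this batching without manual string assembly. A sketch that would replace the block above (running both would violate the primary key):

from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=ihw user=ihw_production password=ihw.production.whi host=localhost")
conn.autocommit = True
cur = conn.cursor()
response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
reader = csv.reader(io.TextIOWrapper(response))
next(reader)  # skip the header row
rows = []
for row in reader:
    date = datetime(int(row[1]), int(row[2]), int(row[3]), hour=int(row[4][:2]), minute=int(row[4][2:-1]))
    rows.append([row[0], date] + row[5:])
# execute_values escapes and batches the tuples internally.
execute_values(cur, "INSERT INTO hurricanes VALUES %s", rows)
conn.close()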
Now we will run a SELECT query to check that the data loaded into the database and that the analysts can access it. The data can now be queried efficiently.
A preview of the hurricanes table, queried as ihw_analyst:
conn = psycopg2.connect("dbname=ihw user=ihw_analyst password=ihw.analyst.whi host=localhost")
query = "SELECT * FROM hurricanes LIMIT 5;"
pd.read_sql_query(query, conn)
| | fid | date | btid | name | lat | long | wind_kts | pressure | category | basin | shape_length |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2001 | 1957-08-08 18:00:00 | 63 | NOTNAMED | 22.5 | -140.0 | 50 | 0 | TS | Eastern Pacific | 1.140175 |
| 1 | 2002 | 1961-10-03 12:00:00 | 116 | PAULINE | 22.1 | -140.2 | 45 | 0 | TS | Eastern Pacific | 1.166190 |
| 2 | 2003 | 1962-08-29 06:00:00 | 124 | C | 18.0 | -140.0 | 45 | 0 | TS | Eastern Pacific | 2.102380 |
| 3 | 2004 | 1967-07-14 06:00:00 | 168 | DENISE | 16.6 | -139.5 | 45 | 0 | TS | Eastern Pacific | 2.121320 |
| 4 | 2005 | 1972-08-16 12:00:00 | 251 | DIANA | 18.5 | -139.8 | 70 | 0 | H1 | Eastern Pacific | 1.702939 |
conn.close()
*Important:*
Run the following commands if you're unable to create or drop the database. This usually happens because other sessions are still connected to it.
import psycopg2
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("REVOKE CONNECT ON DATABASE ihw FROM public;")
conn.close()
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("""SELECT pid, pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'ihw' AND pid <> pg_backend_pid();""")
conn.close()