Storing Storm Data

The International Hurricane Watchgroup (IHW) has been asked to update its analysis tools. Because of the increase in public awareness of hurricanes, the group is required to be more diligent with the historical hurricane data it shares across the organization, and it has been having trouble sharing data across teams and keeping it consistent.

From what we've been told, their method of sharing the data with their analysts has been to save a CSV file on their local servers and have every analyst pull it down. Each analyst then loads the CSV into a local SQLite engine, runs their queries, and sends the results around. We have been given a copy of the CSV file.

Data Dictionary:

fid - ID for the row
year - Recorded year
month - Recorded month
day - Recorded day
ad_time - Recorded time in UTC
btid - Hurricane ID
name - Name of the hurricane
lat - Latitude of the recorded location
long - Longitude of the recorded location
wind_kts - Wind speed in knots
pressure - Atmospheric pressure of the hurricane
cat - Hurricane category
basin - The basin in which the hurricane is located
shape_leng - Hurricane shape length

Aim

We want to productionize their workflow and replace the current inefficient way of sharing and querying data.
We will create a database that will accomplish the following:

A database for the IHW to store its tables.
A table in that database containing the fields detailed in the CSV file, stored in a space-efficient way.
A user that can insert, update, and read the data in that table.
A user that can only read the data.
The CSV data loaded into the table.

Introduction to Data

In [1]:
import io 
import csv
from urllib import request
import pandas as pd

response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")

# Read the CSV straight into a DataFrame (wrapping the byte stream as text)
df = pd.read_csv(io.TextIOWrapper(response))
df.head()
Out[1]:
FID YEAR MONTH DAY AD_TIME BTID NAME LAT LONG WIND_KTS PRESSURE CAT BASIN Shape_Leng
0 2001 1957 8 8 1800Z 63 NOTNAMED 22.5 -140.0 50 0 TS Eastern Pacific 1.140175
1 2002 1961 10 3 1200Z 116 PAULINE 22.1 -140.2 45 0 TS Eastern Pacific 1.166190
2 2003 1962 8 29 0600Z 124 C 18.0 -140.0 45 0 TS Eastern Pacific 2.102380
3 2004 1967 7 14 0600Z 168 DENISE 16.6 -139.5 45 0 TS Eastern Pacific 2.121320
4 2005 1972 8 16 1200Z 251 DIANA 18.5 -139.8 70 0 H1 Eastern Pacific 1.702939

Exploring the Columns and Deciding Their Required Data Types

Let's look at the data types of the different columns and consider how much storage each one will need.

In [2]:
df.dtypes
Out[2]:
FID             int64
YEAR            int64
MONTH           int64
DAY             int64
AD_TIME        object
BTID            int64
NAME           object
LAT           float64
LONG          float64
WIND_KTS        int64
PRESSURE        int64
CAT            object
BASIN          object
Shape_Leng    float64
dtype: object
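
As a rough guide (an aside, not part of the original analysis), pandas can report how much memory each column currently occupies in the DataFrame, which hints at where smaller PostgreSQL types will pay off:

In [ ]:
# In-memory footprint of each column as loaded by pandas;
# deep=True also counts the Python string objects in the object columns
df.memory_usage(deep=True)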

Getting the Maximum Value and Length of the Numeric Columns

In [18]:
columns = list(df.columns)
# Every column except the string columns NAME, CAT and BASIN
# (AD_TIME is kept here because it will feed the TIMESTAMP column later)
num_columns = columns[:6] + columns[7:-3] + columns[-1:]

# For each column, print its largest value and how many characters that value takes
for item in num_columns:
    largest = max(df[item].value_counts().index)
    print([item, largest, len(str(largest))])
['FID', 59228, 5]
['YEAR', 2008, 4]
['MONTH', 12, 2]
['DAY', 31, 2]
['AD_TIME', '1800Z', 5]
['BTID', 1410, 4]
['LAT', 69.0, 4]
['LONG', 180.0, 5]
['WIND_KTS', 165, 3]
['PRESSURE', 1024, 4]
['Shape_Leng', 11.18034, 8]

Numeric Column Data Types for PostgreSQL

  • For FID: We will use the INTEGER data type, since its largest value is 59228, which is too large for SMALLINT.
  • The columns YEAR, MONTH and DAY together represent a date, and AD_TIME records the time in UTC (Coordinated Universal Time). We will combine all four into a single column and use the TIMESTAMP data type for it.
  • For BTID, WIND_KTS and PRESSURE: We will use SMALLINT, since their maximum values are 1410, 165 and 1024 respectively, all well within its range.
  • For LAT and LONG: We will use the DECIMAL data type with precision 4 and scale 1, since they have at most 3 digits before the decimal point and 1 digit after it.
  • For Shape_Leng: We will use the DECIMAL data type with precision 8 and scale 6, since it has at most two digits before the decimal point and six digits after it. (These choices are sanity-checked in the sketch below.)
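
As a quick sanity check on these choices, we can confirm against the DataFrame that every value fits the chosen types (a minimal sketch; it reuses the df loaded above):

In [ ]:
from datetime import datetime

# SMALLINT holds values from -32768 to 32767, so these three columns must stay inside that range
assert df[["BTID", "WIND_KTS", "PRESSURE"]].max().max() <= 32767

# DECIMAL(4, 1) allows at most 3 digits before the decimal point, i.e. absolute values below 1000
assert df[["LAT", "LONG"]].abs().max().max() < 1000

# Preview how YEAR, MONTH, DAY and AD_TIME (e.g. '1800Z') collapse into one TIMESTAMP value
row = df.iloc[0]
print(datetime(int(row["YEAR"]), int(row["MONTH"]), int(row["DAY"]),
               hour=int(row["AD_TIME"][:2]), minute=int(row["AD_TIME"][2:4])))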

Getting Maximum Length of String Columns

In [4]:
# NAME, CAT and BASIN are the remaining (string) columns
str_columns = [columns[x] for x in (6, 11, 12)]
print(str_columns)

# For each string column, print the length of its longest value together with that value
for item in str_columns:
    print(max([(len(x), x) for x in df[item].unique()]))
['NAME', 'CAT', 'BASIN']
(9, 'SEBASTIEN')
(2, 'TS')
(15, 'Eastern Pacific')

String Column Data Types for PostgreSQL

  • For NAME:
    • We will use VARCHAR(10) since max length is 9
  • For CAT:
    • We will use VARCHAR(2) since max length is 2
  • For BASIN:
    • We will use VARCHAR(16) since max length is 15 (these limits are double-checked in the sketch below)
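
As with the numeric columns, a quick check confirms these limits (a minimal sketch; it reuses the df loaded above):

In [ ]:
# Verify that every value fits the planned VARCHAR length for its column
for col, limit in [("NAME", 10), ("CAT", 2), ("BASIN", 16)]:
    longest = df[col].str.len().max()
    print(col, longest, "fits in VARCHAR(%d):" % limit, longest <= limit)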

Creating the Table

Now we will create the ihw database and the hurricanes table in PostgreSQL to store the values from the CSV.

In [5]:
# Create Database First

import psycopg2
from datetime import datetime

conn = psycopg2.connect("dbname=postgres user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()

cur.execute("DROP DATABASE IF EXISTS ihw")
cur.execute("CREATE DATABASE ihw")
conn.close()
In [6]:
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS hurricanes")

cur.execute("""
    CREATE TABLE hurricanes (
        fid INTEGER PRIMARY KEY,
        date TIMESTAMP,
        btid SMALLINT,
        name VARCHAR(10),
        lat DECIMAL(4, 1), 
        long DECIMAL(4, 1), 
        wind_kts SMALLINT, 
        pressure SMALLINT,
        category VARCHAR(2),
        basin VARCHAR(16),
        shape_length  DECIMAL(8, 6)
        )
    """)

conn.close()

hurricanes table

In [7]:
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
query = "SELECT * FROM hurricanes limit 5;"
pd.read_sql_query(query, conn)
Out[7]:
fid date btid name lat long wind_kts pressure category basin shape_length

Creating Users

With the table set up, we will now create two users on the database:

  • One that can insert, update, and read the data, but not delete it. This is like creating a "data production" user whose job is to write new data and update existing data in the table.

  • A second user for the IHW team's analysts, who just need to run read queries on the data.

In [8]:
conn.close()

conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()

cur.execute("Drop USER IF EXISTS ihw_production")
cur.execute("CREATE USER ihw_production WITH PASSWORD 'ihw.production.whi'")

cur.execute("Drop USER IF EXISTS ihw_analyst")
cur.execute("CREATE USER ihw_analyst WITH PASSWORD 'ihw.analyst.whi'")

cur.execute("REVOKE ALL ON hurricanes FROM ihw_production")
cur.execute("REVOKE ALL ON hurricanes FROM ihw_analyst")
cur.execute("GRANT INSERT, UPDATE, SELECT ON hurricanes TO ihw_production")
cur.execute("GRANT SELECT ON hurricanes TO ihw_analyst")
conn.close()
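
One way to confirm the grants landed as intended (a sketch, relying on the standard information_schema views) is to list the privileges recorded for the two users:

In [ ]:
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
# The information_schema.role_table_grants view lists table privileges per grantee
query = """
    SELECT grantee, privilege_type
    FROM information_schema.role_table_grants
    WHERE table_name = 'hurricanes'
      AND grantee IN ('ihw_production', 'ihw_analyst');
"""
print(pd.read_sql_query(query, conn))
conn.close()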

Creating a Read-Only Group

In [9]:
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP GROUP IF EXISTS analysts")

cur.execute("CREATE GROUP analysts NOLOGIN")
conn.close()
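
The group is only created here; it has no privileges or members yet. A plausible follow-up (an assumption, not part of the original notebook) would be to give the group read access and move the analyst user into it:

In [ ]:
# Sketch (assumption): grant the analysts group read access and add ihw_analyst to it
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("GRANT SELECT ON hurricanes TO GROUP analysts")
cur.execute("ALTER GROUP analysts ADD USER ihw_analyst")
conn.close()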

Inserting Data

Now we will insert the data into the table using a single INSERT statement built with psycopg2's mogrify method.

In [10]:
conn = psycopg2.connect("dbname=ihw user=ihw_production password=ihw.production.whi host=localhost")
conn.autocommit = True
cur = conn.cursor()

response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
reader = csv.reader(io.TextIOWrapper(response))
next(reader)  # skip the header row

mogrified_values = []

for row in reader:
    # Combine YEAR, MONTH, DAY and AD_TIME (e.g. '1800Z') into a single timestamp
    date = datetime(int(row[1]), int(row[2]), int(row[3]), hour=int(row[4][:2]), minute=int(row[4][2:-1]))
    updated_row = [row[0], date] + row[5:]

    # Let psycopg2 escape and quote each row as a "(...)" values string
    mogrified = cur.mogrify("(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", updated_row).decode('utf-8')
    mogrified_values.append(mogrified)

# Insert every row with a single INSERT statement
cur.execute("INSERT INTO hurricanes VALUES " + ",".join(mogrified_values))
conn.close()
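
For larger files, an alternative worth noting (an assumption, not the approach this notebook takes) is PostgreSQL's COPY, streamed through psycopg2's copy_expert, which avoids building one very large INSERT string. A minimal sketch, meant to be run against an empty hurricanes table instead of the INSERT above:

In [ ]:
import io
import csv
from urllib import request
from datetime import datetime
import psycopg2

conn = psycopg2.connect("dbname=ihw user=ihw_production password=ihw.production.whi host=localhost")
conn.autocommit = True
cur = conn.cursor()

response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
reader = csv.reader(io.TextIOWrapper(response))
next(reader)  # skip the header row

# Write the transformed rows (fid, timestamp, btid, ...) into an in-memory CSV buffer
buffer = io.StringIO()
writer = csv.writer(buffer)
for row in reader:
    date = datetime(int(row[1]), int(row[2]), int(row[3]), hour=int(row[4][:2]), minute=int(row[4][2:-1]))
    writer.writerow([row[0], date] + row[5:])
buffer.seek(0)

# Stream the buffer to the server with COPY (requires INSERT privilege on the table)
cur.copy_expert("COPY hurricanes FROM STDIN WITH CSV", buffer)
conn.close()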

Confirming the Data Was Loaded into PostgreSQL

Now we will run a SELECT query as the analyst user to check that the data was loaded into the database and can be read with analyst privileges. The data can now be queried efficiently.

hurricanes table

In [11]:
conn = psycopg2.connect("dbname=ihw user=ihw_analyst password=ihw.analyst.whi host=localhost")
query = "SELECT * FROM hurricanes limit 5;"
pd.read_sql_query(query, conn)
Out[11]:
fid date btid name lat long wind_kts pressure category basin shape_length
0 2001 1957-08-08 18:00:00 63 NOTNAMED 22.5 -140.0 50 0 TS Eastern Pacific 1.140175
1 2002 1961-10-03 12:00:00 116 PAULINE 22.1 -140.2 45 0 TS Eastern Pacific 1.166190
2 2003 1962-08-29 06:00:00 124 C 18.0 -140.0 45 0 TS Eastern Pacific 2.102380
3 2004 1967-07-14 06:00:00 168 DENISE 16.6 -139.5 45 0 TS Eastern Pacific 2.121320
4 2005 1972-08-16 12:00:00 251 DIANA 18.5 -139.8 70 0 H1 Eastern Pacific 1.702939
In [12]:
conn.close()

Important:
Run the following commands if you're unable to create or drop the database. This can happen when other sessions are still connected to it.

In [13]:
import psycopg2
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()

cur.execute("REVOKE CONNECT ON DATABASE ihw FROM public;")
conn.close()
In [14]:
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()

cur.execute("""SELECT pid, pg_terminate_backend(pid) 
FROM pg_stat_activity 
WHERE datname = 'ihw' AND pid <> pg_backend_pid();""")
conn.close()