#!/usr/bin/env python
# coding: utf-8

# # *Storing Storm Data*
# 
# **International Hurricane Watchgroup (IHW) has been asked to update their analysis tools. Because of the increase in public awareness of hurricanes, they are required to be more diligent with the analysis of the historical hurricane data they share across the organization. They have been facing trouble sharing data across teams and keeping it consistent.**

# **From what we've been told, it seems that their method of sharing the data with their analysts has been to save a CSV file on their local servers and have every analyst pull the data down. Then, each analyst uses a local SQLite engine to store the CSV, run their queries, and send their results around. We have been given a CSV [file](https://dq-content.s3.amazonaws.com/251/storm_data.csv).**

# **Data Dictionary:**
# 
# > fid - ID for the row
# > year - Recorded year
# > month - Recorded month
# > day - Recorded date
# > ad_time - Recorded time in UTC
# > btid - Hurricane ID
# > name - Name of the hurricane
# > lat - Latitude of the recorded location
# > long - Longitude of the recorded location
# > wind_kts - Wind speed in knots
# > pressure - Atmospheric pressure of the hurricane
# > cat - Hurricane category
# > basin - The basin in which the hurricane is located
# > shape_leng - Hurricane shape length
# # Aim
# 
# ***We want to productionize their services and change the current inefficient way of sharing and querying data.
# We will create a database that will accomplish the following:***
# 
# > Have a database for the IHW to store their tables.
# > Have a table in that database containing the fields detailed in the CSV file (in a storage-efficient way).
# > Have users that can update, read, and insert data into the table.
# > Have users that can only read from the table.
# > Insert the data from the CSV file into the table.

# ### Introduction to Data

# In[1]:


import io
import csv
from urllib import request

import pandas as pd

# load the CSV straight from the URL into a DataFrame for exploration
response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
df = pd.read_csv(io.TextIOWrapper(response))
df.head()


# ### Exploring Various Columns and Deciding Their Required Data Types
# 
# Let's look at the data type of each column and how much storage each one will need.

# In[2]:


df.dtypes


# **Getting the maximum value and digit length of the numerical columns**

# In[18]:


columns = list(df.columns)
# numerical columns: everything except NAME, CAT and BASIN
num_columns = columns[:6] + columns[7:-3] + columns[-1:]
for item in num_columns:
    largest = max(df[item].value_counts().index)
    print([item, largest, len(str(largest))])


# ### Numerical Column Data Types for PostgreSQL
# 
# - For `FID`: We will use INTEGER, since its largest value is 59228.
# - The columns `YEAR`, `MONTH` and `DAY` together represent a date, and `AD_TIME` records the time in [UTC (Coordinated Universal Time)](https://en.wikipedia.org/wiki/Coordinated_Universal_Time). We will combine all four into a single column and use the TIMESTAMP data type for it.
# - For `BTID`, `WIND_KTS` and `PRESSURE`: We will use SMALLINT, since their maximum values are 1410, 165 and 1024 respectively.
# - For `LAT` and `LONG`: We will use DECIMAL with precision 4 and scale 1, since they have at most 3 digits before the decimal point and 1 digit after it.
# - For `Shape_Leng`: We will use DECIMAL with precision 8 and scale 6, since it has at most 2 digits before the decimal point and 6 digits after it.

# **Getting the maximum length of the string columns**

# In[4]:


str_columns = [columns[x] for x in (6, 11, 12)]
print(str_columns)
for item in str_columns:
    # longest value in each string column
    print(max([(len(x), x) for x in df[item].unique()]))


# ### String Column Data Types for PostgreSQL
# 
# - For `NAME`: We will use VARCHAR(10), since the maximum length is 9.
# - For `CAT`: We will use VARCHAR(2), since the maximum length is 2.
# - For `BASIN`: We will use VARCHAR(16), since the maximum length is 15.

# ### Creating the Table
# 
# Now we will create the `ihw` database and the `hurricanes` table to store the values from the CSV file in PostgreSQL.

# In[5]:


# create the database first
import psycopg2
from datetime import datetime

conn = psycopg2.connect("dbname=postgres user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS ihw")
cur.execute("CREATE DATABASE ihw")
conn.close()


# In[6]:


conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS hurricanes")
cur.execute("""
    CREATE TABLE hurricanes (
        fid INTEGER PRIMARY KEY,
        date TIMESTAMP,
        btid SMALLINT,
        name VARCHAR(10),
        lat DECIMAL(4, 1),
        long DECIMAL(4, 1),
        wind_kts SMALLINT,
        pressure SMALLINT,
        category VARCHAR(2),
        basin VARCHAR(16),
        shape_length DECIMAL(8, 6)
    )
""")
conn.close()


# `hurricanes` table

# In[7]:


conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
query = "SELECT * FROM hurricanes LIMIT 5;"
pd.read_sql_query(query, conn)
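# As a quick sanity check that the stored column types match the plan above, we can query the standard `information_schema.columns` view. This is a minimal sketch (it assumes the same superuser connection settings used above):

# In[ ]:


check_conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
schema_query = """
    SELECT column_name, data_type, numeric_precision, numeric_scale, character_maximum_length
    FROM information_schema.columns
    WHERE table_name = 'hurricanes'
    ORDER BY ordinal_position;
"""
schema = pd.read_sql_query(schema_query, check_conn)
check_conn.close()
schema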
# ### Creating Users
# 
# With the table set up, we will now create two users on the database:
# 
# - One that can `insert`, `update`, and `read` the data, but not `delete`. This is like creating a "data production" user whose job it is to always write new and existing data to the table.
# 
# - A second one for the IHW team's analysts, who only need to run read queries on the data.

# In[8]:


conn.close()
conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP USER IF EXISTS ihw_production")
cur.execute("CREATE USER ihw_production WITH PASSWORD 'ihw.production.whi'")
cur.execute("DROP USER IF EXISTS ihw_analyst")
cur.execute("CREATE USER ihw_analyst WITH PASSWORD 'ihw.analyst.whi'")
cur.execute("REVOKE ALL ON hurricanes FROM ihw_production")
cur.execute("REVOKE ALL ON hurricanes FROM ihw_analyst")
cur.execute("GRANT INSERT, UPDATE, SELECT ON hurricanes TO ihw_production")
cur.execute("GRANT SELECT ON hurricanes TO ihw_analyst")
conn.close()


# ### Creating a Read-only Group
# 
# We also create an `analysts` group so that read-only access can be managed per group rather than per user (a sketch of granting it privileges and adding members follows further below).

# In[9]:


conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP GROUP IF EXISTS analysts")
cur.execute("CREATE GROUP analysts NOLOGIN")
conn.close()


# ### Inserting Data
# 
# Now we will insert the data into the table using a single `INSERT` statement built with the `mogrify` method.

# In[10]:


conn = psycopg2.connect("dbname=ihw user=ihw_production password=ihw.production.whi host=localhost")
conn.autocommit = True
cur = conn.cursor()
response = request.urlopen("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
reader = csv.reader(io.TextIOWrapper(response))
next(reader)  # skip the header row
mogrified_values = []
for row in reader:
    # combine year, month, day and ad_time (HHMM plus a trailing UTC designator) into one timestamp
    date = datetime(int(row[1]), int(row[2]), int(row[3]),
                    hour=int(row[4][:2]), minute=int(row[4][2:-1]))
    updated_row = [row[0], date] + row[5:]
    mogrified = cur.mogrify("(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", updated_row).decode('utf-8')
    mogrified_values.append(mogrified)
cur.execute("INSERT INTO hurricanes VALUES " + ",".join(mogrified_values))
conn.close()


# ### Confirming the Data Was Loaded into PostgreSQL
# 
# Now we will run a SELECT query, as the analyst user, to check that the data was loaded into the database and can be accessed by the analysts. The data can now be queried efficiently.

# `hurricanes` table

# In[11]:


conn = psycopg2.connect("dbname=ihw user=ihw_analyst password=ihw.analyst.whi host=localhost")
query = "SELECT * FROM hurricanes LIMIT 5;"
pd.read_sql_query(query, conn)


# In[12]:


conn.close()
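# ***Note:*** the `analysts` group created above has no privileges or members yet. A minimal sketch of how it could be wired up (the GRANT choices here are illustrative, and the same superuser connection settings as above are assumed):

# In[ ]:


conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
# give the whole group read-only access to the table ...
cur.execute("GRANT SELECT ON hurricanes TO analysts")
# ... and add the existing analyst user to it, so future analysts only need group membership
cur.execute("GRANT analysts TO ihw_analyst")
conn.close()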
# ***Important:***
# 
# Run the following commands if you're unable to create or drop the database. This can happen when other sessions are still connected to it.

# In[13]:


import psycopg2

conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
cur.execute("REVOKE CONNECT ON DATABASE ihw FROM public;")
conn.close()


# In[14]:


conn = psycopg2.connect("dbname=ihw user=postgres password=postgres host=localhost")
conn.autocommit = True
cur = conn.cursor()
# terminate every other session connected to the ihw database
cur.execute("""SELECT pid, pg_terminate_backend(pid)
               FROM pg_stat_activity
               WHERE datname = 'ihw' AND pid <> pg_backend_pid();""")
conn.close()


# In[ ]: