В данном тюториале, я хотел бы рассказать, как использовать в качестве хранилища данных бесплатный SQL сервер. Для предобработки данных будет использован Pandas. В итоге, мы получим удобную базу с данными, на основе которых можно будет строить необходимые модели.
В какой-то момент на работе (консалтинг не связанный никак с IT) коллеги нашли систему, из которой можно выгружать типовые файлы по ценам и производстве электроэнергии по часам. Естественно возникло желание собрать данные за 3 года воедино в удобной форме для дальнейшего анализа и в перспективе прогнозирования временных рядов. Однако, имеется несколько проблем:
1) Данные выгружаются в виде множества Excel файлов: цены за год - 365/366 файлов, производство электроэнергии по часам и производителям - несколько десятков тысяч файлов.
2) Данные требуют предобработки - удаления шапок, лишних столбцов.
3) Данные занимают несколько гигабайт и многие инструменты аналитические (в случае моих коллег Power BI), реагируют на них не так хорошо как хотелось бы.
Таким образом, имеем задачу подготовки и хранения данных по времянным рядам. В тюториале я покажу, как это решить начиная с момента, когда ничего нет, до работающей базы и готовых обработчиков. Тюториал будет интересен для новичков, которые никогда ни с чем подобным не работали.
Для экономии времени возьмем первые 7 дней 2015 года. Данные можно скачать по ссылке: Данные для работы
Содержание данных: 1 файл с ценами (15 Мб) и 159 файлов с производством (7 Мб).
Для начала необходимо установить на машине необходимую инфраструктуру. В данном случае это SQL Server Express 2017 и SQL Management Studio.
У меня на компьютере стоит 64-битная Windows 10 Pro.
Приступим.
Скачиваем и устанавливаем SQL Server Express 2017. Выбор обусловлен тем, что версия бесплатная с 10 Гб памяти для данных (на работе на сервере установили под Linux, там ограничений нет).
После установки необходимо установить SQL Management Studio для создания, настройки баз, ролей доступов. Для установки, достаточно перейти по ссылке, которую предлагает SQL Server Express после установки:
После этого, создадим базу данных, в которую будем записывать таблицы:
Помимо SQL Server Express и SQL Management Studio, может понадобиться скачать и установить драйвер для работы с базой данных.
ODBC Driver 13 for SQL Server
Предварительно подключим необходимые библиотеки. Для работы я буду использовать библиотеки:
pyodbc
os
time
pandas
sqlalchemy
import os
import time
from datetime import datetime
import numpy as np
import pandas as pd
import pyodbc
from sqlalchemy import create_engine
База данных создана, зачнём работу с данными. Для начала создадим пути к папкам, в которых хранятся данные по ценам и производству электроэнергии в России:
path_production = r"C:\Users\Lenovo\Desktop\Tutorial\Production"
path_prices = r"C:\Users\Lenovo\Desktop\Tutorial\Prices"
list_files_production = os.listdir(path_production)
list_files_prices = os.listdir(path_prices)
files_production = [f for f in list_files_production if f[-3:] == "xls"]
files_prices = [f for f in list_files_prices if f[-3:] == "xls"]
Теперь создадим SQL таблицы, в которые будем записывать почасовые цены на электроэнергию.
Разберёмся по порядку.
Для начала необходимо создать connection, через который будет осуществляться связь с базой данных. Для этого мы воспользуемся библиотекой pyodbc в которой есть метод connect, который позволяет создать подключение. В качестве аргументов необходимо передать следующие данные:
Важно! После манпуляций с базой не забывайте закрывать соединение с помощью метода .close().
После создания таблицы SQL, необходимо создать курсор, помощью которого мы будем отправлять команды к базе SQL, используя созданное соединение. Для этого у соединения есть метод .cursor(). Далее в курсор нужно передать строку (запрос SQL), выполнение которой позволит выполнить необходимые манипуляции с таблицей SQL (создание, заполнение и т.п.).
Важно! Чтобы измения произошли, обязательно нужно сделать commit().
Попробуем создать таблицу, в которую потом будем записывать цены:
start = time.time()
cnxn = pyodbc.connect(
"DRIVER= {ODBC Driver 13 for SQL Server}; SERVER=DESKTOP-1NF3KQV\\SQLEXPRESS; DATABASE=ODS_db;Trusted_Connection=Yes"
)
cursor = cnxn.cursor()
if cursor.tables(table="Prices", tableType="TABLE").fetchone():
print("Table already exists!")
else:
string = "CREATE TABLE Prices(ID_node numeric, price float, _datetime datetime)"
cursor.execute(string)
cnxn.commit()
print(
"Time (sec) of creating SQL-table for Prices = ", round(time.time() - start, 2)
)
cnxn.close()
В качестве строки записывается команда на языке SQL. В данном случае, я создаю 3 столбца для ID узла (минимальная единица, для которой определяется цена), цена и время для которого эта цена актуальна.
Далее по тому же принципу создадим таблицу с данными по производству электроэнергии. В качестве столбцов взяты: ID узла и время (по данным столбца можно связывать две таблицы), а так же непосредственно объем производства и ID генерирующей единицы, которая произвела данный объем.
start = time.time()
cnxn = pyodbc.connect(
"DRIVER= {ODBC Driver 13 for SQL Server}; SERVER=DESKTOP-1NF3KQV\\SQLEXPRESS; DATABASE=ODS_db;Trusted_Connection=Yes"
)
cursor = cnxn.cursor()
if cursor.tables(table="Production", tableType="TABLE").fetchone():
print("Table already exists!")
else:
string = "CREATE TABLE Production(ID_unit numeric, ID_node numeric, _datetime datetime, production float)"
cursor.execute(string)
cnxn.commit()
print(
"Time (sec) of creating SQL-table for Production = ",
round(time.time() - start, 2),
)
cnxn.close()
Для начала разберемся с ценами. Один файл Excel соответствует одному дню, номер этого дня мы можем извлечь из названия файла. Каждый час в сутках находится на отдельном листе. Напишем функцию, которая будет извлекать данные.
Внутри файлы с ценами выглядят вот так:
# Функция предобработки файлов с ценами
def Prices_preprocessing(path, f):
df_list = [] ## list с DataFrame по каждому часу
for i in range(24):
df = pd.read_excel(path + "\\" + f, sheet_name=i) # Считываем файл в DataFrame
df.drop(
[0, 1], inplace=True
) # Вырезаем первые две строки, не содержащие ничего полезного
df.drop(
df.columns[[1, 2, 3, 5]], axis=1, inplace=True
) # Удаляем лишние столбцы, оставляем
df.columns = [
"ID_node",
"price",
] # Для красоты переименуем столбцы с ID узлов и ценами
df["_datetime"] = f[:8] # Вырезаем из названия дату
df["_datetime"] = df["_datetime"].apply(
lambda x: datetime.strptime(x, "%Y%m%d").replace(hour=i)
)
df_list.append(df)
df = pd.concat(
df_list, axis=0, ignore_index=True
) # Соединим данные с разных страниц в один DataFrame
df.fillna(0, inplace=True) # Заполним "дырки" нулями
return df
# Соединение данных в один DataFrame
start = time.time()
df_prices = []
for f in files_prices:
df_prices.append(Prices_preprocessing(path_prices, f))
df_prices = pd.concat(df_prices, axis=0, ignore_index=True)
time_prices_preprocessing = time.time() - start
print("Time of Prices preprocessing = ", round(time_prices_preprocessing, 1))
Препроцессинг цен одной недели занял почти 10 минут. Теперь обработаем файлы, содержащие объём производства электроэнергии. Вот так выглядит каждый из файлов.
def func(x):
return datetime.strptime(x[0], "%Y%m%d").replace(hour=int(x[1]))
# Функция предобработки данных по объёму производства
def Production_preprocessing(path, f):
df = pd.read_excel(path + "\\" + f)
df.drop(
df.columns[
[
1,
3,
4,
6,
7,
9,
10,
12,
13,
15,
16,
18,
19,
21,
22,
24,
25,
27,
28,
30,
31,
33,
34,
36,
37,
39,
40,
42,
43,
45,
46,
48,
49,
51,
52,
54,
55,
57,
58,
60,
61,
63,
64,
66,
67,
69,
70,
72,
73,
75,
76,
77,
78,
]
],
axis=1,
inplace=True,
) # Вырезаем ненужные столбцы
df.drop([0, 1, 2, 3, 4, 5], inplace=True) # Отрезаем верхние лишние строки
df.drop(df.tail(1).index, inplace=True) # Отрезаем строку "Итого"
df.columns = ["ID_unit", "ID_node"] + [
x for x in range(24)
] # Переименуем столбцы, в том числе данным по производству дадим номер соответствующего часа
df["_datetime"] = f[:8] # Вырезаем из названия дату
df = pd.melt(
df,
id_vars=["ID_unit", "ID_node", "_datetime"],
value_vars=[x for x in range(24)],
) # Превратим данные из столбцов по часам в строки (unpivot)
df.rename(columns={"value": "production", "variable": "hour"}, inplace=True)
df["_datetime"] = df[["_datetime", "hour"]].apply(
func, axis=1
) # Склеиваем дату и время
df.drop(["hour"], axis=1, inplace=True) # Убираем ненужный уже столбец "hour"
return df
start = time.time()
df_production = []
for f in files_production:
df_production.append(Production_preprocessing(path_production, f))
df_production = pd.concat(df_production, axis=0, ignore_index=True)
time_production_preprocessing = time.time() - start
print("Time of Production preprocessing = ", round(time_production_preprocessing, 1))
Препроцессинг занял меньше времени, чем в случае цен.
start = time.time()
cnxn = pyodbc.connect(
"DRIVER= {ODBC Driver 13 for SQL Server}; SERVER=DESKTOP-1NF3KQV\\SQLEXPRESS; DATABASE=ODS_db;Trusted_Connection=Yes"
)
cursor = cnxn.cursor()
cursor.executemany("INSERT INTO Prices values (?, ?, ?);", df_prices.values.tolist())
cnxn.commit()
time_prices_commit = time.time() - start
start = time.time()
cursor.executemany(
"INSERT INTO Production values (?, ?, ?, ?);", df_production.values.tolist()
)
cnxn.commit()
time_production_commit = time.time() - start
cnxn.close()
print("Number of rows in DataFrame with Prices: ", len(df_prices))
print(
"Memory usage of DataFrame with Prices: ",
round(np.sum(list(df_prices.memory_usage())) / 1024 / 1024, 1),
)
print("Time of Prices preprocessing: ", round(time_prices_preprocessing, 1))
print("Time of Prices commit: ", round(time_prices_commit, 1))
print("Overall time: ", round(time_prices_preprocessing + time_prices_commit, 1))
print("\n")
print("Number of rows in DataFrame with Production: ", len(df_production))
print(
"Memory usage of DataFrame with Production: ",
round(np.sum(list(df_production.memory_usage())) / 1024 / 1024, 1),
)
print("Time of Production preprocessing: ", round(time_production_preprocessing, 1))
print("Time of Production commit: ", round(time_production_commit, 1))
print(
"Overall time: ", round(time_production_preprocessing + time_production_commit, 1)
)
Как мы видим, загрузка данных в базу достаточно затратное дело. Мы загрузили данные всего лишь за 1 неделю и это заняло 3 минуты + предобработка. Мне же нужно загрузить данные за 3 года :) Однако, в конечном итоге мы будем иметь под рукой временные рядов по всем ценам и всему производству в стране (которые извлекаться будут гораздо быстрее), которые можно накладывать на другие данные и строить предиктивные модели. С помощью SQL Management Studio можно поработать с загруженными данными: посмотреть или преобразовать их. Посмотрим, что у нас сейчас хранится на сервере (написал небольшой запрос на SQL, который сводит воедино для каждого производителя энергии объём производства и цену, по которой данный производитель продаёт электроэнергию):
В качестве альтернативы, можно воспользоваться стандартным методом DataFrame'a .to_sql(). Он немного медленнее (на данных за одну неделю это незначительно, но на данных за 3 года, всё же ощутимо), но удобнее. Например, можно предварительно не создавать таблицу на сервере. Поэтому рекомендую использовать его. Для начала откроем соединение:
engine = create_engine(
"mssql+pyodbc://DESKTOP-1NF3KQV\\SQLEXPRESS/ODS_db?driver=ODBC Driver 13 for SQL Server"
)
engine.connect()
Теперь загрузим данные на сервер. Подробнее хочется остановиться на плюсе второго метода в сравнении с первым - это предварительное создание таблиц. У метода есть полезный пареметр if_exists который принимает значения 'fail', 'replace' и 'append'. Что они позволяют сделать, догадаться не сложно. Мы поставим значение 'replace'.
start = time.time()
df_prices.to_sql("Prices_2_way", con=engine, if_exists="replace")
time_prices_commit_2_way = time.time() - start
start = time.time()
df_production.to_sql("Production_2_way", con=engine, if_exists="replace")
time_production_commit_2_way = time.time() - start
print("Number of rows in DataFrame with Prices: ", len(df_prices))
print(
"Memory usage of DataFrame with Prices: ",
round(np.sum(list(df_prices.memory_usage())) / 1024 / 1024, 1),
)
print("Time of Prices preprocessing: ", round(time_prices_preprocessing, 1))
print("Time of Prices commit: ", round(time_prices_commit_2_way, 1))
print("Overall time: ", round(time_prices_preprocessing + time_prices_commit_2_way, 1))
print("\n")
print("Number of rows in DataFrame with Production: ", len(df_production))
print(
"Memory usage of DataFrame with Production: ",
round(np.sum(list(df_production.memory_usage())) / 1024 / 1024, 1),
)
print("Time of Production preprocessing: ", round(time_production_preprocessing, 1))
print("Time of Production commit: ", round(time_production_commit_2_way, 1))
print(
"Overall time: ",
round(time_production_preprocessing + time_production_commit_2_way, 1),
)
Как видим, использование стандартного метода to_sql занимает несколько больше времени. Давайте посмотрим на память, которую занимают в обоих случаях. Это можно сделать просто в SQL Management Studio.
Как видим, второй способ более затратный. Но пользоваться им всё же удобнее. Посмотрим далее, как работать с полученными базами.
Помимо того как извлекать данные, нам интересно где лучше производить манипуляции с данными: на стороне SQL или с помощью pandas. Для этого сравним затраты по времени. Для проверки у меня есть подходящий кейс. Для получения данных в формате "производство-цена" нужно соединить таблицы Prices и Production.
start = time.time()
cnxn = pyodbc.connect(
"DRIVER= {ODBC Driver 13 for SQL Server}; SERVER=DESKTOP-1NF3KQV\\SQLEXPRESS; DATABASE=ODS_db;Trusted_Connection=Yes"
)
data_extracted = pd.read_sql(
"SELECT t1.ID_unit, t1.ID_node, t1._datetime, t1.production, t2.price FROM Production AS t1 LEFT JOIN Prices AS t2 ON t1.ID_node=t2.ID_node AND t1._datetime=t2._datetime",
cnxn,
)
time_join_extract = time.time() - start
print("Time of join and extract = ", round(time_join_extract, 1))
cnxn.close()
Теперь сначала извлечем данные, потом соединим данные:
start = time.time()
cnxn = pyodbc.connect(
"DRIVER= {ODBC Driver 13 for SQL Server}; SERVER=DESKTOP-1NF3KQV\\SQLEXPRESS; DATABASE=ODS_db;Trusted_Connection=Yes"
)
production_extracted = pd.read_sql("SELECT * FROM Production", cnxn)
prices_extracted = pd.read_sql("SELECT * FROM Prices", cnxn)
data_joined = pd.merge(
production_extracted, prices_extracted, how="left", on=["ID_node", "_datetime"]
)
time_extract_join = time.time() - start
print("Time of extract and join = ", round(time_extract_join, 1))
cnxn.close()
Как мы видим, манипуляции с данными лучше делать на уровне SQL запроса к базе данных, чем используя pandas. Это достаточно удобно, делать преобразования на уровне SELECT'ов и работать уже с готовыми данными.
Как можно заметить, обе выгрузки я делал с помощью стандартного метода read_sql(). Я выбрал его, т.к. на выходе мы получаем сразу DataFrame (в отличие от, например, метода fetchall() из pyodbc, результаты которого ещё нужно будет переводить в DataFrame), что экономит время.
Использование стандартных методов to_sql() и read_sql() очень удобно и позволяет хранить большие объёмы данных в SQL Server. Однако, стоит учитывать, что загрузка данных на сервер дело очень не быстрое и если у вас гигабайты данных и миллионы строк (как было у меня), работа может затянуться надолго. При этом, когда вы оперируете очень большими (по меркам простенького ноутбука) данными, могут вылетать ошибки из-за нехватки оперативной памяти. В таком случае, лучше разрезать данные на порции (в случае временных рядов недели, месяца, кварталы) и загружать порциями (не забывайте ставить if_exists='replace', чтобы новые данные не затирали предыдущие).
Метод read_sql() позволяет использовать сложные SQL-запросы. Советую пользоваться этим, т.к. это ускорит работу с данными.
Первый подход к загрузке данных я описал не просто так. С помощью него удобно создавать, удалять, заполнять базы прямо из Jupyter Notebook'a, если вам не удобно это делать через SQL Management Studio. Этим удобно пользоваться, когда вы делаете манипуляции не связанные с просто импортом данных из DataFrame'a. Просто записываете нужные команды в SQL-строку и управляете вашими таблицами. При этом всё же не забывайте про SQL Management Studio. Это удобно.
Думаю, описал всё, что необходимо, чтобы с нуля создать у себя на компьютере базу для данных, заполнить её и работать в своё удовольствие.