We use the part of the instacart data that you can find here https://www.instacart.com/datasets/grocery-shopping-2017
Specically order_products__prior.csv a 4 columns, 33.2 Million rows csv file.
Before 2.2.10 It took 355.58 seconds to process all the data set in a Windows 10, Instacart data
After 2.2.10 It took 78 sec. infer== False
%load_ext autoreload
%autoreload 2
import sys
sys.path.append("..")
# Create optimus
from optimus import Optimus
op = Optimus(master="local[*]", app_name = "optimus" ,verbose =True, checkpoint= True)
C:\Users\argenisleon\Anaconda3\lib\site-packages\socks.py:58: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working from collections import Callable You are using PySparkling of version 2.4.10, but your PySpark is of version 2.3.1. Please make sure Spark and PySparkling versions are compatible. INFO:optimus:Operative System:Windows INFO:optimus:Just check that Spark and all necessary environments vars are present... INFO:optimus:----- INFO:optimus:SPARK_HOME=C:\opt\spark\spark-2.3.1-bin-hadoop2.7 INFO:optimus:HADOOP_HOME=C:\opt\hadoop-2.7.7 INFO:optimus:PYSPARK_PYTHON=C:\Users\argenisleon\Anaconda3\python.exe INFO:optimus:PYSPARK_DRIVER_PYTHON=jupyter INFO:optimus:PYSPARK_SUBMIT_ARGS=--jars "file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --driver-class-path "C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar" --conf "spark.sql.catalogImplementation=hive" pyspark-shell INFO:optimus:JAVA_HOME=C:\java INFO:optimus:Pyarrow Installed INFO:optimus:----- INFO:optimus:Starting or getting SparkSession and SparkContext... INFO:optimus:Spark Version:2.3.1 INFO:optimus:Setting checkpoint folder local. If you are in a cluster initialize Optimus with master='your_ip' as param INFO:optimus:Deleting previous folder if exists... INFO:optimus:Creating the checkpoint directory... INFO:optimus: ____ __ _ / __ \____ / /_(_)___ ___ __ _______ / / / / __ \/ __/ / __ `__ \/ / / / ___/ / /_/ / /_/ / /_/ / / / / / / /_/ (__ ) \____/ .___/\__/_/_/ /_/ /_/\__,_/____/ /_/ INFO:optimus:Transform and Roll out... INFO:optimus:Optimus successfully imported. Have fun :). INFO:optimus:Config.ini not found
df = op.load.csv("C:\\Users\\argenisleon\\Desktop\\order_products__prior.csv")
df.table()
order_id
1 (int)
nullable
|
product_id
2 (int)
nullable
|
add_to_cart_order
3 (int)
nullable
|
reordered
4 (int)
nullable
|
---|---|---|---|
2
|
33120
|
1
|
1
|
2
|
28985
|
2
|
1
|
2
|
9327
|
3
|
0
|
2
|
45918
|
4
|
1
|
2
|
30035
|
5
|
0
|
2
|
17794
|
6
|
1
|
2
|
40141
|
7
|
1
|
2
|
1819
|
8
|
1
|
2
|
43668
|
9
|
0
|
3
|
33754
|
1
|
1
|
%%time
df.groupBy("order_id").count().sort("count",ascending=False).show()
+--------+-----+ |order_id|count| +--------+-----+ | 1564244| 145| | 790903| 137| | 61355| 127| | 2970392| 121| | 2069920| 116| | 3308010| 115| | 2753324| 114| | 2499774| 112| | 2621625| 109| | 77151| 109| | 2136777| 108| | 1657096| 108| | 2648316| 105| | 1892299| 104| | 171934| 104| | 653887| 102| | 1384519| 102| | 1867980| 102| | 3282039| 101| | 3052353| 101| +--------+-----+ only showing top 20 rows Wall time: 15.9 s
%%time
df.cols.frequency("order_id")
--------------------------------------------------------------------------- NameError Traceback (most recent call last) <timed eval> in <module> NameError: name 'df' is not defined
%%time
op.profiler.to_json(df, "order_id", infer=False, relative_error=1)
INFO:optimus:Procesing General Stats... INFO:optimus:general_stats() executed in 37.12 sec INFO:optimus:Using 'column_exp' to process column 'order_id' with function _cast_to INFO:optimus:cast_columns() executed in 0.01 sec INFO:optimus:Processing Frequency ... INFO:optimus:dataset_info() executed in 0.07 sec
Wall time: 2min 43s
'{"columns": {"order_id": {"stats": {"count_uniques_agg": 3025302, "min": 2, "max": 3421083, "stddev": 987300.69645, "kurtosis": -1.19913, "mean": 1710748.51894, "skewness": -0.00049, "sum": 55487254019416, "variance": 974762665216.534, "zeros": 0, "count_na": 0, "hist": [{"count": 3239832.0, "lower": 2.0, "upper": 342110.1}, {"count": 3243756.0, "lower": 342110.1, "upper": 684218.2}, {"count": 3237942.0, "lower": 684218.2, "upper": 1026326.3}, {"count": 3243055.0, "lower": 1026326.3, "upper": 1368434.4}, {"count": 3247818.0, "lower": 1368434.4, "upper": 1710542.5}, {"count": 3249098.0, "lower": 1710542.5, "upper": 2052650.6}, {"count": 3243017.0, "lower": 2052650.6, "upper": 2394758.7}, {"count": 3248231.0, "lower": 2394758.7, "upper": 2736866.8}, {"count": 3241887.0, "lower": 2736866.8, "upper": 3078974.9}, {"count": 3239843.0, "lower": 3078974.9, "upper": 3421083.0}], "percentile": {"0.75": 2, "0.95": 2, "0.05": 2, "0.25": 2, "0.5": 2}, "missing_count": 0, "p_missing": 0.0, "range": 3421081, "median": 2, "interquartile_range": 0, "coef_variation": 0.57712, "mad": 0}, "frequency": [{"value": 1564244, "count": 145}, {"value": 790903, "count": 137}, {"value": 61355, "count": 127}, {"value": 2970392, "count": 121}, {"value": 2069920, "count": 116}, {"value": 3308010, "count": 115}, {"value": 2753324, "count": 114}, {"value": 2499774, "count": 112}, {"value": 77151, "count": 109}, {"value": 2621625, "count": 109}], "name": "order_id", "column_dtype": "int", "dtypes_stats": {"string": 0, "bool": 0, "int": 32434489, "float": 0, "double": 0, "date": 0, "array": 0, "null": 0, "missing": 0}, "column_type": "numeric"}}, "name": null, "rows_count": "32.4 million", "count_types": {"numeric": 1, "categorical": 0, "null": 0, "array": 0, "date": 0, "bool": 0}, "size": "262.3 MB", "summary": {"cols_count": 4, "rows_count": 32434489, "missing_count": "0.0%", "size": "262.1 MB"}, "sample": {"columns": [{"title": "order_id"}, {"title": "product_id"}, {"title": "add_to_cart_order"}, {"title": "reordered"}], "value": [[934811], [1149543], [1156809], [1320736], [1343186], [1565712], [1968204], [2450020], [2508067], [2830472]]}}'
a = df.limit(10) 1:46 2:25
%%time
df.cols.frequency("order_id")
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <timed eval> in <module> ~\Documents\Optimus\optimus\helpers\decorators.py in wrapper(*args, **kwargs) 47 def wrapper(*args, **kwargs): 48 start_time = timeit.default_timer() ---> 49 f = func(*args, **kwargs) 50 _time = round(timeit.default_timer() - start_time, 2) 51 if log_time: ~\Documents\Optimus\optimus\dataframe\columns.py in frequency(columns, n) 1546 result = {} 1547 for f in freq.collect(): -> 1548 result[f[0]] = [{"value": kv[0], "count": kv[1]} for kv in f[1]] 1549 1550 return result TypeError: 'int' object is not iterable
from optimus import Profiler
p = Profiler()
p.run(a,"order_id")
INFO:optimus:Config.ini not found INFO:optimus:general_stats() executed in 23.99 sec INFO:optimus:Processing column 'order_id'... INFO:optimus:Using 'column_exp' to process column 'order_id' with function _cast_to INFO:optimus:cast_columns() executed in 0.02 sec INFO:optimus:------------------------------ INFO:optimus:Processing column 'order_id'... INFO:optimus:frequency() executed in 1.39 sec INFO:optimus:dataset_info() executed in 0.08 sec
--------------------------------------------------------------------------- TemplateNotFound Traceback (most recent call last) <ipython-input-24-c1ed1a1b51c2> in <module> 1 from optimus import Profiler 2 p = Profiler() ----> 3 p.run(a,"order_id") ~\Documents\Optimus\optimus\helpers\decorators.py in timed(*args, **kw) 26 def timed(*args, **kw): 27 start_time = timeit.default_timer() ---> 28 f = method(*args, **kw) 29 _time = round(timeit.default_timer() - start_time, 2) 30 logger.print("{name}() executed in {time} sec".format(name=method.__name__, time=_time)) ~\Documents\Optimus\optimus\profiler\profiler.py in run(self, df, columns, buckets, infer, relative_error, approx_count) 199 template_env = jinja2.Environment(loader=template_loader, autoescape=True) 200 --> 201 202 # Render template 203 # Create the profiler info header ~\Anaconda3\lib\site-packages\jinja2\environment.py in get_template(self, name, parent, globals) 828 if parent is not None: 829 name = self.join_path(name, parent) --> 830 return self._load_template(name, self.make_globals(globals)) 831 832 @internalcode ~\Anaconda3\lib\site-packages\jinja2\environment.py in _load_template(self, name, globals) 802 template.is_up_to_date): 803 return template --> 804 template = self.loader.load(self, name, globals) 805 if self.cache is not None: 806 self.cache[cache_key] = template ~\Anaconda3\lib\site-packages\jinja2\loaders.py in load(self, environment, name, globals) 111 # first we try to get the source for this template together 112 # with the filename and the uptodate function. --> 113 source, filename, uptodate = self.get_source(environment, name) 114 115 # try to load the code from the bytecode cache if there is a ~\Anaconda3\lib\site-packages\jinja2\loaders.py in get_source(self, environment, template) 185 return False 186 return contents, filename, uptodate --> 187 raise TemplateNotFound(template) 188 189 def list_templates(self): TemplateNotFound: /out/general_info.html
op.profiler.run(a, "order_id", infer=True, relative_error=1)
INFO:optimus:general_stats() executed in 23.64 sec INFO:optimus:Processing column 'order_id'... INFO:optimus:Using 'column_exp' to process column 'order_id' with function _cast_to INFO:optimus:cast_columns() executed in 0.02 sec INFO:optimus:------------------------------ INFO:optimus:Processing column 'order_id'... INFO:optimus:frequency() executed in 1.27 sec INFO:optimus:dataset_info() executed in 0.09 sec
--------------------------------------------------------------------------- TemplateNotFound Traceback (most recent call last) <ipython-input-22-4f64cac065ea> in <module> ----> 1 op.profiler.run(a, "order_id", infer=True, relative_error=1) ~\Documents\Optimus\optimus\helpers\decorators.py in timed(*args, **kw) 26 def timed(*args, **kw): 27 start_time = timeit.default_timer() ---> 28 f = method(*args, **kw) 29 _time = round(timeit.default_timer() - start_time, 2) 30 logger.print("{name}() executed in {time} sec".format(name=method.__name__, time=_time)) ~\Documents\Optimus\optimus\profiler\profiler.py in run(self, df, columns, buckets, infer, relative_error, approx_count) 199 # Render template 200 # Create the profiler info header --> 201 html = "" 202 general_template = template_env.get_template("general_info.html") 203 html = html + general_template.render(data=output) ~\Anaconda3\lib\site-packages\jinja2\environment.py in get_template(self, name, parent, globals) 828 if parent is not None: 829 name = self.join_path(name, parent) --> 830 return self._load_template(name, self.make_globals(globals)) 831 832 @internalcode ~\Anaconda3\lib\site-packages\jinja2\environment.py in _load_template(self, name, globals) 802 template.is_up_to_date): 803 return template --> 804 template = self.loader.load(self, name, globals) 805 if self.cache is not None: 806 self.cache[cache_key] = template ~\Anaconda3\lib\site-packages\jinja2\loaders.py in load(self, environment, name, globals) 111 # first we try to get the source for this template together 112 # with the filename and the uptodate function. --> 113 source, filename, uptodate = self.get_source(environment, name) 114 115 # try to load the code from the bytecode cache if there is a ~\Anaconda3\lib\site-packages\jinja2\loaders.py in get_source(self, environment, template) 185 return False 186 return contents, filename, uptodate --> 187 raise TemplateNotFound(template) 188 189 def list_templates(self): TemplateNotFound: /out/general_info.html
df.groupBy("order_id").count().sort("count",ascending=False)
--------------------------------------------------------------------------- NameError Traceback (most recent call last) <ipython-input-1-86d4d0fdcf73> in <module> ----> 1 df.groupBy("order_id").count().sort("count",ascending=False) NameError: name 'df' is not defined