#!/usr/bin/env python # coding: utf-8 # # Truy vấn nâng cao # In[1]: # import libraries from pyspark.sql import SparkSession # view process print "http://localhost:4040/jobs/" # In[2]: spark = SparkSession.builder \ .master("local[*]") \ .appName("Advanced SQL") \ .getOrCreate() # In[3]: # load data if "df_cars" in locals(): df_cars.unpersist() if "df_makers" in locals(): df_makers.unpersist() df_cars = spark.read \ .format("csv") \ .option("header", "true") \ .load("cars.csv") df_makers = spark.read \ .format("csv") \ .option("header", "true") \ .load("makers.csv") # store as table df_cars.cache() df_makers.cache() sqlContext.registerDataFrameAsTable(df_cars, "car") sqlContext.registerDataFrameAsTable(df_makers, "maker") # In[4]: # view schema and top data print "# Cars schema" df_cars.printSchema() df_cars.show(5) print "# Makers schema" df_makers.printSchema() df_makers.show(5) # # Truy vấn lồng # Truy vấn lồng là một câu truy vấn mà ở bên trong nội dung của nó có chứa một câu truy vấn con khác. # - Truy vấn lồng phân cấp: Khi nội dung của câu truy vấn con độc lập với câu truy vấn cha. # - Truy vấn lồng tương quan: Khi nội dung của câu truy vấn con phụ thuộc vào câu truy vấn cha. # # ## Đặt tại mệnh đề SELECT # Kết quả của câu truy vấn sẽ như là một giá trị của một thuộc tính. # In[5]: # Với mỗi hãng xe, cho biết tên của hãng và số lượng xe tương ứng df_sub_queries = sqlContext.sql(""" SELECT maker_name AS Hang_Xe, ( SELECT COUNT(*) FROM car WHERE car.maker_id = maker.id ) AS SL_XE FROM maker """) df_sub_queries.show() # ## Đặt tại mệnh đề FROM: # Kết quả của câu truy vấn sẽ xem như là một bảng dữ liệu, do vậy có thể truy vấn từ bảng dữ liệu này. # In[6]: # Cho biết tên và giá mỗi xe của hãng Nissan df_sub_queries = sqlContext.sql(""" SELECT T.car_name, T.price AS price_usd FROM ( SELECT * FROM car WHERE maker_id = 2 ) AS T """) df_sub_queries.show(5) # ## Đặt tại mệnh đề WHERE: # Kết quả của câu truy vấn được sử dụng như một thành phần trong biểu thức điều kiện. # In[7]: # Cho biết những xe có giá lớn hơn xe có mã = 5 df_sub_queries = sqlContext.sql(""" SELECT car_name, price FROM car WHERE price > ( SELECT price FROM car WHERE id = 5 ) """) df_sub_queries.show(5) # ## Truy vấn lồng phân cấp với toán tử IN # Toán tử IN dùng để kiểm tra một giá trị có nằm trong một tập hợp nào đó hay không. Tập hợp đó có thể là kết quả của một câu truy vấn hoặc một tập hợp tường minh # In[8]: # Cho biết các xe có giá nhỏ hơn 3000 USD df_sub_queries = sqlContext.sql(""" SELECT car_name, price FROM car WHERE id NOT IN ( SELECT id FROM car WHERE price > 3000 ) """) df_sub_queries.show(5) # ## Truy vấn lồng tương quan với EXISTS # In[9]: # Tìm xe không phải hãng Nissan df_sub_queries = sqlContext.sql(""" SELECT car_name FROM car WHERE NOT EXISTS ( SELECT * FROM maker WHERE car.maker_id = maker.id AND maker.id = 5 ) """) df_sub_queries.show(5) # ## Ví dụ khác về truy vấn lồng tương quan # In[10]: # Cho biết các xe có giá lớn hơn giá trung bình của hãng xe đó sản xuất df_sub_queries = sqlContext.sql(""" SELECT car_name, price FROM car AS car1 WHERE car1.price > ( SELECT AVG(car2.price) FROM car AS car2 WHERE car2.maker_id = car1.maker_id ) """) df_sub_queries.show(5) # In[11]: # Cho biết các xe có giá cao nhất df_sub_queries = sqlContext.sql(""" SELECT car_name, price FROM car WHERE price = ( SELECT MAX(price) FROM car ) """) df_sub_queries.show(5) # In[12]: # Cho biết các hãng sản xuất nhiều xe nhất df_sub_queries = sqlContext.sql(""" SELECT maker_id FROM car GROUP BY maker_id HAVING COUNT(*) = ( SELECT MAX(SL_XE) FROM ( SELECT COUNT(*) AS SL_XE FROM car GROUP BY maker_id ) ) """) df_sub_queries.show(5) # # Truy vấn khác # In[13]: # create inventory by joining cars and makers if "df_inventory" in locals(): df_inventory.unpersist() df_inventory = sqlContext.sql(""" SELECT car_name, price, maker_name, years FROM car JOIN maker ON car.maker_id = maker.id """) df_inventory.show(5) # store as table df_inventory.cache() sqlContext.registerDataFrameAsTable(df_inventory, "inventory") # ## Tổng hợp sử dụng CUBE # In[14]: # Tổng hợp tổng giá trị của các xe theo: # - Tên xe và tên hãng # - Từng tên xe # - Từng hãng # - Tổng giá trị # Output: # - Ranger, Land Rover, 7535.8: tổng giá trị xe Ranger, hãng Land Rover là 7535.8 # - Chariot, null, 7867.58: tổng giá trị xe Chariot, hãng bất kỳ là 7867.58 # - null, null, 5396393.689999999: tổng giá trị các xe từ các hãng là 5396393.689999999 # - null, Cadillac, 544548.9600000001: tổng giá trị từ hãng Cadillac là 544548.9600000001 df_compute = sqlContext.sql(""" SELECT car_name, maker_name, SUM(price) AS TotalPrice FROM inventory GROUP BY car_name, maker_name WITH CUBE """) df_compute.show(10) # ## Tổng hợp sử dụng ROLLUP # In[15]: # Tổng hợp tổng giá trị của các xe theo: # - Tên xe và tên hãng # - Từng tên xe # - Tổng giá trị # Output: # - Ranger, Land Rover, 7535.8: tổng giá trị xe Ranger, hãng Land Rover là 7535.8 # - Chariot, null, 7867.58: tổng giá trị xe Chariot, hãng bất kỳ là 7867.58 # - null, null, 5396393.689999999: tổng giá trị các xe từ các hãng là 5396393.689999999 df_compute = sqlContext.sql(""" SELECT car_name, maker_name, SUM(price) AS TotalPrice FROM inventory GROUP BY car_name, maker_name WITH ROLLUP """) df_compute.show(10) # In[ ]: