# import libraries
from pyspark.sql import SparkSession
# view process
print "http://localhost:4040/jobs/"
http://localhost:4040/jobs/
spark = SparkSession.builder \
.master("local[*]") \
.appName("Advanced SQL") \
.getOrCreate()
# load data
if "df_cars" in locals():
df_cars.unpersist()
if "df_makers" in locals():
df_makers.unpersist()
df_cars = spark.read \
.format("csv") \
.option("header", "true") \
.load("cars.csv")
df_makers = spark.read \
.format("csv") \
.option("header", "true") \
.load("makers.csv")
# store as table
df_cars.cache()
df_makers.cache()
sqlContext.registerDataFrameAsTable(df_cars, "car")
sqlContext.registerDataFrameAsTable(df_makers, "maker")
# view schema and top data
print "# Cars schema"
df_cars.printSchema()
df_cars.show(5)
print "# Makers schema"
df_makers.printSchema()
df_makers.show(5)
# Cars schema root |-- id: string (nullable = true) |-- car_name: string (nullable = true) |-- price: string (nullable = true) |-- maker_id: string (nullable = true) +---+------------+-------+--------+ | id| car_name| price|maker_id| +---+------------+-------+--------+ | 1| X5|5616.38| 2| | 2| Compass| 1837.4| 7| | 3| Excursion|1949.55| 2| | 4| Escalade|8539.64| 3| | 5|Express 3500|2685.09| 4| +---+------------+-------+--------+ only showing top 5 rows # Makers schema root |-- id: string (nullable = true) |-- maker_name: string (nullable = true) |-- years: string (nullable = true) +---+----------+-----+ | id|maker_name|years| +---+----------+-----+ | 1| Porsche| 2011| | 2| Nissan| 2011| | 3| Dodge| 2008| | 4| Cadillac| 2006| | 5|Land Rover| 2011| +---+----------+-----+ only showing top 5 rows
Truy vấn lồng là một câu truy vấn mà ở bên trong nội dung của nó có chứa một câu truy vấn con khác.
Kết quả của câu truy vấn sẽ như là một giá trị của một thuộc tính.
# Với mỗi hãng xe, cho biết tên của hãng và số lượng xe tương ứng
df_sub_queries = sqlContext.sql("""
SELECT maker_name AS Hang_Xe, (
SELECT COUNT(*)
FROM car
WHERE car.maker_id = maker.id
) AS SL_XE
FROM maker
""")
df_sub_queries.show()
+----------+-----+ | Hang_Xe|SL_XE| +----------+-----+ | Porsche| 91| | Nissan| 116| | Dodge| 98| | Cadillac| 104| |Land Rover| 82| | Mazda| 101| | Isuzu| 100| | Hyundai| 109| | Hyundai| 96| | Chevrolet| 103| +----------+-----+
Kết quả của câu truy vấn sẽ xem như là một bảng dữ liệu, do vậy có thể truy vấn từ bảng dữ liệu này.
# Cho biết tên và giá mỗi xe của hãng Nissan
df_sub_queries = sqlContext.sql("""
SELECT T.car_name, T.price AS price_usd
FROM (
SELECT *
FROM car
WHERE maker_id = 2
) AS T
""")
df_sub_queries.show(5)
+---------+---------+ | car_name|price_usd| +---------+---------+ | X5| 5616.38| |Excursion| 1949.55| | Tacoma| 5696.24| | Tahoe| 7247.84| | M-Class| 8541.21| +---------+---------+ only showing top 5 rows
Kết quả của câu truy vấn được sử dụng như một thành phần trong biểu thức điều kiện.
# Cho biết những xe có giá lớn hơn xe có mã = 5
df_sub_queries = sqlContext.sql("""
SELECT car_name, price
FROM car
WHERE price > (
SELECT price
FROM car
WHERE id = 5
)
""")
df_sub_queries.show(5)
+--------+-------+ |car_name| price| +--------+-------+ | X5|5616.38| |Escalade|8539.64| | Virage|6297.87| | RX-8|7033.46| | Caravan|3101.49| +--------+-------+ only showing top 5 rows
Toán tử IN dùng để kiểm tra một giá trị có nằm trong một tập hợp nào đó hay không. Tập hợp đó có thể là kết quả của một câu truy vấn hoặc một tập hợp tường minh
# Cho biết các xe có giá nhỏ hơn 3000 USD
df_sub_queries = sqlContext.sql("""
SELECT car_name, price
FROM car
WHERE id NOT IN (
SELECT id
FROM car
WHERE price > 3000
)
""")
df_sub_queries.show(5)
+------------+-------+ | car_name| price| +------------+-------+ | Compass| 1837.4| | Excursion|1949.55| |Express 3500|2685.09| | Fleetwood|1016.99| | Cabriolet|1185.46| +------------+-------+ only showing top 5 rows
# Tìm xe không phải hãng Nissan
df_sub_queries = sqlContext.sql("""
SELECT car_name
FROM car
WHERE NOT EXISTS (
SELECT *
FROM maker
WHERE car.maker_id = maker.id
AND maker.id = 5
)
""")
df_sub_queries.show(5)
+------------+ | car_name| +------------+ | X5| | Compass| | Excursion| | Escalade| |Express 3500| +------------+ only showing top 5 rows
# Cho biết các xe có giá lớn hơn giá trung bình của hãng xe đó sản xuất
df_sub_queries = sqlContext.sql("""
SELECT car_name, price
FROM car AS car1
WHERE car1.price > (
SELECT AVG(car2.price)
FROM car AS car2
WHERE car2.maker_id = car1.maker_id
)
""")
df_sub_queries.show(5)
+--------+-------+ |car_name| price| +--------+-------+ | X5|5616.38| |Escalade|8539.64| | Virage|6297.87| | RX-8|7033.46| | X3|9361.88| +--------+-------+ only showing top 5 rows
# Cho biết các xe có giá cao nhất
df_sub_queries = sqlContext.sql("""
SELECT car_name, price
FROM car
WHERE price = (
SELECT MAX(price)
FROM car
)
""")
df_sub_queries.show(5)
+--------+-------+ |car_name| price| +--------+-------+ | 100|9986.79| +--------+-------+
# Cho biết các hãng sản xuất nhiều xe nhất
df_sub_queries = sqlContext.sql("""
SELECT maker_id
FROM car
GROUP BY maker_id
HAVING COUNT(*) = (
SELECT MAX(SL_XE)
FROM (
SELECT COUNT(*) AS SL_XE
FROM car
GROUP BY maker_id
)
)
""")
df_sub_queries.show(5)
+--------+ |maker_id| +--------+ | 2| +--------+
# create inventory by joining cars and makers
if "df_inventory" in locals():
df_inventory.unpersist()
df_inventory = sqlContext.sql("""
SELECT car_name, price, maker_name, years
FROM car
JOIN maker
ON car.maker_id = maker.id
""")
df_inventory.show(5)
# store as table
df_inventory.cache()
sqlContext.registerDataFrameAsTable(df_inventory, "inventory")
+------------+-------+----------+-----+ | car_name| price|maker_name|years| +------------+-------+----------+-----+ | X5|5616.38| Nissan| 2011| | Compass| 1837.4| Isuzu| 1998| | Excursion|1949.55| Nissan| 2011| | Escalade|8539.64| Dodge| 2008| |Express 3500|2685.09| Cadillac| 2006| +------------+-------+----------+-----+ only showing top 5 rows
# Tổng hợp tổng giá trị của các xe theo:
# - Tên xe và tên hãng
# - Từng tên xe
# - Từng hãng
# - Tổng giá trị
# Output:
# - Ranger, Land Rover, 7535.8: tổng giá trị xe Ranger, hãng Land Rover là 7535.8
# - Chariot, null, 7867.58: tổng giá trị xe Chariot, hãng bất kỳ là 7867.58
# - null, null, 5396393.689999999: tổng giá trị các xe từ các hãng là 5396393.689999999
# - null, Cadillac, 544548.9600000001: tổng giá trị từ hãng Cadillac là 544548.9600000001
df_compute = sqlContext.sql("""
SELECT car_name, maker_name, SUM(price) AS TotalPrice
FROM inventory
GROUP BY car_name, maker_name WITH CUBE
""")
df_compute.show(10)
+--------+----------+------------------+ |car_name|maker_name| TotalPrice| +--------+----------+------------------+ | Ranger|Land Rover| 7535.8| | Durango| Hyundai| 4750.66| | Outback| Isuzu| 5449.95| | 350Z| Dodge| 5936.84| |3 Series| Porsche| 10210.57| | MR2|Land Rover| 4468.72| | null| Cadillac| 544548.9600000001| | TT| null|7733.4400000000005| | Chariot| null| 7867.58| |E-Series| Chevrolet|11777.220000000001| +--------+----------+------------------+ only showing top 10 rows
# Tổng hợp tổng giá trị của các xe theo:
# - Tên xe và tên hãng
# - Từng tên xe
# - Tổng giá trị
# Output:
# - Ranger, Land Rover, 7535.8: tổng giá trị xe Ranger, hãng Land Rover là 7535.8
# - Chariot, null, 7867.58: tổng giá trị xe Chariot, hãng bất kỳ là 7867.58
# - null, null, 5396393.689999999: tổng giá trị các xe từ các hãng là 5396393.689999999
df_compute = sqlContext.sql("""
SELECT car_name, maker_name, SUM(price) AS TotalPrice
FROM inventory
GROUP BY car_name, maker_name WITH ROLLUP
""")
df_compute.show(10)
+------------+----------+------------------+ | car_name|maker_name| TotalPrice| +------------+----------+------------------+ | Ranger|Land Rover| 7535.8| | Durango| Hyundai| 4750.66| | Outback| Isuzu| 5449.95| | 350Z| Dodge| 5936.84| | 3 Series| Porsche| 10210.57| | MR2|Land Rover| 4468.72| | TT| null|7733.4400000000005| | Chariot| null| 7867.58| | E-Series| Chevrolet|11777.220000000001| |Express 3500| Porsche| 8971.060000000001| +------------+----------+------------------+ only showing top 10 rows