2023-04-02 Log
Spark UDF #
- 자신의 기능을 정의할 수 있는 사용자 정의 함수
- 머신러닝 모델의 내부를 이해하지 않고도 스파크 SQL에서 예측 결과를 쿼리 가능
- 스파크 SQL이 하위 표현식의 평가 순서를 보장하지 않기 때문에 UDF 내부에서 null 검사
python
from pyspark.sql.types import LongType
def cubed(s):
return s * s * s
spark.udf.register("cubed", cubed, LongType())
spark.range(1, 9).createOrReplaceTempView("udf_test")
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()Pandas UDF #
- 입력과 출력이 모두 판다스 인스턴스인 API 지원
python
import pandas as pd
from pyspark.sql.functions ipmort col, pandas_udf
from pyspark.sql.types import LongType
def cubed(a: pd.Series) -> pd.Series:
return a * a * a
cubed_udf = pandas_udf(cubed, returnType=LongType())
df = spark.range(1, 4)
df.select("id", cubed_udf(col("id"))).show()Spark Shell #
$SPARK_HOME폴더에서./bin/spark-sql명령어 실행
bash
spark-sql> CREATE TABLE people (Name STRING, age INT)JDBC #
user, password, url, dbtable, query, driver연결 속성 지정- 파티셔닝을 위한
numPartitions, partitionColumn, lowerBound, upperBound속성
PostgreSQL #
python
jdbcDF1 = (spark
.read # .write
.format("jdbc")
.opiton("url", "jdbc:postgresql://[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]").
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load()) # .save()SQL 고차 함수 #
중첩된 구조를 개별 행으로 분해
sql
SELECT id, collect_list(value = 1) AS values
FROM (SELECT id, EXPLODE(values) AS value
FROM table) x
GROUP BY id배열 유형 함수 #
array_distinct(array): 배열 내의 중복을 제거array_intersect(array, array): 중복되지 않은 두 배열의 교차점을 반환array_union(array, array): 중복 항목 없이 두 배열의 결합을 반환array_except(array, array): 배열1에는 존재하지만 배열2에는 존재하지 않는 요소를 반환array_join(array, string): 구분 기호를 사용하여 배열 요소를 연결array_max(array): Null값을 제외한 배열의 최댓값을 반환array_min(array): Null값을 제외한 배열의 최솟값을 반환array_position(array, T): 배열에서 지정된 요소의 첫번째 인덱스를 반환array_remove(array, T): 배열에서 지정된 요소와 동일한 모든 요소를 제거array_overlap(array, array): Null이 아닌 동일한 값이 두 배열에 있을 경우 true를 반환array_sort(array): 배열을 오름차순으로 정렬하고 Null은 맨 끝에 위치concat(array, ...): 문자열, 바이너리, 배열 등을 연결flatten(array<array>): 배열 안의 배열들을 단일 배열로 플랫화array_repeat(T, Int): 지정된 요소가 포함된 배열을 지정한 횟수만큼 반환reverse(array): 문자열의 역순 또는 배열에서 요소의 역순을 반환sequence(T, T), 시작부터 끝을 포함한 일련의 요소를 생성shuffle(array): 주어진 배열의 무작위 순열을 반환slice(array, Int, Int): 배열에서 지정된 시작과 끝 인덱스에 대한 하위 집합을 반환array_zip(array, array, ...): 병합된 구조 배열을 반환element_at(array, Int): 지정된 인덱스에서 지정된 배열의 요소를 반환cardinality(array): 지정된 배열 또는 맵의 크기를 반환
맵 함수 #
map_form_arrays(array, array): 주어진 키/값 배열 쌍에서 맵을 생성하여 반환map_from_entries(array<struct>): 주어진 배열에서 생성된 맵을 반환map_concat(map, ...): 입력된 맵의 결합을 반환element_at(map, K): 주어진 키에 대한 값을 반환하고 없을 경우 Null을 반환cardinality(array): 지정된 배열 또는 맵의 크기를 반환
고차 함수 #
transform(values, value -> lambda expression)형태의 람다식 사용
python
# transform(): 배열의 각 요소에 함수를 적용하여 배열을 생성
spark.sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) AS fahrenheit
FROM tC
""").show()
# filter(): 입력한 배열의 요소 중 참인 요소만 배열로 반환
spark.sql("""
SELECT celsius,
filter(celsius, t -> t > 38) AS high
FROM tC
""").show()
# exists(): 입력한 배열의 요소 중 불린 함수를 만족시키면 참을 반환
spark.sql("""
SELECT celsius,
exists(celsius, t -> t > 38) AS high
FROM tC
""").show()
# reduce(): 요소를 병합하고 배열의 요소를 단일값으로 줄임
spark.sql("""
SELECT celsius,
reduce(celsius, 0,
(t, acc) -> t + acc,
acc -> (acc div size(celsius) * 9 div 5) + 32
) AS avgFahrenheit
FROM tC
""").show()SQL 작업 #
Union #
python
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()JOIN #
python
foo.join(
airports,
airports.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()Window #
rank(),dense_rank(),percent_rank(),ntile(),row_number()cume_dist(),first_value(),last_value(),lag(),lead()
sql
CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, SUM(delay) AS TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination;
python
spark.sql("""
SELECT origin, destination, TotalDelays, rank
FROM (
SELECT origin, destination, TotalDelays,
dense_rank() OVER (PARTITION BY origin ORDER BY TotalDelays DESC) AS rank
FROM departureDelaysWindow
) t
WHERE rank <= 3
""").show()Modification #
python
from pyspark.sql.functions import *
# 열 추가
foo2 = (foo.withColumn(
"status",
expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
))
# 열 삭제
foo3 = foo2.drop("delay")
# 칼럼명 바꾸기
foo4 = foo3.withColumnRenamed("status", "flight_status")
# 피벗
SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
FROM departureDelays WHERE origin = 'SFA
)
PIVOT (
CAST AVG(delay) AS DECIMAL(4, 2) AS AvgDelay, MAX(delay) AS MaxDelay
FOR month IN (1 JAN, 2 FEB)
)