Spark UDF

  • 자신의 기능을 정의할 수 있는 사용자 정의 함수
  • 머신러닝 모델의 내부를 이해하지 않고도 스파크 SQL에서 예측 결과를 쿼리 가능
  • 스파크 SQL이 하위 표현식의 평가 순서를 보장하지 않기 때문에 UDF 내부에서 null 검사
1
2
3
4
5
6
7
8
9
from pyspark.sql.types import LongType

def cubed(s):
    return s * s * s

spark.udf.register("cubed", cubed, LongType())
spark.range(1, 9).createOrReplaceTempView("udf_test")

spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

Pandas UDF

  • 입력과 출력이 모두 판다스 인스턴스인 API 지원
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import pandas as pd

from pyspark.sql.functions ipmort col, pandas_udf
from pyspark.sql.types import LongType

def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

cubed_udf = pandas_udf(cubed, returnType=LongType())

df = spark.range(1, 4)
df.select("id", cubed_udf(col("id"))).show()

Spark Shell

  • $SPARK_HOME 폴더에서 ./bin/spark-sql 명령어 실행
1
spark-sql> CREATE TABLE people (Name STRING, age INT)

JDBC

  • user, password, url, dbtable, query, driver 연결 속성 지정
  • 파티셔닝을 위한 numPartitions, partitionColumn, lowerBound, upperBound 속성

PostgreSQL

1
2
3
4
5
6
7
8
jdbcDF1 = (spark
    .read # .write
    .format("jdbc")
    .opiton("url", "jdbc:postgresql://[DBSERVER]")
    .option("dbtable", "[SCHEMA].[TABLENAME]").
    .option("user", "[USERNAME]")
    .option("password", "[PASSWORD]")
    .load()) # .save()

SQL 고차 함수

중첩된 구조를 개별 행으로 분해

1
2
3
4
SELECT id, collect_list(value = 1) AS values
FROM (SELECT id, EXPLODE(values) AS value
        FROM table) x
GROUP BY id

배열 유형 함수

  • array_distinct(array): 배열 내의 중복을 제거
  • array_intersect(array, array): 중복되지 않은 두 배열의 교차점을 반환
  • array_union(array, array): 중복 항목 없이 두 배열의 결합을 반환
  • array_except(array, array): 배열1에는 존재하지만 배열2에는 존재하지 않는 요소를 반환
  • array_join(array, string): 구분 기호를 사용하여 배열 요소를 연결
  • array_max(array): Null값을 제외한 배열의 최댓값을 반환
  • array_min(array): Null값을 제외한 배열의 최솟값을 반환
  • array_position(array, T): 배열에서 지정된 요소의 첫번째 인덱스를 반환
  • array_remove(array, T): 배열에서 지정된 요소와 동일한 모든 요소를 제거
  • array_overlap(array, array): Null이 아닌 동일한 값이 두 배열에 있을 경우 true를 반환
  • array_sort(array): 배열을 오름차순으로 정렬하고 Null은 맨 끝에 위치
  • concat(array, ...): 문자열, 바이너리, 배열 등을 연결
  • flatten(array<array>): 배열 안의 배열들을 단일 배열로 플랫화
  • array_repeat(T, Int): 지정된 요소가 포함된 배열을 지정한 횟수만큼 반환
  • reverse(array): 문자열의 역순 또는 배열에서 요소의 역순을 반환
  • sequence(T, T), 시작부터 끝을 포함한 일련의 요소를 생성
  • shuffle(array): 주어진 배열의 무작위 순열을 반환
  • slice(array, Int, Int): 배열에서 지정된 시작과 끝 인덱스에 대한 하위 집합을 반환
  • array_zip(array, array, ...): 병합된 구조 배열을 반환
  • element_at(array, Int): 지정된 인덱스에서 지정된 배열의 요소를 반환
  • cardinality(array): 지정된 배열 또는 맵의 크기를 반환

맵 함수

  • map_form_arrays(array, array): 주어진 키/값 배열 쌍에서 맵을 생성하여 반환
  • map_from_entries(array<struct>): 주어진 배열에서 생성된 맵을 반환
  • map_concat(map, ...): 입력된 맵의 결합을 반환
  • element_at(map, K): 주어진 키에 대한 값을 반환하고 없을 경우 Null을 반환
  • cardinality(array): 지정된 배열 또는 맵의 크기를 반환

고차 함수

  • transform(values, value -> lambda expression) 형태의 람다식 사용
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# transform(): 배열의 각 요소에 함수를 적용하여 배열을 생성
spark.sql("""
SELECT celsius,
    transform(celsius, t -> ((t * 9) div 5) + 32) AS fahrenheit
FROM tC
""").show()

# filter(): 입력한 배열의 요소 중 참인 요소만 배열로 반환
spark.sql("""
SELECT celsius,
    filter(celsius, t -> t > 38) AS high
FROM tC
""").show()

# exists(): 입력한 배열의 요소 중 불린 함수를 만족시키면 참을 반환
spark.sql("""
SELECT celsius,
    exists(celsius, t -> t > 38) AS high
FROM tC
""").show()

# reduce(): 요소를 병합하고 배열의 요소를 단일값으로 줄임
spark.sql("""
SELECT celsius,
    reduce(celsius, 0,
        (t, acc) -> t + acc,
        acc -> (acc div size(celsius) * 9 div 5) + 32
    ) AS avgFahrenheit
FROM tC
""").show()

SQL 작업

Union

1
2
3
4
5
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

JOIN

1
2
3
4
foo.join(
    airports,
    airports.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()

Window

  • rank(), dense_rank(), percent_rank(), ntile(), row_number()
  • cume_dist(), first_value(), last_value(), lag(), lead()
1
2
3
4
5
6
CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, SUM(delay) AS TotalDelays
    FROM departureDelays
    WHERE origin IN ('SEA', 'SFO', 'JFK')
        AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
    GROUP BY origin, destination;
1
2
3
4
5
6
7
8
9
spark.sql("""
SELECT origin, destination, TotalDelays, rank
    FROM (
        SELECT origin, destination, TotalDelays,
            dense_rank() OVER (PARTITION BY origin ORDER BY TotalDelays DESC) AS rank
            FROM departureDelaysWindow
    ) t
    WHERE rank <= 3
""").show()

Modification

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from pyspark.sql.functions import *

# 열 추가
foo2 = (foo.withColumn(
    "status",
    expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
    ))

# 열 삭제
foo3 = foo2.drop("delay")

# 칼럼명 바꾸기
foo4 = foo3.withColumnRenamed("status", "flight_status")

# 피벗
SELECT * FROM (
    SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
    FROM departureDelays WHERE origin = 'SFA
)
PIVOT (
    CAST AVG(delay) AS DECIMAL(4, 2) AS AvgDelay, MAX(delay) AS MaxDelay
    FOR month IN (1 JAN, 2 FEB)
)