Spark SQL

  • 데이터 소스와 JDBC/ODBC 커넥터 또는 스파크 애플리케이션 사이를 연결
  • SparkSessionsql() 함수를 통해 SQL 쿼리를 실행
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 임시뷰 생성
df = (spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")

# 임시뷰를 기반으로 쿼리
spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)

SQL Table

  • 관리형 테이블: 메타데이터와 파일 저장소의 데이터를 모두 관리
  • 비관리형 테이블: 메타데이터만 관리하고 외부 데이터 소스에서 데이터를 직접 관리
  • 관리형 테이블에서 DROP TABLE과 같은 SQL 명령은 실제 데이터를 삭제
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 관리형 테이블 생성
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
spark.sql("""CREATE TABLE managed_us_delay_flights_tbl (
    data STRING, delay INT, distance INT, origin STRING, destination STRING)""")

# 비관리형 테이블 생성
spark.sql("""CREATE TABLE us_delay_flights_tbl (
    date STRING, delay INT, distance INT, origin STRING, destination STRING)
    USING csv OPTIONS (PATH 'data.csv')""")

SQL View

  • 기존 테이블을 토대로 뷰를 만들 수 있으며, 스파크 애플리케이션이 종료되면 사라짐
  • 임시 뷰: 스파크 애플리케이션 내 단일 SparkSession에 연결
  • 전역 임시 뷰: 스파크 애플리케이션 내 여러 SparkSession을 만들 수 있음
1
2
3
4
-- SQL 예제
CREATE OR REPLACE GLOBAL TEMP VEIW us_origin_airport_SFO_global_tmp_view AS
    SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE
    origin = 'SFO'
1
2
3
4
# 파이썬 예제
df_sfo = spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl WHERE origin = 'SFO'""")
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")

Data Source

DataFrameReader

  • 아래와 같이 권장되는 사용 패턴이 존재
    DataFrameReader.format(args).option("Key", "value").schema(args).loads()
  • format(): “parquet”, “csv”, “json” 등이 가능하며 기본적으로는 “parquet”
  • option(): 키/값 쌍을 지정하며 기본 모드로 PERMISSIVE 적용
  • schema(): 스키마를 유추할 수 있는 DDL 문자열 또는 StructType 제공
  • load(): 데이터 소스의 경로이며, option()에 지정된 경우 비워둘 수 있음

DataFrameWriter

  • 아래와 같이 권장되는 사용 패턴이 존재
    DataFrameWriter.format(args).option(args).bucketBy(args).partitionBy(args).save(path)
  • format(): “parquet”, “csv”, “json” 등이 가능하며 기본적으로는 “parquet”
  • option(): 키/값 쌍을 지정하며 기본 모드 옵션은 error 또는 errorifexists
  • bucketBy(): 버킷 개수 및 버킷 기준 칼럼명
  • save(): 데이터 소스의 경로
  • saveAsTable(): 저장할 테이블

Parquet

  • 다양한 I/O 최적화를 제공하는 오픈소스 칼럼 기반 파일 형식
  • 파케이 파일은 데이터 파일, 메타데이터, 여러 압축 파일 및 일부 상태 파일이 포함

JSON

  • 단일 라인 모드와 다중 라인 모드가 있고, 다중 라인 모드는 multiline을 true로 설정
  • compression, dateFormat, multiline, allowUnquoted 등 옵션 사용 가능

CSV

  • 기본적으로 쉼표로 각 데이터를 구분하며, 다른 구분 기호로 필드를 분리할 수 있음
  • inferSchema, sep, escape, header 등 옵션 사용 가능

Avro

  • 스파크 2.4에 내장된 데이터 소스로 소개된 형식으로 카프카에서 메시지를 직렬화할 때 사용
  • JSON에 대한 직접 매핑, 속도와 효율성, 다양한 언어에서 사용할 수 있는 바인딩 등 이점 제공
  • avroSchema, recordName, recordNamespace, ignoreExtension 등 옵션 사용 가능

ORC

  • 스파크 2.x는 벡터화된 ORC 리더를 지원
  • 벡터화된 리더는 행 블록(1024개 단위)을 읽어 작업을 간소화하고
    검색, 필터, 집계, 조인과 같은 집중적인 작업에 대한 CPU 사용량 줄임

Image

  • 딥러닝 및 머신러닝 프레임워크를 지원하기 위해 이미지 파일을 도입
1
2
3
4
5
6
7
8
9
root
|-- image: struct (nullable = true)
|   |-- origin: string (nullable = true)
|   |-- height: integer (nullable = true)
|   |-- width: integer (nullable = true)
|   |-- nChannels: integer (nullable = true)
|   |-- mode: integer (nullable = true)
|   └-- data: binary (nullable = true)
└-- label: integer (nullable = true)

Binary File

  • 아래와 같은 열이 있는 데이터 프레임 생성
  • path: StringType
  • modificationTime: TimestampType
  • length: LongType
  • content: BinaryType
  • recursiveFileLookup이 “true"로 설정된 경우 label 컬럼이 없음