2023-03-25 Log
Spark SQL #
- 데이터 소스와 JDBC/ODBC 커넥터 또는 스파크 애플리케이션 사이를 연결
SparkSession의sql()함수를 통해 SQL 쿼리를 실행
python
# 임시뷰 생성
df = (spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
# 임시뷰를 기반으로 쿼리
spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)SQL Table #
- 관리형 테이블: 메타데이터와 파일 저장소의 데이터를 모두 관리
- 비관리형 테이블: 메타데이터만 관리하고 외부 데이터 소스에서 데이터를 직접 관리
- 관리형 테이블에서
DROP TABLE과 같은 SQL 명령은 실제 데이터를 삭제
python
# 관리형 테이블 생성
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
spark.sql("""CREATE TABLE managed_us_delay_flights_tbl (
data STRING, delay INT, distance INT, origin STRING, destination STRING)""")
# 비관리형 테이블 생성
spark.sql("""CREATE TABLE us_delay_flights_tbl (
date STRING, delay INT, distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH 'data.csv')""")SQL View #
- 기존 테이블을 토대로 뷰를 만들 수 있으며, 스파크 애플리케이션이 종료되면 사라짐
- 임시 뷰: 스파크 애플리케이션 내 단일 SparkSession에 연결
- 전역 임시 뷰: 스파크 애플리케이션 내 여러 SparkSession을 만들 수 있음
sql
-- SQL 예제
CREATE OR REPLACE GLOBAL TEMP VEIW us_origin_airport_SFO_global_tmp_view AS
SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE
origin = 'SFO'
python
# 파이썬 예제
df_sfo = spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl WHERE origin = 'SFO'""")
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")Data Source #
DataFrameReader #
- 아래와 같이 권장되는 사용 패턴이 존재
DataFrameReader.format(args).option("Key", "value").schema(args).loads() format(): “parquet”, “csv”, “json” 등이 가능하며 기본적으로는 “parquet”option(): 키/값 쌍을 지정하며 기본 모드로 PERMISSIVE 적용schema(): 스키마를 유추할 수 있는 DDL 문자열 또는 StructType 제공load(): 데이터 소스의 경로이며,option()에 지정된 경우 비워둘 수 있음
DataFrameWriter #
- 아래와 같이 권장되는 사용 패턴이 존재
DataFrameWriter.format(args).option(args).bucketBy(args).partitionBy(args).save(path) format(): “parquet”, “csv”, “json” 등이 가능하며 기본적으로는 “parquet”option(): 키/값 쌍을 지정하며 기본 모드 옵션은 error 또는 errorifexistsbucketBy(): 버킷 개수 및 버킷 기준 칼럼명save(): 데이터 소스의 경로saveAsTable(): 저장할 테이블
Parquet #
- 다양한 I/O 최적화를 제공하는 오픈소스 칼럼 기반 파일 형식
- 파케이 파일은 데이터 파일, 메타데이터, 여러 압축 파일 및 일부 상태 파일이 포함
JSON #
- 단일 라인 모드와 다중 라인 모드가 있고, 다중 라인 모드는 multiline을 true로 설정
compression,dateFormat,multiline,allowUnquoted등 옵션 사용 가능
CSV #
- 기본적으로 쉼표로 각 데이터를 구분하며, 다른 구분 기호로 필드를 분리할 수 있음
inferSchema,sep,escape,header등 옵션 사용 가능
Avro #
- 스파크 2.4에 내장된 데이터 소스로 소개된 형식으로 카프카에서 메시지를 직렬화할 때 사용
- JSON에 대한 직접 매핑, 속도와 효율성, 다양한 언어에서 사용할 수 있는 바인딩 등 이점 제공
avroSchema,recordName,recordNamespace,ignoreExtension등 옵션 사용 가능
ORC #
- 스파크 2.x는 벡터화된 ORC 리더를 지원
- 벡터화된 리더는 행 블록(1024개 단위)을 읽어 작업을 간소화하고
검색, 필터, 집계, 조인과 같은 집중적인 작업에 대한 CPU 사용량 줄임
Image #
- 딥러닝 및 머신러닝 프레임워크를 지원하기 위해 이미지 파일을 도입
bash
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| └-- data: binary (nullable = true)
└-- label: integer (nullable = true)Binary File #
- 아래와 같은 열이 있는 데이터 프레임 생성
- path: StringType
- modificationTime: TimestampType
- length: LongType
- content: BinaryType
recursiveFileLookup이 “true"로 설정된 경우 label 컬럼이 없음