Map Reduce

  • MR API는 많은 양의 기본 셋업 코드가 필요하고,
    장애 대응이 불안정하며 반복적인 디스크 I/O 작업 발생
  • 머신러닝, 스트리밍 등 동적이고 반복적인 컴퓨팅 작업에서 효율 개선을 위해 스파크 개발

Spark

  • DAG의 스케줄러와 질의 최적화 모듈을 통해 병렬 수행
  • 모든 결과는 메모리에 유지되며, 디스크 I/O를 제한적으로 사용
  • 데이터 프레임과 같은 고수준 데이터 추상화 계층 아래 단순한 논리 자료구조 구축
  • 스파크 SQL, 스파크 MLlib, 스파크 정형화 스트리밍 GraphX 등 모듈 지원
  • 저장과 연산을 분리하여 데이터 소스를 메모리에서 처리

Spark Components

  • 어떠한 코드로 작성해도 고도로 경량화된 바이트코드로 변환되어 워커 노드의 JVM에서 실행
  • 스파크 SQL: SQL 계통의 질의를 써서 데이터를 데이터 프레임으로 읽어들이는 기능
  • 스파크 MLlib: 경사 하강법 최적화를 포함한 저수준 ML 기능 포함
  • 스파크 정형화 스트리밍: 카프카 등 실시간 연결 및 반응을 위한 모델
  • GraphX: 그래프 병렬 연산을 수행하기 위한 라이브러리

Spark Architecture

  • 스파크 드라이버: 스파크 이그제큐터를 위한 자원 요청 및 태스크 분배
  • SparkSession: 모든 스파크 연산과 데이터에 대한 통합 연결 채널, 스파크 SQL 질의 가능
  • 클러스터 매니저: 클러스터에서 자원을 관리 및 할당하는 책임
  • 스파크 이그제큐터: 클러스터의 각 워커 노드에서 동작하며 태스크를 실행하는 역할

Spark Application

  • 스파크 애플리케이션의 핵심에는 스파크 드라이버가 있으며, 이 드라이버는 SparkSession 객체를 생성
  • 스파크 잡: 드라이버는 스파크 애플리케이션을 하나 이상의 스파크 잡으로 변환
  • 스파크 스테이지: 스파크 연산을 여러 스테이지로 나뉘어 실행, 최소 실행 단위
  • 스파크 태스크: 개별 CPU 코어에 할당되어 작업

Spark Calculation

  • 트랜스포메이션: 원본 데이터를 수정하지 않고 새로운 데이터 프레임으로 변형
    (select()나 filter() 같은 연산으로 원본 데이터 프레임을 수정하지 않음)
  • 트랜스포메이션은 즉시 계산되지 않고 리니지 형태로 기록,
    액션이 실행될 때 트랜스포메이션끼리 재배열하거나 합쳐서 더 효율적으로 최적화
  • 트랜스포메이션 연산: orderBy(), groupBy(), filter(), select(), join()
  • 액션 연산: show(), take(), count(), collect(), save()
  • 좁은 트랜스포메이션: 하나의 입력 파티션을 연산하여 하나의 결과 파티션을 생성 (filter, contains)
  • 넓은 트랜스포메이션: 다른 파티션으로부터 데이터를 읽어 들여서 합치고 디스크에 작성 (groupBy, orderBy)

Spark RDD

  • 의존성: 어떤 입력을 필요로 하고 현재의 RDD가 어떻게 만들어지는지
  • 파티션: 작업을 나눠 파티션별로 병렬 연산할 수 있는 능력을 부여
  • 연산 함수: RDD에 저장되는 데이터를 Iterator[T] 형태로 변환

스파크 2.x는 고수준 연산을 사용하여 명료함과 단순함을 가짐
예시로 #1과 같은 그룹화는 #2로 간소화할 수 있음

1
2
3
4
5
#1
agesRDD = (dataRDD.
  map(lambda x: (x[0], (x[1], 1))))
  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
  .map(lambda x: (x[0], x[1][0]/x[1][1]))
1
2
#2
avgDf = dataDf.groupby("name").agg(avg("age"))

DataFrame

  • 스파크는 이전 변경 내역을 보관하여 이를 보존한 채로 칼럼의 이름이나 타입을 변경 가능
  • 스파크의 기본 데이터 타입은 String, Byte, Integer, Float 등 (파이썬 데이터 타입도 지원)
  • 복합 데이터 분석을 위한 Map, Array, Struct, Date 타입 등의 복합 타입 및 정형화 타입 지원

Schema

  • 데이터 프레임을 위해 칼럼 이름과 연관된 데이터 타입을 정의한 것
  • 스파크가 데이터 타입을 추측해야 하는 책임을 덜어주어 이에 대한 별도의 잡을 만드는 것을 방지
  • 데이터가 스키마와 맞지 않는 경우, 조기에 문제를 발견 가능
  • 프로그래밍 스타일 또는 DDL을 사용하여 스키마 정의
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from pyspark.sql.types import *

# 프로그래밍 스타일
schema = StructType([
  StructField("author", StringType(), False),
  StructField("title", StringType(), False),
  StructField("pages", IntegerType(), False)])

# DDL
schema = "author STRING, title STRING, pages INT"

# 스키마로 데이터 프레임 생성
df = spark.createDataFrame(data, schema)

# 데이터 프레임의 스키마 출력
print(df.printSchema())

Column

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
df.select(expr("Hits * 2")) # 표현식을 사용한 계산
df.select(col("Hits") * 2) # 칼럼명을 사용한 계산

# 기존 칼럼을 연결하여 새로운 칼럼을 생성
df
  .withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))
  .select(col("AuthorsId"))

# 칼럼값에 따라 정렬
df.sort(col("Id").desc)
df.sort($"Id".desc) # $는 칼럼을 Column 타입으로 변환해주는 스파크의 함수

Row

1
2
3
4
5
6
blog_row = Row(6, "Reynold", "2015-03-02")
blog_row[1] # >> 'Reynold' ; 인덱스로 개별 Row에 접근

# Row 객체들을 데이터 프레임으로 변환
rows = [Row("Matei", "CA"), Row("Reynold", "CA")]
df = spark.createDateFrame(rows, ["Authors", "State"]) 

DataFrameReader

  • JSON, CSV 등 다양한 포맷의 데이터 소스에서 데이터를 읽어 데이터 프레임으로 가져옴
  • 동일하게 데이터 프레임을 내보내기 위해 DataFrameWriter 사용
  • DataFrameWriter의 포맷은 기본으로 parquet, 데이터 압축에서는 snapy 사용
1
2
3
4
5
6
7
8
9
from pyspark.sql.types import *

# 데이터 읽기
schema = StructType([StructField(...), ...])
file = "data.csv"
df = spark.read_csv(file, header=True, schema=schema)

# 데이터 쓰기
df.write.format("parquet").save(path)

Projection

  • 필터를 이용해 특정 관계 상태와 매칭되는 행들만 반환
  • filter()where() 메서드로 표현
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from pyspark.sql.functions import *

# 앞에서 5개 행 반환
df.show(5, truncate=False)

# CallType의 unique한 개수 반환
(df
  .select("CallType")
  .where(col("CallType").isNotNull()) # null 타입 제외
  .agg(countDistinct("CallType").alias("DistinctCallTypes"))
  .show())

# 칼럼명 변경
df.withColumnRenamed("Delay", "ResponseDelayedinMins")

# 칼럼 내용 변경
(df
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .drop("CallDate"))

# 날짜 칼럼 질의
(df
  .select(year("IncidentDate"))
  .distinct()
  .orderBy(year("IncidentDate"))
  .show())

Aggregation

  • count(), min(), max(), sum(), avg() 등 연산
1
2
3
4
5
6
(df
  .select("CallType")
  .groupBy("CallType")
  .count()
  .orderBy("count", ascending=False)
  .show(n=10, truncate=False))

Dataset API

  • 정적 타입 API와 동적 타입 API로 구분
  • 데이터 프레임은 Dataset[Row]으로 표현되며,
    Row는 서로 다른 타입의 값을 저장할 수 있는 포괄적 JVM 객체
  • 데이터세트는 Dataset[T]로 표현되며, 엄격하게 타입이 정해진 JVM 객체의 집합
  • 동적으로 타입이 부여되는 파이썬과 R에서는 데이터프레임, 자바 등에서는 데이터세트 사용
  • 스칼라에서는 케이스 클래스로 데이터세트 정의
1
2
3
4
5
6
case class DeviceIoTData (battery_level: Long, ...)
ds = spark.read.json("devices.json").as[DeviceIoTData]

// 데이터세트에서도 트랜스포메이션 또는 액션 수행 가능
// 람다 함수의 인자는 DeviceIoT의 JVM 객체로 개별 데이터 필드에 접근 가능
ds.filter(d => d.temp > 30 && d.humidity > 70)

Spark SQL

  • 정형화 데이터 관련 작업을 단순화할 수 있도록 추상화
  • 빠른 데이터 탐색을 위한 대화형 스파크 SQL 셸을 제공
  • 표준 데이터베이스의 JDBC/ODBC 커넥터를 통해 외부의 도구들과 연결할 수 있는 중간 역할
  • 최적화된 질의 계획과 JVM을 위한 최적화된 코드 생성

Catalyst Optimizer

  • 연산 쿼리를 실행 계획으로 변환하기 위해
    분석 > 논리적 최적화 > 물리 계획 수립 > 코드 생성 과정을 거침
  • 언어에 상관없이 사용자가 실행한 작업은 동일한 과정을 거쳐 바이트 코드로 변환
  • 파이썬에서 df.explain(True) 함수를 통해 스테이지별 상세 내용 확인 가능

Optimization Flow

  • 분석: 쿼리를 위한 추상 문법 트리 생성
  • 논리적 최적화: 여러 계획들을 수립하고 비용 기반 옵티마이저를 통해 최적화
  • 물리 계획 수립: 논리 계획을 바탕으로 물리적 연산자를 사용해 최적화된 물리 계획 생성
  • 코드 생성: 각 머신에서 실행할 효율적인 자바 바이트 코드를 생성
  • 포괄 코드 생성을 통해 전체 쿼리를 하나의 함수로 합치고 CPU 레지스터 사용을 없앰