Map Reduce#
- MR API는 많은 양의 기본 셋업 코드가 필요하고,
장애 대응이 불안정하며 반복적인 디스크 I/O 작업 발생 - 머신러닝, 스트리밍 등 동적이고 반복적인 컴퓨팅 작업에서 효율 개선을 위해 스파크 개발
Spark#
- DAG의 스케줄러와 질의 최적화 모듈을 통해 병렬 수행
- 모든 결과는 메모리에 유지되며, 디스크 I/O를 제한적으로 사용
- 데이터 프레임과 같은 고수준 데이터 추상화 계층 아래 단순한 논리 자료구조 구축
- 스파크 SQL, 스파크 MLlib, 스파크 정형화 스트리밍 GraphX 등 모듈 지원
- 저장과 연산을 분리하여 데이터 소스를 메모리에서 처리
Spark Components#
- 어떠한 코드로 작성해도 고도로 경량화된 바이트코드로 변환되어 워커 노드의 JVM에서 실행
- 스파크 SQL: SQL 계통의 질의를 써서 데이터를 데이터 프레임으로 읽어들이는 기능
- 스파크 MLlib: 경사 하강법 최적화를 포함한 저수준 ML 기능 포함
- 스파크 정형화 스트리밍: 카프카 등 실시간 연결 및 반응을 위한 모델
- GraphX: 그래프 병렬 연산을 수행하기 위한 라이브러리
Spark Architecture#
- 스파크 드라이버: 스파크 이그제큐터를 위한 자원 요청 및 태스크 분배
- SparkSession: 모든 스파크 연산과 데이터에 대한 통합 연결 채널, 스파크 SQL 질의 가능
- 클러스터 매니저: 클러스터에서 자원을 관리 및 할당하는 책임
- 스파크 이그제큐터: 클러스터의 각 워커 노드에서 동작하며 태스크를 실행하는 역할
Spark Application#
- 스파크 애플리케이션의 핵심에는 스파크 드라이버가 있으며, 이 드라이버는 SparkSession 객체를 생성
- 스파크 잡: 드라이버는 스파크 애플리케이션을 하나 이상의 스파크 잡으로 변환
- 스파크 스테이지: 스파크 연산을 여러 스테이지로 나뉘어 실행, 최소 실행 단위
- 스파크 태스크: 개별 CPU 코어에 할당되어 작업
Spark Calculation#
- 트랜스포메이션: 원본 데이터를 수정하지 않고 새로운 데이터 프레임으로 변형
(select()나 filter() 같은 연산으로 원본 데이터 프레임을 수정하지 않음) - 트랜스포메이션은 즉시 계산되지 않고 리니지 형태로 기록,
액션이 실행될 때 트랜스포메이션끼리 재배열하거나 합쳐서 더 효율적으로 최적화 - 트랜스포메이션 연산: orderBy(), groupBy(), filter(), select(), join()
- 액션 연산: show(), take(), count(), collect(), save()
- 좁은 트랜스포메이션: 하나의 입력 파티션을 연산하여 하나의 결과 파티션을 생성 (filter, contains)
- 넓은 트랜스포메이션: 다른 파티션으로부터 데이터를 읽어 들여서 합치고 디스크에 작성 (groupBy, orderBy)
Spark RDD#
- 의존성: 어떤 입력을 필요로 하고 현재의 RDD가 어떻게 만들어지는지
- 파티션: 작업을 나눠 파티션별로 병렬 연산할 수 있는 능력을 부여
- 연산 함수: RDD에 저장되는 데이터를 Iterator[T] 형태로 변환
스파크 2.x는 고수준 연산을 사용하여 명료함과 단순함을 가짐
예시로 #1과 같은 그룹화는 #2로 간소화할 수 있음
1
2
3
4
5
| #1
agesRDD = (dataRDD.
map(lambda x: (x[0], (x[1], 1))))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.map(lambda x: (x[0], x[1][0]/x[1][1]))
|
1
2
| #2
avgDf = dataDf.groupby("name").agg(avg("age"))
|
DataFrame#
- 스파크는 이전 변경 내역을 보관하여 이를 보존한 채로 칼럼의 이름이나 타입을 변경 가능
- 스파크의 기본 데이터 타입은 String, Byte, Integer, Float 등 (파이썬 데이터 타입도 지원)
- 복합 데이터 분석을 위한 Map, Array, Struct, Date 타입 등의 복합 타입 및 정형화 타입 지원
Schema#
- 데이터 프레임을 위해 칼럼 이름과 연관된 데이터 타입을 정의한 것
- 스파크가 데이터 타입을 추측해야 하는 책임을 덜어주어 이에 대한 별도의 잡을 만드는 것을 방지
- 데이터가 스키마와 맞지 않는 경우, 조기에 문제를 발견 가능
- 프로그래밍 스타일 또는 DDL을 사용하여 스키마 정의
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| from pyspark.sql.types import *
# 프로그래밍 스타일
schema = StructType([
StructField("author", StringType(), False),
StructField("title", StringType(), False),
StructField("pages", IntegerType(), False)])
# DDL
schema = "author STRING, title STRING, pages INT"
# 스키마로 데이터 프레임 생성
df = spark.createDataFrame(data, schema)
# 데이터 프레임의 스키마 출력
print(df.printSchema())
|
Column#
1
2
3
4
5
6
7
8
9
10
11
| df.select(expr("Hits * 2")) # 표현식을 사용한 계산
df.select(col("Hits") * 2) # 칼럼명을 사용한 계산
# 기존 칼럼을 연결하여 새로운 칼럼을 생성
df
.withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))
.select(col("AuthorsId"))
# 칼럼값에 따라 정렬
df.sort(col("Id").desc)
df.sort($"Id".desc) # $는 칼럼을 Column 타입으로 변환해주는 스파크의 함수
|
Row#
1
2
3
4
5
6
| blog_row = Row(6, "Reynold", "2015-03-02")
blog_row[1] # >> 'Reynold' ; 인덱스로 개별 Row에 접근
# Row 객체들을 데이터 프레임으로 변환
rows = [Row("Matei", "CA"), Row("Reynold", "CA")]
df = spark.createDateFrame(rows, ["Authors", "State"])
|
DataFrameReader#
- JSON, CSV 등 다양한 포맷의 데이터 소스에서 데이터를 읽어 데이터 프레임으로 가져옴
- 동일하게 데이터 프레임을 내보내기 위해
DataFrameWriter
사용 DataFrameWriter
의 포맷은 기본으로 parquet
, 데이터 압축에서는 snapy
사용
1
2
3
4
5
6
7
8
9
| from pyspark.sql.types import *
# 데이터 읽기
schema = StructType([StructField(...), ...])
file = "data.csv"
df = spark.read_csv(file, header=True, schema=schema)
# 데이터 쓰기
df.write.format("parquet").save(path)
|
Projection#
- 필터를 이용해 특정 관계 상태와 매칭되는 행들만 반환
filter()
나 where()
메서드로 표현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| from pyspark.sql.functions import *
# 앞에서 5개 행 반환
df.show(5, truncate=False)
# CallType의 unique한 개수 반환
(df
.select("CallType")
.where(col("CallType").isNotNull()) # null 타입 제외
.agg(countDistinct("CallType").alias("DistinctCallTypes"))
.show())
# 칼럼명 변경
df.withColumnRenamed("Delay", "ResponseDelayedinMins")
# 칼럼 내용 변경
(df
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
.drop("CallDate"))
# 날짜 칼럼 질의
(df
.select(year("IncidentDate"))
.distinct()
.orderBy(year("IncidentDate"))
.show())
|
Aggregation#
count()
, min()
, max()
, sum()
, avg()
등 연산
1
2
3
4
5
6
| (df
.select("CallType")
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(n=10, truncate=False))
|
Dataset API#
- 정적 타입 API와 동적 타입 API로 구분
- 데이터 프레임은
Dataset[Row]
으로 표현되며,
Row는 서로 다른 타입의 값을 저장할 수 있는 포괄적 JVM 객체 - 데이터세트는
Dataset[T]
로 표현되며, 엄격하게 타입이 정해진 JVM 객체의 집합 - 동적으로 타입이 부여되는 파이썬과 R에서는 데이터프레임, 자바 등에서는 데이터세트 사용
- 스칼라에서는 케이스 클래스로 데이터세트 정의
1
2
3
4
5
6
| case class DeviceIoTData (battery_level: Long, ...)
ds = spark.read.json("devices.json").as[DeviceIoTData]
// 데이터세트에서도 트랜스포메이션 또는 액션 수행 가능
// 람다 함수의 인자는 DeviceIoT의 JVM 객체로 개별 데이터 필드에 접근 가능
ds.filter(d => d.temp > 30 && d.humidity > 70)
|
Spark SQL#
- 정형화 데이터 관련 작업을 단순화할 수 있도록 추상화
- 빠른 데이터 탐색을 위한 대화형 스파크 SQL 셸을 제공
- 표준 데이터베이스의 JDBC/ODBC 커넥터를 통해 외부의 도구들과 연결할 수 있는 중간 역할
- 최적화된 질의 계획과 JVM을 위한 최적화된 코드 생성
Catalyst Optimizer#
- 연산 쿼리를 실행 계획으로 변환하기 위해
분석 > 논리적 최적화 > 물리 계획 수립 > 코드 생성 과정을 거침 - 언어에 상관없이 사용자가 실행한 작업은 동일한 과정을 거쳐 바이트 코드로 변환
- 파이썬에서
df.explain(True)
함수를 통해 스테이지별 상세 내용 확인 가능
Optimization Flow#
- 분석: 쿼리를 위한 추상 문법 트리 생성
- 논리적 최적화: 여러 계획들을 수립하고 비용 기반 옵티마이저를 통해 최적화
- 물리 계획 수립: 논리 계획을 바탕으로 물리적 연산자를 사용해 최적화된 물리 계획 생성
- 코드 생성: 각 머신에서 실행할 효율적인 자바 바이트 코드를 생성
- 포괄 코드 생성을 통해 전체 쿼리를 하나의 함수로 합치고 CPU 레지스터 사용을 없앰