Apache Spark - 고차함수(Higher-Order Functions)

User-Defined Functions #

스파크는 자신의 기능을 정의할 수 있는 유연성을 제공한다. 이를 사용자 정의 함수(User-Defined Function, UDF)라고 한다.

UDF를 생성하는 이점은 스파크 SQL 안에서 이를 사용할 수 있다는 것이다.

Spark SQL UDF 활용 #

다음은 스파크 SQL UDF를 만드는 예시로, 인수를 세제곱하는 함수 cubed() 를 생성한다.

python
from pyspark.sql.types import LongType

# 큐브 함수 생성
def cubed(s):
    return s * s * s

# UDF로 등록
spark.udf.register("cubed", cubed, LongType())

스파크 SQL을 사용하여 cubed() 함수를 실행할 수 있다.

Apache Spark - JDBC 및 데이터베이스

Spark SQL CLI #

스파크 SQL 쿼리를 실행하는 쉬운 방법은 spark-sql CLI이다. 스파크 SQL CLI는 Hive 메타스토어와 서비스와 통신하는 대신 Thrift JDBC 서버와 통신할 수 없다.

Hive 설치 #

진행하기 전에 Hive가 설치되어 있지 않아서 설치해야 했다.

설치 과정은 [Hive] virtual box linux [ubuntu 18.04]에 하이브 설치,다운로드 4.ubuntu 에 Hive(하이브) 다운로드 게시글을 참고했다.

/hive/hive-4.0.1/apache-hive-4.0.1-bin.tar.gz
  1. 브라우저 또는 curl, wget 등 명령어를 통해 압축 파일을 내려받는다.
bash
wget https://dlcdn.apache.org/hive/hive-4.0.1/apache-hive-4.0.1-bin.tar.gz
  1. 압축 해제 프로그램을 사용하거나, 터미널에서 아래 명령어를 입력하여 압축 해제한다.
bash
tar zxvf apache-hive-4.0.1-bin.tar.gz
  1. Hive 경로에 접근하기 위해 ~/.zshrc 에 환경변수를 설정한다.
bash
export HIVE_HOME=/Users/{username}/hive-4.0.1
export PATH=$PATH:$HIVE_HOME/bin
  1. 변경 사항을 적용하기 위해 터미널을 재시작하거나 아래 명령어를 실행한다.
bash
source ~/.zshrc
  1. $HIVE_HOME/bin/hive-config.sh 파일에 HDFS 경로를 추가한다.
bash
export HADOOP_HOME=/Users/{username}/hadoop-3.4.0
  1. HDFS에 Hive 디렉터리를 생성한다.
bash
$HADOOP_HOME/sbin/start-all.sh
bash
hdfs dfs -mkdir /tmp
hdfs dfs -chmod g+w /tmp
bash
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -chmod g+w /user/hive/warehouse
bash
% hdfs dfs -ls /
drwxrwxr-x   - user supergroup          0 2025-07-12 10:08 /tmp
drwxr-xr-x   - user supergroup          0 2025-07-12 10:08 /user
  1. $HIVE_HOME/conf/hive-site.xml 파일에 아래 속성을 맨 윗부분에 추가한다. 파일이 없을 경우 동일한 경로의 hive-default.xml.template 파일을 hive-site.xml 이름의 파일로 복사한다.
xml
<property>
  <name>system:java.io.tmpdir</name>
  <value>/tmp/hive/java</value>
</property>

<property>
  <name>system:user.name</name>
  <value>${user.name}</value>
</property>
  1. Derby DB를 시작한다. 오류가 발생할 경우 참고한 게시글을 확인해볼 수 있다.
bash
$HIVE_HOME/bin/schematool -initSchema -dbType derby
bash
Initializing the schema to: 4.0.0
Metastore connection URL:	 jdbc:derby:;databaseName=metastore_db;create=true
Metastore connection Driver :	 org.apache.derby.jdbc.EmbeddedDriver
Metastore connection User:	 APP
Starting metastore schema initialization to 4.0.0
Initialization script hive-schema-4.0.0.derby.sql
...
Initialization script completed
  1. Hive CLI를 시작해본다.
bash
$HIVE_HOME/bin/hive
bash
Beeline version 4.0.1 by Apache Hive
beeline> 
  1. Hive 메타스토어 서버를 실행한다.
bash
hive --service metastore &

hive-site.xml 편집하기 #

Hive Tables 공식문서에 따르면, Spark SQL로 Hive에 저장된 데이터에 액세스하려면 hive-site.xml, core-site.xml, hdfs-site.xml 파일들을 $SPARK_HOME/conf/ 경로에 배치해야 한다.

Apache Spark - 데이터 소스

Data Source API #

DataFrameReader #

DataFrameReader는 데이터 소스에서 DataFrame으로 데이터를 읽는 방식이다. 아래와 같이 권장되는 사용 패턴이 있다.

python
DataFrameReader
	.format(args) # 데이터 소스 형식
    .option("key", "value") # 키/값 쌍으로 연결되는 옵션
    .schema(args) # DDL 문자열 또는 StructType
    .load() # 데이터 소스의 경로

데이터 소스 형식에는 인수로 "parquet", "csv", "txt", "json", "jdbc", "orc", "avro" 등이 전달된다. 기본값은 "parquet" 또는 spark.sql.sources.default 에 지정된 항목이 설정된다.

JSON이나 CSV 형식은 option() 함수에서 스키마를 유추하는 inferSchema 옵션을 적용할 수 있지만, 스키마를 제공하면 로드 속도가 빨라진다.

Apache Spark - 스파크 SQL

Spark SQL #

스파크 SQL은 다음과 같은 특징을 갖는다.

  • 정형화 API가 엔진으로 제공한다.
  • 다양한 정형 데이터(Parquet 등)를 읽거나 쓸 수 있다.
  • 외부 BI 툴(태블로 등)의 데이터 소스나 RDBMS(MySQL 등)의 데이터를 쿼리할 수 있다.
  • 정형 데이터에 대해 SQL 쿼리를 실행할 수 있는 대화형 쉘을 제공한다.

Spark SQL 사용법 #

python
spark.sql("SELECT * FROM table")

SparkSession 객체에 sql() 함수를 사용한다. 쿼리 결과로는 DataFrame 객체가 반환된다.

Spark SQL 활용 (Python) #

databricks/LearningSparkV2databricks-datasets/learning-spark-v2/flights 경로에서 미국 항공편 운항 지연 데이터세트 departuredelays.csv 를 가져온다. 해당 데이터를 활용해 아래와 같이 임시뷰를 생성한다.

Apache Spark - Structured API

Spark Structure #

정형화 API에 대해 알아보기에 앞서, 정형적 모델 이전의 RDD 프로그래밍 API 모델을 확인해본다.

RDD #

RDD는 Spark 1.x 버전에 있던 저수준의 DSL을 의미하고, 스파크에서 가장 기본적인 추상적인 부분이다. RDD에는 세 가지의 핵심으로 특성이 있다.

  1. 의존성
    어떤 입력을 필요로 하고 RDD가 어떻게 만들어지는지 Spark에게 가르쳐 주는 의존성이 필요하다.

  2. 파티션
    Executor들에 작업을 분산해 파티션별로 병렬 연산할 수 있는 능력을 부여한다. 지역성 정보를 사용하여 각 Executor가 가까이 있는 Executor에게 우선적으로 작업을 보낸다.

Apache Spark - 스파크 애플리케이션 및 RDD

Spark Application #

Spark Application은 Driver Process 하나와 일련의 일련의 Executors로 구성된다.

Spark Applications Explained | Databricks

Driver Process #

Driver Process는 main() 함수를 실행하고 클러스터 내 노드에서 세 가지 작업을 담당한다.

  1. Spark Application 관련 정보를 유지한다.
  2. 사용자의 프로그램이나 입력에 대응한다.
  3. Executor 작업을 분석, 배포, 예약한다.

Executor #

Executor는 Driver가 할당한 작업을 실제로 실행하는 역할을 하는데, 두 가지 작업을 담당한다.

  1. Driver가 할당한 Task를 실행한다.
  2. Task의 상태와 결과를 Driver 노드에 보고한다.

Cluster Manager #

실물 시스템을 제어하고 Spark Application에 리소스를 할당하는 작업은 Cluster Manager가 맡는다. Spark Application의 실행 과정에서 Cluster Manager는 Application이 실행되는 물리적인 머신을 관리한다. Spark Application은 클러스터에서 독립적인 프로세스로 실행되며, SparkContext 객체에 의해 조정된다.

Apache Spark - 설치하고 PySpark 실행하기

Spark Installation #

Apple Silicon 환경에서 스파크 설치를 진행합니다.
각 섹션의 이미지를 클릭하면 설치 페이지 또는 관련 문서로 이동합니다.

Spark 설치 #

Download Apache Spark

아파치 스파크 다운로드 페이지로 가서 최신 버전 4.0.0 및 "Pre-built for Apache Hadoop" 옵션을 선택하면 해당 버전의 다운로드 링크 spark-4.0.0-bin-hadoop3.tgz 가 나타난다. 해당 링크로 이동하면 아래와 같이 Hadoop 관련 바이너리 파일이 포함된 압축 파일의 설치 경로를 확인할 수 있다.

https://www.apache.org/dyn/closer.lua/spark/spark-4.0.0/spark-4.0.0-bin-hadoop3.tgz
  1. 브라우저 또는 curl, wget 등 명령어를 통해 압축 파일을 내려받을 수 있다.
bash
wget https://dlcdn.apache.org/spark/spark-4.0.0/spark-4.0.0-bin-hadoop3.tgz
  1. 압축 해제 프로그램을 사용하거나, 터미널에서 아래 명령어를 입력하여 압축 해제한다.
bash
tar zxvf spark-4.0.0-bin-hadoop3.tgz
  1. Spark 경로에 접근하기 위해 환경변수를 설정한다.
bash
vi ~/.zshrc
  1. vi 편집기로 .zshrc 파일에 Spark 경로를 등록한다. SPARK_HOME 은 압축 해제한 Spark 경로를 입력한다.
bash
export SPARK_HOME=/Users/{username}/spark-4.0.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
  1. 변경 사항을 적용하기 위해 터미널을 재시작하거나 아래 명령어를 실행한다.
text
source ~/.zshrc

주의할 점은 Spark를 실행하기 전에 Java와 Hadoop이 설치되어 있어야 한다. 보통은 Java 또는 Hadoop 버전에 맞춰서 Spark를 설치하지만, 어떤 것도 설치되어 있지 않기 때문에 스파크 버전에 맞춰서 Java와 Hadoop을 설치한다.

Apache Spark - 스파크의 기본 개념과 아키텍처

Study Overview #

러닝 스파크 2nd 개정판 과정을 따릅니다.

목적 #

  • 대용량 데이터 처리를 위한 아파치 스파크를 이론적으로 학습
  • 책에서 대상으로 하는 스파크 3.x 버전과 25년 5월 출시된 Spark 4.0 버전을 비교
  • 각 챕터에서 배운 것으로 실습할만한 것이 있다면 추가로 시도하기
  • 실습은 PySpark API를 사용하며, 최신화된 PySpark 4.0.0 문서를 참조

챕터 #

  1. 아파치 스파크 소개: 통합 분석 엔진
  2. 아파치 스파크 다운로드 및 시작
  3. 아파치 스파크의 정형화 API
  4. 스파크 SQL과 데이터 프레임: 내장 데이터 소스 소개
  5. 스파크 SQL과 데이터 프레임: 외부 데이터 소스와 소통하기
  6. 스파크 SQL과 데이터세트
  7. 스파크 애플리케이션의 최적화 및 튜닝
  8. 정형화 스트리밍
  9. 아파치 스파크를 통한 안정적인 데이터 레이크 구축
  10. MLlib을 사용한 머신러닝
  11. 아파치 스파크로 머신러닝 파이프라인 관리, 배포 및 확장
  12. 에필로그: 아파치 스파크 3.0

Spark Overview #

스파크의 시작 #

RDBMS 같은 전통적인 저장 시스템으로는 구글이 방대한 규모의 인터넷 문서를 다룰 수 없어 구글 파일 시스템(GFS), 맵리듀스(MapReduce), 빅테이블(BigTable) 등을 만들어 냈다. GFS는 클러스터 환경에서 분산 파일시스템을 제공하고, 빅테이블은 GFS를 기반으로 대규모 데이터 저장 수단을 제공한다. 맵리듀스는 함수형 프로그래밍 기반으로 대규모 데이터 분산 처리를 구현했다. 클러스터의 워커 노드들이 분산된 데이터에 연산을 하고(Map), 그 결과를 하나로 합쳐(Reduce) 최종 결과를 생성해낸다. 이러한 접근 방식은 네트워크 트래픽을 크게 감소시키면서 로컬 디스크에 대한 I/O를 극대화한다.