좌충우돌 Spark 배우기 - Spark RDD

이번 단원에서는 Spark RDD에 대한 개념과 사용법에 대해서 다룰 예정이다.
Spark의 기본 근간이 되는 RDD이기에 더욱 주의 깊게 공부해 보았다.

What is RDD?

Resilent Distributed Datasets (RDD)란 스파크의 가장 기초적인 데이터 구조로서 분산된 환경에서 병렬 처리를 할 수 있게 해줌으로서 distributed data-parallel하게 해준다. RDD는 크게 다섯 가지 특징이 있다.

  1. Data Abstraction 데이터의 경우에 여러 클러스터(노드)에 흩어져 있지만 하나의 파일인 것처럼 사용 가능하다.
    lines = sc.textFile(f"file://Users/Chan/repo/example.csv")
    
  2. Resilient & Immutable
    노드 중 하나가 장애 등의 이유로 망가지더라도 데이터가 변하지 않아 (Immutable) 사라지거나 손상된 파티션의 연산을 다시 실행하며 복원이 가능하다. RDD의 경우 변환과정에서 Acyclic Graph를 그리며 RDD 자체가 연산에서 변경되는게 아니라 새로운 RDD를 제작한다. 따라서, 연산중 문제가 생기면 다시 복원 후 연산을 하면 된다.

  3. Type-Safe
    컴파일시 Type을 판별할 수 있어 (Integer, String, Double) 문제를 일찍 발견할 수 있다.

  4. Handle Unstructured/Structured Data
    RDD는 Text같은 Unstructured Data와 Table같은 Structured Data를 다룰 수 있다.

  5. Lazy RDD는 Action이 있을때까지 Transformation을 실행하지않으며, 우린 이를 Lazy Evaluation이라 한다. 이 과정에 Transformation이 실행전까지 최적화 된다.

이러한 RDD는 유연하고, 짧은 코드로 할 수 있는게 많아 각광받았다 (과거형). 요즘은 Dataframe을 더 많이 쓰는 추세지만, 일단 RDD에 대해 알아본다.

RDD에서 코딩할때 분산 처리 문제, 특히 속도를 고려해야 한다.

RDD.map(A).filter(B).reduceByKey(C).take(100)
RDD.map(A).reduceByKey(c).filter(B).take(100)

두개를 비교한다면 위에가 성능이 좋다. 통신을 필요로 하는 경우 속도가 저하 되기 때문에 필터를 먼저 해야한다 (메모리 > 디스크 > 네트워크 순으로 빠르다)

RDD의 타입에는 크게 두가지가 있다.
Single Value RDD와 Key-Value RDD (Paired RDD)가 있는데, Single Value RDD는 그냥 단순한 값만 가지는 RDD이고, Key-Value RDD의 경우에는 (key, pair) 쌍을 가지고 있다. Map 함수가 2개의 값을 리턴하면 Key-Value RDD이다. Key-Value RDD에서는 Reduction과 Join이 가능하다.

RDD를 생성하는 법

RDD를 생성하는 법엔 크게 3가지가 있다.

  1. Parallelize 가장 쉬운 방법으로, 리스트 등의 형태의 데이터셋을 넘기고 RDD를 생성하는 방식이다.
    rdd = sc.parallelize([1, 2, 3, 4])
    
  2. Text file Text file을 통해 RDD를 제작 할 수 있다.
    rdd = sc.textFile("path/of/txt/file")
    
  3. 존재하는 RDD로부터 만들기 map 등 Transformation을 거치면 새로운 RDD를 반환한다.

Spark Operation - Transformation And Action

Spark Operation은 크게 TransformationAction으로 나눌 수 있다. Transformation은 하나의 RDD에서 처리 과정을 거쳐서 새로운 RDD를 반환한다. 또한 Lazy Evaluation으로 Action이 나오기 전까지 실행되지 않는다. Action은 결과값을 연산하여 출력하거나 저장하는 경우로 Eager Execution하여 즉시 실행한다. 각 Transformation과 Action을 알아본다.

Transformation

Transformation은 크게 Narrow TransformationWide Transformation이 있다. Narrow Transformation은 1:1 변환인 경우이며 1열을 조작하기 위해 다른 Partition/Row의 데이터를 쓰지 않는 경우이다. 따라서 정렬이 필요없기도 하다. 이러한 Narrow Transformation에는 이러한 예시들이 있다.

  • filter()
    특정한 조건에 따라서 filter가 가능하다.
    RDD_New = RDD.filter(lambda x: x == "Hi")
    
  • map()
    map을 통해서 각 Partition/Row의 데이터를 변경하고 싶은 경우 사용한다.
    RDD_New = RDD.map(lambda x: x + 2)
    # [1], [2], [3] -> [3], [4], [5]
    
  • mapValues()
    map을 하지만 파티션과 키를 그대로 냅두고 함수를 벨류에게만 적용한다. Key-Value RDD에 쓰인다.
  • flatmap()
    map과 비슷하지만 map이 각 row에 적용한다면, flatmap은 모든 row를 싸잡아서 적용한다.
    RDD_New = RDD.map(lambda x: x + 2)
    # [1], [2], [3] -> [3,4,5]
    
  • sample()
    데이터에서 특정한 비율대로 샘플링을 한다.
    RDD_New = RDD.sample(True, .5, seed=5)
    # 첫번째 인자는 True/False 불리언으로 한 인자가 선택되면 그것을 제외하고 샘플링을 계속 할지 말지 결정한다. 두번째 인자는 rows 생성 fraction을 의미한다. 세번째는 고정 seed이다.
    
  • union()
    유니언은 두개의 RDD를 단순히 합친다.
    RDD_New = RDD.union(RDD2)
    
  • keys()
    Key-Value RDD에 쓰이는 Transformation으로 모든 key를 가진 RDD를 생성한다.
    RDD_New = RDD.keys()
    

    그렇다면 Wide Transformation은 무엇일까? Wide Transformation은 Shuffling한다는 특징이 있는데, 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있다. 따라서 많은 리소스를 요구하고 무거운 편이다. 대표적인 예시는 Intersection, Join, Distinct, reduceByKey(), groupByKey()등이 있다. 여기서 대표적인 reduceByKeys()와 groupByKey()를 보겠다.

  • groupByKey()
    groupByKey()는 Key-Value RDD에서 쓰이는 Transformation으로, 주어지는 key를 기준으로 group을 한다. groupByKey()에 인자를 넣어주게 되면 파티션의 개수를 정할 수 있다.
    rdd.groupByKey().mapValues(len)
    
  • reduceByKey()
    reduceByKey()는 Key-Value RDD에서 쓰이는 Transformation으로, key를 기준으로 그룹을 만들고 합친다.
    rdd.reduceByKey(add)
    

    reduceByKey()groupByKey()보다 훨씬 빠른다! 왜 그럴까?
    그것은 바로 Shuffling때문이다. Shuffling이란 그룹핑시 데이터를 한 노드에서 다른 노드로 옮길 때 일어나고 (결과로 나온 RDD가 원본 RDD의 다른 요소를 참조하거나, 다른 RDD를 참조할때), 성능을 저하시킨다. reduceByKey()도 Shuffling이 있긴 하지만, 각 Partition에서 먼저 reduce한 후에 groupbyKey를 하기 때문에 각 Partition에 담기는 데이터 수가 적어진다. Shuffle을 최소화 하기 위해서는 미리 Partition을 만든 후 Caching 후 reduceByKey / Join을 진행하여 최대한 로컬 환경 (Partition)에서 연산이 실행되도록 하는게 좋다.

Partition이란? 파티션은 RDD나 Dataset을 구성하고 있는 최소 단위 객체로 (즉 RDD는 쪼개져서 여러 Partition에 저장된다), 데이터를 최대한 균일하게 퍼트리고 쿼리가 같이 되는 데이터를 옆에 두어 검색 성능을 향상시킨다.
하나의 Task에서 하나의 Partition이 처리된다. 하나의 Partition은 하나의 노드에 존재하고, 노드는 여러 Partition을 가질 수 있다.
이러한 Partition은 성능에 큰 영향을 끼쳐서 잘 조절해야한다. 또한, Partition 은 Key-Value RDD를 사용할 때에만 의미가 있는데, 그 이유는 일반 RDD는 처음부터 끝까지 어차피 스캐닝을 해야하기 떄문에 의미가 없다. Partitioning에는 hash function을 이용해서 나누는 Hash Partition과 key의 범위로 나누는 Range Partition이 있다. 디스크에서 Partition하는 경우에는 partitionBy(), 메모리에서 Partition하는 경우에는 repartition()coalesce()를 써주면 된다.*partitionBy()는 Transformation으로 사용자가 지정한 파티션을 가지는 RDD를 생성하는 함수로, persist를 통해 반복하지 않게 해야한다. repartition()은 파티션의 크기를 줄이거나 늘리고, coalesce()는 파티션의 크기를 줄이는 작업이다.

Action

Action은 실제로 바로 실행되는 친구들이다. 대부분의 Spark Action은 Reduction이다. Reduction이란 근접한 요소들을 모아서 하나의 결과로 만드는 일을 뜻한다. (물론 collect()나 파일 저장의 경우에는 Reduction이 아니다) Reduction은 파티션에 따라서 결과값이 달라지게 됨으로 분산된 파티션이 합치는 걸 고려하여 교환법칙 (ab = ba)과 결합법칙 ((ab)c = a(bc))를 고려하면서 코딩해줘야 한다. Action에는 다음과 같은 친구들이 있다.

  • collect()
    Executor에 할당된 RDD를 모두 취합하는 Action. 무거워서 OOM이 날 수도 있다.
  • count()
    Element가 총 몇개 인지 구해준다.
  • countByValue()
    defaultdict를 반환하는 친구로, key별로 value가 몇개 나오는지 구해준다.
  • countByKey()
    Key-Value RDD에서 쓰이는 친구로, Key별로 개수를 세어준다.
  • take()
    take(n)이라 하면 n개를 뽑아준다.
  • first()
    첫 element를 반환한다.
  • foreach()
    foods.foreach(lambda x: print(x))
    

    Worker Node에서 이 print는 찍히게 된다. RDD의 연산 후 저장하거나 할때 유용하게 사용할 수 있다.

  • reduce() - Reduction
    reduce()는 사용자가 지정한 function을 받아서 여러 개의 element를 하나로 반환한다.
    from operator import add
    sc.parallelize([1,2,3,4]).reduce(add) #10
    
  • fold() - Reduction
    fold는 reduce와 비슷하지만, zero value가 있어 시작 value를 지정할 수 있다.
    from operator import add
    sc.parallelize([1,2,3,4]).fold(1,add) #14
    
  • groupBy() - Reduction
    groupBy는 함수를 받아서 이를 바탕으로 그룹핑을 해준다.
    from operator import add
    sc.parallelize([1,2,3,4]).groupBy(lambda x: x % 2) #짝수 홀수로 나눠짐
    
  • aggregate() - Reduction
    aggregate는 input과 output결과 타입이 다를 경우 사용한다.
    RDD.aggregate(zeroValue, seqOp, combOp)인데, zeroValue는 이전과 같고, seqOp는 map (type 변경하는 함수), combOp는 reduce (합치는 함수) 와 비슷한 역할을 한다.
    파티션 단위의 연산 결과를 합치는 과정을 거친다.

Cache & Persist

Transformation은 Lazy Execution이 되기에 메모리를 최대한 활용하여 in-memory로 데이터를 주고 받을 수 있다. 데이터를 메모리에 남겨두고 싶을때 쓰는 것이 바로 cache()와 persist()이다.

RDD_original = lines.map(parse).persist()
RDD_new_1 = RDD_original.take(10)
RDD_new_2 = RDD_original.mapValues(lambda x: (x,1)).collect()

다음과 같이 persist()를 사용하면, 두번 사용되는 RDD_original을 그냥 메모리에 저장해 두고 쓸 수 있다. cache() 또한 비슷한 역할을 하는데 둘은 어떻게 다를까? 바로 어디만큼 저장하냐에 따라 다르다. Cache는 디폴트 Storage Level을 사용하게 되는데, RDD에서는 MEMORY_ONLY (메모리에만), Dataframe은 MEMORY_AND_DISK (메모리와 디스크 둘다. RDD가 메모리에 fit하지않는 경우 디스크 사용)에 저장한다. Persist는 사용자가 Storage Level을 지정할 수 있다.

다음 챕터부터는 Spark SQL & DataFrame에 대해서 알아본다.

Reference

실시간 빅데이터 처리를 위한 Spark & Flink Online