5장 - 스파크 코어 API를 사용한 고급 프로그래밍
5장에서 다루는 내용은
- spark broadcast, accumulater 소개
- RDD partitioning, repartitioning
- RDD 저장옵션
- caching, RDD checkpoint
를 다룬다고 한다.
스파크 공유변수
코드 스니펫으로 따지면
1 | sc = SparkContext() |
모든 Executors에 인메모리에 띄워서 공유할수있는 방법은 두가지
- broadcast : 드라이버에서 익스큐터들에 로드해서 가지고있기
- accumulators(SV) : executors로 부터 이벤트를 받아서 driver에서 가지고있음
(spark streaming에선 long-running 프로세스다보니까 메모리이슈가 생길수도있으니 조심)
막간을 이용한 recap Quiz
4장을 간단하게 요약하거나 질문을 위해 DAG simple job을 가정해
job, stage, task, shuffle을 도식화
아래는 간단한 term들에 대한 퀴즈
👉 정답 확인하기
- 1. transformation
- 2. action
- 3. lazy operation
- 4. running
- 5. persist, DF에선 cache
👉 정답 확인하기
- 1. job
- 2. stage
- 3. task
- 4. shuffle
데이터 파티셔닝
- logLinesRDD에 로그들을 읽어서
- lambda 펑션걸어서 Error 로그만 추출한다고하면 errorsRDD가 생성됨
- 저기서 이제 .coalesce(2)나 repartitioning으로 파티셔닝을 다시만들수있음
- 그 결과 cleanedRDD가 만들어지고
- action func .collect()으로 만들어거 drvier에서 찍어볼수있음
- DAGs들을 만들어지고 action 단계에서 최적화하기위한 노드들을 할당하는 순
캐싱
위의 예제의 전체적인 과정을 보면 아래의 그림처럼 요약이 됨
만약 캐싱과정이 없었다면
logRDD -> lambda(filter) -> errorsRDD -> repartitioning -> cleanedRDD -> lambda(filter)
이 과정이 중복적으로 .count(), .collect(), .save()하는 action에서 매번 실행되게됨
RDD 저장옵션
MEMORY_ONLY
: defaultMEMORY_AND_DISK
: 메모리에 저장하기 적절하지않으면 디스크에MEMORY_ONLY_SER
: serialize해서 저장함, 메모리절약되고 다시 deserialize 할때 오버헤드 줄어듦MEMORY_AND_DISK_SER
: ONLY_SER + AND_DISTDISK_ONLY
: 디스크에만 저장OFF_HEAP
: off-heap 메모리에 ONLY_SER 하게 저장
다음 포스팅
- 197p ~ 이후는 스파크 환경변수, 구성 속성, 구성 우선순위, 스파크 최적화, 병렬처리 최적화 등의
내용이 나오는데 .. 솔직히 지금에는 많은 부분이 와닿거나 이해가 되지않아서 정리가 부족했다 :(
나중에 다시 돌아와서 실제예에서 필요한 부분은 찾아서 봐야겠다 - pyspark skew 있냐없냐는 UI 25, 75 percentile 쪽을 봐도 판단가능함 (예를들면 거래가 많은 카카오페이쪽이 파티셔닝이 몰려가지고 이쪽 스큐가 발생한걸 최대한 쿼리를 이용해서 나눠서 성능향상시킴)
Copyright © 2024 박홍(박형준, DevHyung).