<개요>

 - Apache NiFi를 사용하다 보면 디스크 용량을 많이 차지하는 것을 볼 수 있다.

 - 이유는 DataFlow상에서 각 Processor를 거칠때 마다 모든 내용을 다 저장하기 때문이다.

 - 이러한 내용들도 결국 어딘가에 다 저장될 텐데 NiFi에서는 Repositories 라는 논리적 개념을 통해서 이를 정리하고 있다.

 

<내용>

- The FlowFile Repository : 현재 흐름상에 있는 FlowFiles들의 Metadata를 저장한다.

- The Content Repository : 현재 흐름상에 있는 Contents와 과거 FlowFiles를 저장한다.

- The Provenance Repository : FlowFiles들의 history를 저장한다.

이러한 각 Repository들을 통해서 Nifi가 어떻게 Data Flow를 처리하고, 각 Transcation들을 보장하며 메모리와 디스크를 어떻게 사용하고 Log를 관리하고 활용하는지를 더 자세하게 살펴볼 예정이다. 

 

1. FlowFile Repository

2. Content Repository

3. Provenance Repository

 

 

<참조사이트>

https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#repositories


<개요>

- Apache NiFi의 경우 Flow-based programming 의 패러다임을 잘 살려서 만든 Data Flow를 위한 OpenSource 이다

- FBP는 결국 'Data Factory'라는 컨셉을 Application을 가져가는 것인데 최근에 MS Azure에서 Data 수집으로 제공하는 솔루션이 'Data Factory' 로 출시되어서 그 연관성을 보여주고 있다.

- FBP에서 간단히 정리하자면 블랙박스로 구성된 네트워크의 개념으로 Application을 정의하는 것으로 Processor -<connection>- Processor 의 형태로 메시지를 전달하면서 데이터를 처리하게 된다.

- Apache Nifi의 개념, 기술요소에 대해서는 나중에 정리하도록 해야겠다.

 

<내용>

- NiFi의 경우 이렇게 데이터의 흐름에 초점을 맞추고 Application을 개발하며 이 때 처리되는 내용들을 여러 Repositories로 저장한다.

- FlowFiles가 가장 중요한 개념인데, 하나의 FlowFile은 하나의 Record를 가리킨다. 이 때 content에 직접 접근하는 것이 아니라 Pointer로 관리하고 그외 content의 속성을 나타내는 attributes와 events  도 같이 포함된다. 즉, (Pointer + Attributes + Events)로 이해하는 것이 쉽다.

 각 Attribute들은 key / value쌍으로 메타정보를 담고 있다. (ex, filename등 processor별로 다를 수 있다.)

- 동시성 프로그래밍에서 필수로 꼽히는 요소가 immutable한가 인데,  Spark RDD , Akka Actor들이 그렇듯이 NiFi에서는 이러한 정보들을 Repositories에 저장할때 immutable하도록 관리한다.

- 예를 들어서 FlowFile의 attribute에 변경이 일어날 경우, 기존 내용을 수정하는 것이 아니라 새로 복사본을 만들어낸 뒤에 저장한다.

- Content에 변경역시 기존의 Content를 읽어서 새롭게 기록하고 FlowFile의 pointer를 새로운 위치로 업데이트 한다.

- 이를 통해서 OS Caching을 활용하고 Randon read/write의 비율을 감소시키는 이점이 있는데 이는 Kafka가 사용하는 방식과 매우 유사하다.

 

<Copy on write의 개념>

- Copy on write는 implicit sharing 또는 shadowing이라고 불리는 기법이다.

- 같은 Resource에 대해서 수정사항이 없지만 복제가 필요한 상황에서 사용된다.

- 꼭 새로운 복제본을 만들 필요는 없지만 수정사항이 발생한다면 복제본을 반드시 만들어야 한다.

- 활용예제

 a. 여러 프로그래밍 언어에서 문자연산 (ex, "+" )을 할 때 많이 사용된다.

 b. snapshot을 생성하는 것에도 사용된다. (Redirect on write, Copy on write)

 

<그림-1>

 

<Nifi에서의 copy on write활용>

- CompressContent processor , Merge processor 등 여러 프로세서에서 활용하고 있는데 이전 프로젝트에서 많이 사용했던 Merge의 예를 살펴보면

- Merge의 대상이 되는 원 프로세서에서 각 FlowFile이 넘어오면 MergeContent 프로세서에서는 모든 FlowFile을 하나로 합쳐서 새로운 하나의 FlowFile을 생성한다.

- 예를 들어서 10개의 프로세서가 하나의 MergeContent 프로세서로 연결되면 10 FlowFiles => 1 FlowFiles로 변경이 된다. 이 과정에서 Interval등을 조정하면 더 많은 FlowFiles을 하나로 합칠 수도 있으나 connection에 담기는 건수가 너무 클 경우 메모리 오류등이 발생할 수 있으니 주의해야 한다.

- MergeContent프로세서 역시 입력된 FlowFiles을 수정하는 것이 아니라 새로운 FlowFile을 생성하고 해당 Location을 지정한다.

 

<정리>

- 원본의 변화가 없기 때문에 에러가 발생할 경우 재현이 가능하다.

- 각 단계별로 별도 저장을 하기 때문에 저장소 공간 확보를 잘 해줘야 한다.

- Memory를 사용하여 처리하는 구조이기 때문에 처리하는 데이터의 양, 주기등을 고려해야 한다.

- 실제로 프로젝트에서 Merge단계에 Memory오류가 자주 발생하였다.

 

<참조사이트>

https://en.wikipedia.org/wiki/Flow-based_programming

https://en.wikipedia.org/wiki/Copy-on-write

http://storagegaga.com/tag/copy-on-write/

https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#copy-on-write

 

 

<개요>

YARN은 현재 CPU 와 Memory에 대해서만 스케쥴링을 수행한다.

따라서 Spark를 YARN Cluster 혹은 Client로 수행할때에도 해당 리소스의 제어를 받게 되는데 이 때 최적의 세팅값을 찾는 법을 확인해본다.


<내용>

<그림 1 : Spark와 Yarn Containter간의 메모리 관계>


A. 위의 그림을 참고로 해서 Spark와 YARN Configuration값을 살펴보면


1) Sparak Executor

--executor-cores 와 spark.executor.cores  는 Spark Executor가 동시에 수행할 수 있는 tasks의 수이다. 


--executor-memory 와 spark.executor.memory 는 Spark Executor의 heap size이다.


세팅된 두 값은 모든 Spark Executor가 같은 값으로 동작한다.


2) num of executors

--num-executors 와 spark.executor.instances  Spark Executor의 수를 의미하지만

spark.dynamicAllocation.enabled 옵션을 통해서 동적으로 제어된다. (CDH 5.4/Spark 1.3)



3) YARN

yarn.nodemanager.resource.memory-mb 은 각 노드에서 컨테이너들이 사용할 수 있는 메모리의 총합이고


yarn.nodemanager.resource.cpu-vcores 은 각 노드에서 컨테이너들이 사용할 수 있는 core의 총합이다.



4) 보정수치

각 컴포넌트의 관계로 인해서 각 설정값은 정확히 일치할 수 없고 여유의 값을 두어야 한다. 예를 들면 다음과 같다.


  • --executor-memory/spark.executor.memory 는 executor의 heap size를 결정하지만 JVM은 off heap 부분에 대해서도 사용을 하기 때문에 Yarn container는 이를 감안하여 공간을 설정해야 한다.   spark.yarn.executor.memoryOverhead 값을 통해서 추가공간을 확보하며 default 는  0.07 * spark.executor.memory  이다
  • yarn.scheduler.minimum-allocation-mb 과 yarn.scheduler.increment-allocation-mb 를 통해서 



B. 예제를 통해서 살펴보자


각각 16 cores 와 64GB 의 메모리를 가진 노드가 6개 있는 cluster 인경우 


yarn.nodemanager.resource.memory-mb = 63 * 1024

yarn.nodemanager.resource.cpu-vcores = 15


를 사용할 수 있다. 


Hadoop에서 자체적으로 띄우는 데몬들이나 OS에서 사용하는 리소스들이 있기 때문에 최소 1G와 1개의 Core는 남겨둔 상태이며 필요에 따라서는 여유를 더 많이 둬야 한다.


두가지 설정값을 예시로 비교해보면



1) --num-executors 6 --executor-cores 15 --executor-memory 63G 로 세팅한다면??


단순히 생각하면 이상적인 수치이다. 각 노드에서 Executor를 하나씩 띄우고,

각 Executor는 63G의 메모리와 15 core를 Full로 사용해서 작업한다. 

그러나 다음과 같은 이유로 사실은 비효율적인 세팅이 된다.


- 그림1에서 보는 것처럼 executor의 메모리는 Yarn Nodemanager resource memory 와 Container 보다 작아야 한다.


- AM이 기동되어야 하기 때문에 해당노드에서는 executor가 15 cores 를 사용할 수 없다.


- executor가 15 cores를 사용할 경우 HDFS I/O로 인해 성능의 병목이 생긴다.



2) --num-executors 17 --executor-cores 5 --executor-memory 19G 가 더 나을 듯 하다.

이유는 다음과 같다.


- 각 노드에서 3개씩 executors가 기동되고 하나의 노드에서는 2개가 기동되어 AM을 위한 공간이 확보된다.


- 각 노드에서 3개의 executors가 기동되기 때문에 63 / 3 = 21 이 되며 위에서 설명한 여유공간을 위해서 21 * 0.07 = 1.47 을 제외한 만큼 19G로 executor-memory 로 설정한다.


<기타>

실제로 YARN Application에서 모니터링을 해보면 spark.executor.memory로 설정한 값을 모두 사용하지는 않는다. 그 이유는 그림1에서 보는 것처럼 fraction수치를 코드내에서 곱해주도록 되어있기 때문이다.



<참고>

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

'BigData' 카테고리의 다른 글

Spark에서 Hive ACID Table 접근시 오류  (0) 2017.10.12
Apache hive - transaction  (0) 2017.09.26
Hadoop Security for Multi tenant #4  (0) 2017.04.03
Hadoop Security for Multi tenant #3  (0) 2017.03.24
Hadoop Securiy for Multi tenant #2  (0) 2017.01.20

<개요>

 - Apache Kylo에서 Transformation 이나 Visual Query 수행시 Hive ACID Table에 접근할 경우 오류 발생


java.lang.RuntimeException: serious problem
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:635)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:64)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:311)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: delta_0000000_0000000 does not start with base_
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
        ... 44 more


<내용>

 - Hive ACID Table의 경우 ORC 포멧을 이용하여 BASE File과 Delta File의 형태로 처리한다.

  (http://icthuman.tistory.com/entry/Apache-hive-transaction 참고)


 - Spark에서 파일을 읽으려고 하는데 최초에는 Base File이 존재하지 않는다.

  (테이블 생성후 insert를 하면 Delta File만 생성이 된다.)


<해결방안>

 - 수동으로 Major Compaction을 수행하면 Base File이 만들어지기 때문에 정상적으로 Spark에서 처리가 가능하다.

 

ALTER TABLE table_name [PARTITION (partition_key = 'partition_value' [, ...])]

  COMPACT 'compaction_type'[AND WAIT]
  [WITH OVERWRITE TBLPROPERTIES ("property"="value" [, ...])];


<참고>

https://issues.apache.org/jira/browse/HIVE-15189

https://issues.apache.org/jira/browse/SPARK-16996






네이버 블로그

LinkedIn


https://github.com/ggthename

'개인연결사이트' 카테고리의 다른 글

벡엔드 개발자 로드맵  (0) 2021.04.06

<개요>

Apache Hive는 HDFS에 저장되어 있는 파일데이터를  SQL 기반으로 처리할 수 있도록 하는 오픈소스이다. (모든 SQL을 지원하는 것은 아니며, 파일시스템 특성상 UPDATE, DELETE는 권장하지 않는다. )

그러나 지속적으로 DataWareHouse  트랜잭션 처리에 대한 요구사항이 꾸준히 생겨서 Hive에서도 트랜잭션을 지원하기 위한 기능이 개발되었다.

이에 대해서 내부구조를 간략히 살펴본다. (원문해석 + 개인이해/경험추가 )


<ACID>

database transaction의 4대 속성이라고도 하는데 다음과 같다.

https://ko.wikipedia.org/wiki/ACID


Atomicity(원자성)

트랜잭션과 관련된 작업들이 부분적으로 실행되다가 중단되지 않는 것을 보장하는 능력이다. 예를 들어, 자금 이체는 성공할 수도 실패할 수도 있지만 보내는 쪽에서 돈을 빼 오는 작업만 성공하고 받는 쪽에 돈을 넣는 작업을 실패해서는 안된다. 원자성은 이와 같이 중간 단계까지 실행되고 실패하는 일이 없도록 하는 것

Consistency(일관성)

트랜잭션이 실행을 성공적으로 완료하면 언제나 일관성 있는 데이터베이스 상태로 유지하는 것을 의미한다. 무결성 제약이 모든 계좌는 잔고가 있어야 한다면 이를 위반하는 트랜잭션은 중단

Isolation(고립성)

트랜잭션을 수행 시 다른 트랜잭션의 연산 작업이 끼어들지 못하도록 보장하는 것을 의미한다. 이것은 트랜잭션 밖에 있는 어떤 연산도 중간 단계의 데이터를 볼 수 없음을 의미한다. 은행 관리자는 이체 작업을 하는 도중에 쿼리를 실행하더라도 특정 계좌간 이체하는 양 쪽을 볼 수 없다. 공식적으로 고립성은 트랜잭션 실행내역은 연속적이어야 함을 의미한다. 성능관련 이유로 인해 이 특성은 가장 유연성 있는 제약 조건이다

Durability(지속성)

공적으로 수행된 트랜잭션은 영원히 반영되어야 함을 의미한다. 시스템 문제, DB 일관성 체크 등을 하더라도 유지되어야 함을 의미한다. 전형적으로 모든 트랜잭션은 로그로 남고 시스템 장애 발생 전 상태로 되돌릴 수 있다. 트랜잭션은 로그에 모든 것이 저장된 후에만 commit 상태로 간주될 수 있다.


<Hive-ACID>

Hive 0.13이전에는 원자성, 일관성, 지속성에 대해서는 파티션 레벨로 지원하기 시작했으며, 고립성에 대해서는 Zookeeper나 메모리상에서 지원하는 locking 메커니즘 수준으로 제공되었으나 이제는 row level로는 ACID를 만족시킬 수 있다.

그러나 아직은 제약사항이 좀 있다.


1. 제약사항

- BEGIN, COMMIT, ROLLBACK을 지원하지 않는다. 모두다 auto-commit이다

그렇다 보니 Spring과 연계하여 사용할 때 약간의 불편함이 있다. Spring 에서 지원하는 @JdbcTest 를 사용할 경우 단위테스트 작성시 rollback기능등을 이용하여 각 메소드별 테스트시 데이터정합성이 깨지지 않도록 보장해주는 기능이 있는데 해당 기능이 사용불가능 하다. 



- ORC포멧만 지원한다. 

- Bucket설정이 되어야 한다. 또한 External Table의 경우 compactor가 제어할 수 없기 때문에 ACID테이블로 만들 수 없다.


- non-ACID session에서는 ACID Table에 대한 읽기/쓰기를 할 수 없다. 

  어찌보면 당연하다. ACID를 사용하기 위해서는 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 로 hive transaction manager를 변경해야 한다.


- Dirty read, read committed, repeatable read, serializable의 isolation level은 지원하지 않는다.

- 기존의 zookeeper나 in-memory 락 메커니즘은 호환불가능



2. 기본 설

처음에 이야기한 것처럼 HDFS는 오직 통으로 움직이는 것을 좋아한다.

그래서 transaction들의 특징을 만족시키기 위해서 아래와 같이 설게되었다.


테이블이나 파티션은 Base 파일의 집합으로 저장하고, 새로운 레코드나 update, delete에 대해서는 Delta 파일로 저장한다. 델타파일들의 집합이 만들어지면 읽는 시점에 합친다.



<Compactor>

ACID를 지원하기 위해서 Meatastore에서 수행되고 있는 background processes들을 compactor라고 한다.


Hive는 Base File과 Delta File을 이용한 Compaction으로 ACID를 지원하기 때문에  Table을 생성할때 


STORED AS ORC 와 

CLUSTERED BY XXX INTO N BUCKETS  를 반드시 추가해야 한다.


apache orc 사이트에 가면 Hive ACID를 어떤 형태로 Support 하고 있는지 상세하게 설명이 되어있다.


<그림 1 : Hive Table>



<그림 2 : Major & Minor compaction>



Delta파일들이 늘어나게 되면 성능을 위해서 compaction을 수행하게 되는데 유형은 다음과 같다.


- minor compaction : Delta파일들을 모아서 Bucket당 하나의 Delta파일로 rewrite한다.

- major compaction : 하나이상의 Delta파일과 Bucket당 Base파일을 가지고 Bucket당 새로운 Base파일로 rewrite한다. 


실제로 YARN 에서 조회하면 UPDATE,DELETE를 수행할때마다 YARN Application이 수행되지는 않으며

주기적으로 수행되는 compactor작업을 확인할 수 있다.



<Transaction/Lock Manager>

추후 상세설명



3. Configuration

Client Side

Server Side (Metastore)



4. Table Properties

ACID를 사용하기 위해서는 (update,delete) "transactional=true" 를 설정해야 한다.

그리고 한번 ACID테이블로 생성하면 되돌릴 수 없으니 주의해야 한다.

또한 hive-sitem.xml등에서 hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 로 지정해야 하며 이 txManager가 없이는 ACID테이블을 사용할 수 없다.



Example: Set compaction options in TBLPROPERTIES at table level
CREATE TABLE table_name (
  id                int,
  name              string
)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true",
  "compactor.mapreduce.map.memory.mb"="2048",     -- specify compaction map job properties
  "compactorthreshold.hive.compactor.delta.num.threshold"="4",  -- trigger minor compaction if there are more than 4 delta directories
  "compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- trigger major compaction if the ratio of size of delta files to
                                                                   -- size of base files is greater than 50%
);



그리고 2번에서 설명한 것처럼 compaction으로 ACID를 지원하기 때문에 compator에 대한 많은 상세설정이 있으며 compactor에 대한 설정은 ALTER문을 통해서 변경할 수 있다.


Example: Set compaction options in TBLPROPERTIES at request level
ALTER TABLE table_name COMPACT 'minor' 
   WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="3072");  -- specify compaction map job properties
ALTER TABLE table_name COMPACT 'major'
   WITH OVERWRITE TBLPROPERTIES ("tblprops.orc.compress.size"="8192");         -- change any other Hive table properties




<참고사이트>

https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions

https://orc.apache.org/docs/acid.html


+ Recent posts