<개요>

YARN은 현재 CPU 와 Memory에 대해서만 스케쥴링을 수행한다.

따라서 Spark를 YARN Cluster 혹은 Client로 수행할때에도 해당 리소스의 제어를 받게 되는데 이 때 최적의 세팅값을 찾는 법을 확인해본다.


<내용>

<그림 1 : Spark와 Yarn Containter간의 메모리 관계>


A. 위의 그림을 참고로 해서 Spark와 YARN Configuration값을 살펴보면


1) Sparak Executor

--executor-cores 와 spark.executor.cores  는 Spark Executor가 동시에 수행할 수 있는 tasks의 수이다. 


--executor-memory 와 spark.executor.memory 는 Spark Executor의 heap size이다.


세팅된 두 값은 모든 Spark Executor가 같은 값으로 동작한다.


2) num of executors

--num-executors 와 spark.executor.instances  Spark Executor의 수를 의미하지만

spark.dynamicAllocation.enabled 옵션을 통해서 동적으로 제어된다. (CDH 5.4/Spark 1.3)



3) YARN

yarn.nodemanager.resource.memory-mb 은 각 노드에서 컨테이너들이 사용할 수 있는 메모리의 총합이고


yarn.nodemanager.resource.cpu-vcores 은 각 노드에서 컨테이너들이 사용할 수 있는 core의 총합이다.



4) 보정수치

각 컴포넌트의 관계로 인해서 각 설정값은 정확히 일치할 수 없고 여유의 값을 두어야 한다. 예를 들면 다음과 같다.


  • --executor-memory/spark.executor.memory 는 executor의 heap size를 결정하지만 JVM은 off heap 부분에 대해서도 사용을 하기 때문에 Yarn container는 이를 감안하여 공간을 설정해야 한다.   spark.yarn.executor.memoryOverhead 값을 통해서 추가공간을 확보하며 default 는  0.07 * spark.executor.memory  이다
  • yarn.scheduler.minimum-allocation-mb 과 yarn.scheduler.increment-allocation-mb 를 통해서 



B. 예제를 통해서 살펴보자


각각 16 cores 와 64GB 의 메모리를 가진 노드가 6개 있는 cluster 인경우 


yarn.nodemanager.resource.memory-mb = 63 * 1024

yarn.nodemanager.resource.cpu-vcores = 15


를 사용할 수 있다. 


Hadoop에서 자체적으로 띄우는 데몬들이나 OS에서 사용하는 리소스들이 있기 때문에 최소 1G와 1개의 Core는 남겨둔 상태이며 필요에 따라서는 여유를 더 많이 둬야 한다.


두가지 설정값을 예시로 비교해보면



1) --num-executors 6 --executor-cores 15 --executor-memory 63G 로 세팅한다면??


단순히 생각하면 이상적인 수치이다. 각 노드에서 Executor를 하나씩 띄우고,

각 Executor는 63G의 메모리와 15 core를 Full로 사용해서 작업한다. 

그러나 다음과 같은 이유로 사실은 비효율적인 세팅이 된다.


- 그림1에서 보는 것처럼 executor의 메모리는 Yarn Nodemanager resource memory 와 Container 보다 작아야 한다.


- AM이 기동되어야 하기 때문에 해당노드에서는 executor가 15 cores 를 사용할 수 없다.


- executor가 15 cores를 사용할 경우 HDFS I/O로 인해 성능의 병목이 생긴다.



2) --num-executors 17 --executor-cores 5 --executor-memory 19G 가 더 나을 듯 하다.

이유는 다음과 같다.


- 각 노드에서 3개씩 executors가 기동되고 하나의 노드에서는 2개가 기동되어 AM을 위한 공간이 확보된다.


- 각 노드에서 3개의 executors가 기동되기 때문에 63 / 3 = 21 이 되며 위에서 설명한 여유공간을 위해서 21 * 0.07 = 1.47 을 제외한 만큼 19G로 executor-memory 로 설정한다.


<기타>

실제로 YARN Application에서 모니터링을 해보면 spark.executor.memory로 설정한 값을 모두 사용하지는 않는다. 그 이유는 그림1에서 보는 것처럼 fraction수치를 코드내에서 곱해주도록 되어있기 때문이다.



<참고>

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

'BigData' 카테고리의 다른 글

Spark에서 Hive ACID Table 접근시 오류  (0) 2017.10.12
Apache hive - transaction  (0) 2017.09.26
Hadoop Security for Multi tenant #4  (0) 2017.04.03
Hadoop Security for Multi tenant #3  (0) 2017.03.24
Hadoop Securiy for Multi tenant #2  (0) 2017.01.20

<개요>

 - Apache Kylo에서 Transformation 이나 Visual Query 수행시 Hive ACID Table에 접근할 경우 오류 발생


java.lang.RuntimeException: serious problem
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:635)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:64)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:311)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: delta_0000000_0000000 does not start with base_
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
        ... 44 more


<내용>

 - Hive ACID Table의 경우 ORC 포멧을 이용하여 BASE File과 Delta File의 형태로 처리한다.

  (http://icthuman.tistory.com/entry/Apache-hive-transaction 참고)


 - Spark에서 파일을 읽으려고 하는데 최초에는 Base File이 존재하지 않는다.

  (테이블 생성후 insert를 하면 Delta File만 생성이 된다.)


<해결방안>

 - 수동으로 Major Compaction을 수행하면 Base File이 만들어지기 때문에 정상적으로 Spark에서 처리가 가능하다.

 

ALTER TABLE table_name [PARTITION (partition_key = 'partition_value' [, ...])]

  COMPACT 'compaction_type'[AND WAIT]
  [WITH OVERWRITE TBLPROPERTIES ("property"="value" [, ...])];


<참고>

https://issues.apache.org/jira/browse/HIVE-15189

https://issues.apache.org/jira/browse/SPARK-16996






<개요>

Apache Hive는 HDFS에 저장되어 있는 파일데이터를  SQL 기반으로 처리할 수 있도록 하는 오픈소스이다. (모든 SQL을 지원하는 것은 아니며, 파일시스템 특성상 UPDATE, DELETE는 권장하지 않는다. )

그러나 지속적으로 DataWareHouse  트랜잭션 처리에 대한 요구사항이 꾸준히 생겨서 Hive에서도 트랜잭션을 지원하기 위한 기능이 개발되었다.

이에 대해서 내부구조를 간략히 살펴본다. (원문해석 + 개인이해/경험추가 )


<ACID>

database transaction의 4대 속성이라고도 하는데 다음과 같다.

https://ko.wikipedia.org/wiki/ACID


Atomicity(원자성)

트랜잭션과 관련된 작업들이 부분적으로 실행되다가 중단되지 않는 것을 보장하는 능력이다. 예를 들어, 자금 이체는 성공할 수도 실패할 수도 있지만 보내는 쪽에서 돈을 빼 오는 작업만 성공하고 받는 쪽에 돈을 넣는 작업을 실패해서는 안된다. 원자성은 이와 같이 중간 단계까지 실행되고 실패하는 일이 없도록 하는 것

Consistency(일관성)

트랜잭션이 실행을 성공적으로 완료하면 언제나 일관성 있는 데이터베이스 상태로 유지하는 것을 의미한다. 무결성 제약이 모든 계좌는 잔고가 있어야 한다면 이를 위반하는 트랜잭션은 중단

Isolation(고립성)

트랜잭션을 수행 시 다른 트랜잭션의 연산 작업이 끼어들지 못하도록 보장하는 것을 의미한다. 이것은 트랜잭션 밖에 있는 어떤 연산도 중간 단계의 데이터를 볼 수 없음을 의미한다. 은행 관리자는 이체 작업을 하는 도중에 쿼리를 실행하더라도 특정 계좌간 이체하는 양 쪽을 볼 수 없다. 공식적으로 고립성은 트랜잭션 실행내역은 연속적이어야 함을 의미한다. 성능관련 이유로 인해 이 특성은 가장 유연성 있는 제약 조건이다

Durability(지속성)

공적으로 수행된 트랜잭션은 영원히 반영되어야 함을 의미한다. 시스템 문제, DB 일관성 체크 등을 하더라도 유지되어야 함을 의미한다. 전형적으로 모든 트랜잭션은 로그로 남고 시스템 장애 발생 전 상태로 되돌릴 수 있다. 트랜잭션은 로그에 모든 것이 저장된 후에만 commit 상태로 간주될 수 있다.


<Hive-ACID>

Hive 0.13이전에는 원자성, 일관성, 지속성에 대해서는 파티션 레벨로 지원하기 시작했으며, 고립성에 대해서는 Zookeeper나 메모리상에서 지원하는 locking 메커니즘 수준으로 제공되었으나 이제는 row level로는 ACID를 만족시킬 수 있다.

그러나 아직은 제약사항이 좀 있다.


1. 제약사항

- BEGIN, COMMIT, ROLLBACK을 지원하지 않는다. 모두다 auto-commit이다

그렇다 보니 Spring과 연계하여 사용할 때 약간의 불편함이 있다. Spring 에서 지원하는 @JdbcTest 를 사용할 경우 단위테스트 작성시 rollback기능등을 이용하여 각 메소드별 테스트시 데이터정합성이 깨지지 않도록 보장해주는 기능이 있는데 해당 기능이 사용불가능 하다. 



- ORC포멧만 지원한다. 

- Bucket설정이 되어야 한다. 또한 External Table의 경우 compactor가 제어할 수 없기 때문에 ACID테이블로 만들 수 없다.


- non-ACID session에서는 ACID Table에 대한 읽기/쓰기를 할 수 없다. 

  어찌보면 당연하다. ACID를 사용하기 위해서는 org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 로 hive transaction manager를 변경해야 한다.


- Dirty read, read committed, repeatable read, serializable의 isolation level은 지원하지 않는다.

- 기존의 zookeeper나 in-memory 락 메커니즘은 호환불가능



2. 기본 설

처음에 이야기한 것처럼 HDFS는 오직 통으로 움직이는 것을 좋아한다.

그래서 transaction들의 특징을 만족시키기 위해서 아래와 같이 설게되었다.


테이블이나 파티션은 Base 파일의 집합으로 저장하고, 새로운 레코드나 update, delete에 대해서는 Delta 파일로 저장한다. 델타파일들의 집합이 만들어지면 읽는 시점에 합친다.



<Compactor>

ACID를 지원하기 위해서 Meatastore에서 수행되고 있는 background processes들을 compactor라고 한다.


Hive는 Base File과 Delta File을 이용한 Compaction으로 ACID를 지원하기 때문에  Table을 생성할때 


STORED AS ORC 와 

CLUSTERED BY XXX INTO N BUCKETS  를 반드시 추가해야 한다.


apache orc 사이트에 가면 Hive ACID를 어떤 형태로 Support 하고 있는지 상세하게 설명이 되어있다.


<그림 1 : Hive Table>



<그림 2 : Major & Minor compaction>



Delta파일들이 늘어나게 되면 성능을 위해서 compaction을 수행하게 되는데 유형은 다음과 같다.


- minor compaction : Delta파일들을 모아서 Bucket당 하나의 Delta파일로 rewrite한다.

- major compaction : 하나이상의 Delta파일과 Bucket당 Base파일을 가지고 Bucket당 새로운 Base파일로 rewrite한다. 


실제로 YARN 에서 조회하면 UPDATE,DELETE를 수행할때마다 YARN Application이 수행되지는 않으며

주기적으로 수행되는 compactor작업을 확인할 수 있다.



<Transaction/Lock Manager>

추후 상세설명



3. Configuration

Client Side

Server Side (Metastore)



4. Table Properties

ACID를 사용하기 위해서는 (update,delete) "transactional=true" 를 설정해야 한다.

그리고 한번 ACID테이블로 생성하면 되돌릴 수 없으니 주의해야 한다.

또한 hive-sitem.xml등에서 hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 로 지정해야 하며 이 txManager가 없이는 ACID테이블을 사용할 수 없다.



Example: Set compaction options in TBLPROPERTIES at table level
CREATE TABLE table_name (
  id                int,
  name              string
)
CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true",
  "compactor.mapreduce.map.memory.mb"="2048",     -- specify compaction map job properties
  "compactorthreshold.hive.compactor.delta.num.threshold"="4",  -- trigger minor compaction if there are more than 4 delta directories
  "compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- trigger major compaction if the ratio of size of delta files to
                                                                   -- size of base files is greater than 50%
);



그리고 2번에서 설명한 것처럼 compaction으로 ACID를 지원하기 때문에 compator에 대한 많은 상세설정이 있으며 compactor에 대한 설정은 ALTER문을 통해서 변경할 수 있다.


Example: Set compaction options in TBLPROPERTIES at request level
ALTER TABLE table_name COMPACT 'minor' 
   WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="3072");  -- specify compaction map job properties
ALTER TABLE table_name COMPACT 'major'
   WITH OVERWRITE TBLPROPERTIES ("tblprops.orc.compress.size"="8192");         -- change any other Hive table properties




<참고사이트>

https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions

https://orc.apache.org/docs/acid.html


Oozie 와의 연동

1. Ambari로 Oozie 설치 후 아래와 같이 설정

- custom oozie-site

    oozie.action.jobinfo.enable = true

    oozie.action.launcher.yarn.timeline-service.enabled = false

    oozie.service.ProxyUserService.proxyuser.root.groups = * 

    oozie.service.ProxyUserService.proxyuser.root.hosts = * 

- Advanced oozie-site

oozie.service.AuthorizationService.security.enabled=false



2. oozie관련 xml을 작성하여 지정된 위치에 업로드

(hdfs://node01:8020/user/john/oozie-sample.xml)


<workflow-app xmlns="uri:oozie:workflow:0.5" xmlns:sla="uri:oozie:sla:0.2" name="tenant_test"> <global> <job-tracker>${resourceManager}</job-tracker> <name-node>${nameNode}</name-node> </global> <start to="hdfs8"/> <action name="hdfs8"> <ssh xmlns="uri:oozie:ssh-action:0.1"> <host>root@node01</host> <command>/home/root/apps/bin/run-oozie-sample-mr.sh</command> <args>${wf:user()}</args> <args>/user/${wf:user()}/work</args> </ssh> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>FAIL</message> </kill> <end name="end"/>

</workflow-app>


3. xml에서 사용할 property 설정

(/home/root/apps/config/example.properties)


# Set the Name Node URI e.g. hdfs://sandbox.hortonworks.com:8020 name.node=hdfs://node01:8020 # Set the Resource Manager URI e.g. sandbox.hortonworks.com:8050 resource.manager=node02:8050 nameNode=${name.node} jobTracker=${resource.manager} oozie.wf.application.path=hdfs://node01:8020/user/john/oozie-sample.xml oozie.use.system.libpath=true


4. oozie 실행

 oozie job -oozie http://node01:11000/oozie -config example.properties -doas john -run


5. 참고

  • user 변경

Oozie client 로 job submit 을 할 때는 user.name 속성이 적용되지 않음(os 계정)
user.name oozie command 옵션 중 -doas <userid> 를 사용

 

  • group 변경

Job submit 시 properties 파일에 oozie.job.acl 설정 추가
예: oozie.job.acl=<groupId>



'BigData' 카테고리의 다른 글

Spark에서 Hive ACID Table 접근시 오류  (0) 2017.10.12
Apache hive - transaction  (0) 2017.09.26
Hadoop Security for Multi tenant #3  (0) 2017.03.24
Hadoop Securiy for Multi tenant #2  (0) 2017.01.20
Hadoop Securiy for Multi tenant #1  (0) 2017.01.17

Livy오픈소스를 활용한 Spark impersonation


Hadoop 설정변경이 필요합니다.

1. Hadoop core-site.xml

a. core-site.xml
<property>
<name>hadoop.proxyuser.centos{계정명}.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.centos{계정명}.groups</name>
<value>*</value>
</property>

b. ambari의 경우 custom core-site로 추가

2. Livy configuration

a. livy.conf
livy.impersonation.enabled = true
livy.server.csrf_protection.enabled=false  
(true로 두고 Post로 실행할 경우, Missing Required Header for CSRF protection에러나면  Headers에 추가 X-Requested-By = ambari )


b. ambari의 경우 Spark > Advanced livy-conf 변경

livy.environment = production

livy.impersonation.enabled = true

livy.server.csrf_protection_enabled = false


3. Rest API - Test

a. POST - http://localhost:8998/sessions

   RequestBody {"kind": "spark", "proxyUser": "john"}


b.POST - http://localhost:8998/sessions/{sessionId}/statements

  RequestBody {"code": "var readMe = sc.textFile(\"/user/john/input-data/sample.csv\"); readMe.take(5);"}

c. GET - htpp://localhost:8998/sessions/{sessionId}/statements



'BigData' 카테고리의 다른 글

Apache hive - transaction  (0) 2017.09.26
Hadoop Security for Multi tenant #4  (0) 2017.04.03
Hadoop Securiy for Multi tenant #2  (0) 2017.01.20
Hadoop Securiy for Multi tenant #1  (0) 2017.01.17
Flume-Kafka-Elasticsearch 테스트  (0) 2016.03.14

Hive, MR Job의 Group별 YARN Queue 사용


0. YARN Queue

LDAP User Group별 Queue 생성


Queue Mappings : g:abiz:abiz,g:adev:adev

설정이 적용되도록  restart

1. Hive 

설치

ambari 활용

Hive 실행 및 데이터 생성

sudo su - hive
[hive@node01 ~]$ hive
hive> create table table1(a int, b int);
hive> insert into table1 values( 1,2);
hive> insert into table1 values( 1,3);
hive> insert into table1 values( 2,4);

 

LDAP 계정으로 Hive 실행


[hive@node01 ~]$ beeline
Beeline version 1.2.1000.2.5.3.0-37 by Apache Hive
beeline> !connect jdbc:hive2://node02:10000/default john
Enter password for jdbc:hive2://node02:10000/default: **** (hive)
Connected to: Apache Hive (version 1.2.1000.2.5.3.0-37)
Driver: Hive JDBC (version 1.2.1000.2.5.3.0-37)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://node02:10000/default> select sum(a) from table;
INFO : Tez session hasn't been created yet. Opening session
INFO : Dag name: select sum(a) from table(Stage-1)
INFO :
INFO : Status: Running (Executing on YARN cluster with App id application_1484727707431_0001)
INFO : Map 1: -/- Reducer 2: 0/1
INFO : Map 1: 0/1 Reducer 2: 0/1
INFO : Map 1: 0/1 Reducer 2: 0/1
INFO : Map 1: 0(+1)/1 Reducer 2: 0/1
INFO : Map 1: 0/1 Reducer 2: 0/1
INFO : Map 1: 1/1 Reducer 2: 0(+1)/1
INFO : Map 1: 1/1 Reducer 2: 1/1
+------+--+
| _c0 |
+------+--+
| 4 |
+------+--+
1 row selected (23.803 seconds)



2. MR Job (OS계정)

WordCount 실행

hadoop home (/usr/hdp/2.5.3.0-37/hadoop)에 Word Count Example 다운로드 후 압축풀기

[root@node01 hadoop]# unzip Hadoop-WordCount.zip
Archive:  Hadoop-WordCount.zip
   creating: Hadoop-WordCount/
   creating: Hadoop-WordCount/classes/
   creating: Hadoop-WordCount/input/
  inflating: Hadoop-WordCount/input/Word_Count_input.txt 
  inflating: Hadoop-WordCount/WordCount.java 
  inflating: Hadoop-WordCount/clean.sh 
  inflating: Hadoop-WordCount/build.sh 
  inflating: Hadoop-WordCount/classes/WordCount$Reduce.class 
  inflating: Hadoop-WordCount/classes/WordCount.class 
  inflating: Hadoop-WordCount/classes/WordCount$Map.class 
  inflating: Hadoop-WordCount/wordcount.jar

 

adev의 jane으로 실행

[root@node01 Hadoop-WordCount]# su - hdfs
[hdfs@node01 Hadoop-WordCount]$ hadoop fs -mkdir /user/jane
[hdfs@node01 Hadoop-WordCount]$ hadoop fs -chown jane:adev /user/jane
[hdfs@node01 Hadoop-WordCount]$ exit
[root@node01 Hadoop-WordCount]# su jane
[jane@node01 Hadoop-WordCount]$ hadoop fs -put input/ /user/jane/input

 

Word Count jar 실행

[jane@node01 Hadoop-WordCount]$ hadoop jar /usr/hdp/2.5.3.0-37/hadoop/Hadoop-WordCount/wordcount.jar WordCount input output
17/01/19 02:28:04 INFO impl.TimelineClientImpl: Timeline service address: http://node02:8188/ws/v1/timeline/
17/01/19 02:28:04 INFO client.RMProxy: Connecting to ResourceManager at node02/172.31.1.255:8050
17/01/19 02:28:04 INFO client.AHSProxy: Connecting to Application History server at node02/172.31.1.255:10200
17/01/19 02:28:05 INFO input.FileInputFormat: Total input paths to process : 1
17/01/19 02:28:05 INFO mapreduce.JobSubmitter: number of splits:1
17/01/19 02:28:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1484790795688_0002
17/01/19 02:28:05 INFO impl.YarnClientImpl: Submitted application application_1484790795688_0002
17/01/19 02:28:05 INFO mapreduce.Job: The url to track the job: http://node02:8088/proxy/application_1484790795688_0002/
17/01/19 02:28:05 INFO mapreduce.Job: Running job: job_1484790795688_0002
17/01/19 02:28:16 INFO mapreduce.Job: Job job_1484790795688_0002 running in uber mode : false
17/01/19 02:28:16 INFO mapreduce.Job:  map 0% reduce 0%
17/01/19 02:28:29 INFO mapreduce.Job:  map 100% reduce 0%
17/01/19 02:28:35 INFO mapreduce.Job:  map 100% reduce 100%
17/01/19 02:28:36 INFO mapreduce.Job: Job job_1484790795688_0002 completed successfully
17/01/19 02:28:36 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=167524
        FILE: Number of bytes written=616439
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=384328
        HDFS: Number of bytes written=120766
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=10159
        Total time spent by all reduces in occupied slots (ms)=8196
        Total time spent by all map tasks (ms)=10159
        Total time spent by all reduce tasks (ms)=4098
        Total vcore-milliseconds taken by all map tasks=10159
        Total vcore-milliseconds taken by all reduce tasks=4098
        Total megabyte-milliseconds taken by all map tasks=10402816
        Total megabyte-milliseconds taken by all reduce tasks=8392704
    Map-Reduce Framework
        Map input records=9488
        Map output records=67825
        Map output bytes=643386
        Map output materialized bytes=167524
        Input split bytes=121
        Combine input records=67825
        Combine output records=11900
        Reduce input groups=11900
        Reduce shuffle bytes=167524
        Reduce input records=11900
        Reduce output records=11900
        Spilled Records=23800
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=144
        CPU time spent (ms)=2950
        Physical memory (bytes) snapshot=1022894080
        Virtual memory (bytes) snapshot=6457335808
        Total committed heap usage (bytes)=858783744
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=384207
    File Output Format Counters
        Bytes Written=120766

 

WordCount 결과

[jane@node01 Hadoop-WordCount]$ hadoop fs -ls /user/jane/
Found 3 items
drwx------   - jane adev          0 2017-01-19 02:28 /user/jane/.staging
drwxr-xr-x   - jane adev          0 2017-01-19 02:17 /user/jane/input
drwxr-xr-x   - jane adev          0 2017-01-19 02:28 /user/jane/output

 


3. MR Job (HADOOP_USER_NAME parameter)

WordCount 실행 

HDFS에 user를 위한 폴더를 생성 후 input을 업로드


 

Word Count jar 실행 (HADOOP_USER_NAME=lucy)


[root@node01 Hadoop-WordCount]# HADOOP_USER_NAME=lucy hadoop jar /usr/hdp/2.5.3.0-37/hadoop/Hadoop-WordCount/wordcount.jar WordCount input output
17/01/19 04:58:54 INFO impl.TimelineClientImpl: Timeline service address: http://node02:8188/ws/v1/timeline/
17/01/19 04:58:54 INFO client.RMProxy: Connecting to ResourceManager at node02/172.31.1.255:8050
17/01/19 04:58:54 INFO client.AHSProxy: Connecting to Application History server at node02/172.31.1.255:10200
17/01/19 04:58:55 INFO input.FileInputFormat: Total input paths to process : 1
17/01/19 04:58:55 INFO mapreduce.JobSubmitter: number of splits:1
17/01/19 04:58:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1484800564385_0001
17/01/19 04:58:56 INFO impl.YarnClientImpl: Submitted application application_1484800564385_0001
17/01/19 04:58:56 INFO mapreduce.Job: The url to track the job: http://node02:8088/proxy/application_1484800564385_0001/
17/01/19 04:58:56 INFO mapreduce.Job: Running job: job_1484800564385_0001
17/01/19 04:59:05 INFO mapreduce.Job: Job job_1484800564385_0001 running in uber mode : false
17/01/19 04:59:05 INFO mapreduce.Job:  map 0% reduce 0%
17/01/19 04:59:12 INFO mapreduce.Job:  map 100% reduce 0%
17/01/19 04:59:19 INFO mapreduce.Job:  map 100% reduce 100%
17/01/19 04:59:19 INFO mapreduce.Job: Job job_1484800564385_0001 completed successfully
17/01/19 04:59:19 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=167524
        FILE: Number of bytes written=616439
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=384328
        HDFS: Number of bytes written=120766
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=5114
        Total time spent by all reduces in occupied slots (ms)=7530
        Total time spent by all map tasks (ms)=5114
        Total time spent by all reduce tasks (ms)=3765
        Total vcore-milliseconds taken by all map tasks=5114
        Total vcore-milliseconds taken by all reduce tasks=3765
        Total megabyte-milliseconds taken by all map tasks=5236736
        Total megabyte-milliseconds taken by all reduce tasks=7710720
    Map-Reduce Framework
        Map input records=9488
        Map output records=67825
        Map output bytes=643386
        Map output materialized bytes=167524
        Input split bytes=121
        Combine input records=67825
        Combine output records=11900
        Reduce input groups=11900
        Reduce shuffle bytes=167524
        Reduce input records=11900
        Reduce output records=11900
        Spilled Records=23800
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=148
        CPU time spent (ms)=3220
        Physical memory (bytes) snapshot=1033814016
        Virtual memory (bytes) snapshot=6464356352
        Total committed heap usage (bytes)=833617920
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=384207
    File Output Format Counters
        Bytes Written=120766

 

WordCount 결과

[lucy@node01 Hadoop-WordCount]$ hadoop fs -ls /user/lucy
Found 3 items
drwx------   - lucy adev          0 2017-01-19 04:59 /user/lucy/.staging
drwxr-xr-x   - lucy adev          0 2017-01-19 04:48 /user/lucy/input
drwxr-xr-x   - lucy adev          0 2017-01-19 04:59 /user/lucy/output



'BigData' 카테고리의 다른 글

Hadoop Security for Multi tenant #4  (0) 2017.04.03
Hadoop Security for Multi tenant #3  (0) 2017.03.24
Hadoop Securiy for Multi tenant #1  (0) 2017.01.17
Flume-Kafka-Elasticsearch 테스트  (0) 2016.03.14
Storm특징 및 Spark와의 차이점  (0) 2014.12.12

Hadoop 은 기본적으로 하나의 계정을 이용해서 작업하는 것이 매우 편한 구조이지만, 저장공간 및 병렬처리 Application을 Platform의 형태로 다양한 사용자에게 서비스를 제공하기 위해서는 반드시 Multi tenant가 지원되어야 하며 이를 위해서 스터디를 진행했던 내용으로 정리합니다.



Hadoop은 별도의 group정보를 가지고 있지 않습니다. 따라서 GroupMapping을 위해서 다음과 같은 방법들이 권장되고 있는데 이 중에서 추천되고 있는 LDAP - SSSD 연계방안을 테스트해보았습니다.

http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html#Group_Mapping


https://docs.hortonworks.com/HDPDocuments/Ambari-2.2.2.18/bk_ambari-security/content/setting_up_hadoop_group_mappping_for_ldap_ad.html



1. OpenLDAP

사용자가 늘어날 때 마다 매번 OS에 사용자를 만드는 작업은 부담이 될 수 있으며 portal, OS, HDFS등 여러 곳에 사용자 정보가 있을경우 관리Point가 늘어나기 때문에 LDAP으로 저장소를 통일합니다.



- 기본환경 설정

yum update;

yum upgrade;

/etc/init.d/iptables save;

/etc/init.d/iptables stop;

chkconfig iptables off;

yum install -y ntp;

chkconfig --level 3 ntpd on;

service ntpd start;

yum update –y openssl


- OpenLDAP설치

yum install openldap-servers

yum install openldap-clients



- OpenLDAP설정 및 실행

service slapd start 



- 환경변수 설정 OpenLDAP설정

LDAP 설정 참조 


- 사용자/그룹 추가

LDAP 설정 참조 



- Config 및 Entry 조회

ldapsearch -Q -LLL -Y EXTERNAL -H ldapi:/// -b cn=config 

ldapsearch -Q -LLL -Y EXTERNAL -H ldapi:/// -b cn=config dn
ldapsearch -x -D "cn=Manager,dc=company,dc=com " -w admin123 -b dc=company,dc=com


* OS와 연동을 위해서 posixGroup 생성을 추천



2. SSSD 설치 및 LDAP 연동

- 참고사이트


- NSS 설정 (/etc/nsswitch.conf)

passwd:     files sss

group:      files sss


- SSSD 설치 및 설정(/etc/sssd/sssd.conf)

 yum install sssd


[sssd]

services = nss, pam
sbus_timeout = 30
domains = LDAP
 
[nss]
filter_users = root
filter_groups = root
 
[domain/LDAP]
enumerate = true
cache_credentials = TRUE
 
id_provider = ldap
auth_provider = ldap
chpass_provider = ldap
 
ldap_uri = ldap://
ldap_user_search_base = dc=company,dc=com
tls_reqcert = demand
ldap_tls_cacert = /etc/pki/tls/certs/ca-bundle.crt


service sssd start




3. 실제 HDFS 환경 테스트

- UGI 생성 테스트 코드

 public class UGI {

        public static void main(String[] args){
                try {
                        String username = "john";
                        UserGroupInformation ugi
                                        = UserGroupInformation.createRemoteUser(username);
                        ugi.doAs(new PrivilegedExceptionAction<Void>() {
                                public Void run() throws Exception {
                                        Configuration conf = new Configuration();
                                        conf.set("fs.defaultFS""hdfs://host01:8020");
                                        conf.set("hadoop.job.ugi", username);
                                        FileSystem fs = FileSystem.get(conf);
                                        fs.createNewFile(new Path("/user/john/test"));
                                        FileStatus[] status = fs.listStatus(new Path("/user/john"));
                                        for(int i=0;i<status.length;i++){
                                                System.out.println(status[i].getPath());
                                        }
                                        return null;
                                }
                        });
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}


- UGI 조회 테스트 코드

 public class UGIRead {

        public static void main(String[] args){
                try {
                        String username = "john";
                        UserGroupInformation ugi
                                        = UserGroupInformation.createRemoteUser(username);
                        ugi.doAs(new PrivilegedExceptionAction<Void>() {
                                public Void run() throws Exception {
                                        Configuration conf = new Configuration();
                                        conf.set("fs.defaultFS""hdfs://host01:8020");
                                        conf.set("hadoop.job.ugi", username);
                                        FileSystem fs = FileSystem.get(conf);
                                        InputStream stream = fs.open(new Path("/user/john/test"));
                    BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
                    System.out.println(reader.readLine());
                                        reader.close();
                    stream.close();
                    fs.close();
                                        return null;
                                }
                        });
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}

 

- hadoop fs 명령어 테스트

hadoop fs -ls /user/john

hadoop fs -cat /user/john/test


4. 기타 참고사항
LDAP에 있는 사용자들이 실제 OS는 존재하지 않기 때문에 home 디렉토리가 없는 상태이기 때문에 자동으로 생성해줄수 있도록 패키지 설치 및 환경설정이 필요합니다.

yum install oddjob-mkhomedir oddjob

chkconfig oddjobd on
service oddjobd start
authconfig --enablemkhomedir --update



5. 결과내용
- 프로세스를 기동한 OS계정이 아닌 LDAP에 등록되어 있는 계정으로 HDFS 사용이 가능합니다.
 (Java API 및 hadoop fs 명령어 모두 사용가능)
- Hadoop의 namenode가 판단하기 때문에 sssd는 namenode에 설치되어야 합니다.
- 실제 작업이 수행될 때 YARN Queue에 사용자/그룹별로 queue name을 지정해서 수행해야 할텐데 이부분은 추가 테스트가 필요합니다.
- 사용자 인증에 대한 부분이나 기타 보안사항에 대해서는 추가 보완이 필요합니다.
- 단일 Hadoop Cluster에 대한 multi tenant내용이기 때문에 서로 다른 Hadoop Cluster 연계에 대한 부분은 아닙니다.


참고)

A. createRemoteUser

1. OS에 없는 계정으로 시도 : 오류발생

2. OS에 있는 계정으로 하면 성공  : createRemoteUser API의 인자를 user 로 인식함

 

B. ugi.doAs

- createRemoteUser와 Configuration의 hadoop.job.ugi 가 다를 경우 : 별도 오류는 발생하지 않음

 

- FileSystem.createNewFile 시도시 권한없으면 오류 발생 

 ex) jane 계정으로  /user/hdfs/test22파일생성 시도

  Permission denied: user=jane, access=EXECUTE, inode="/user/hdfs/test22":hdfs:hdfs:drwx------

 

- FileSystem.listStatus 시도시 권한 없으면 오류 발생

 ex) jane 계정으로 /user/hdfs 조회시도

  Permission denied: user=jane, access=READ_EXECUTE, inode="/user/hdfs":hdfs:hdfs:drwx------

 

- 적합한 계정으로 작업할 경우 모두 성공


'BigData' 카테고리의 다른 글

Hadoop Security for Multi tenant #3  (0) 2017.03.24
Hadoop Securiy for Multi tenant #2  (0) 2017.01.20
Flume-Kafka-Elasticsearch 테스트  (0) 2016.03.14
Storm특징 및 Spark와의 차이점  (0) 2014.12.12
CAP 정리  (0) 2014.11.28

Flume으로 입력을 받아서 Elasticsearch에 저장

1. Flume설치 

참고사이트 ( https://flume.apache.org/index.html )

  1. Download
    1. wget http://apache.tt.co.kr/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
  2. untar
    1. tar -zxvf apache-flume-1.6.0-bin.tar.gz
  3. Source, Sink 유형별 참고
    1. https://flume.apache.org/FlumeUserGuide.html
  4. 진행해본 Example

    1. 간단한 seq발생source로 elasticsearch 적재 (conf/example.conf)

    2. 특정 디렉토리를spooling하고, kafka channel을 사용해서 elasticsearch에 적재하는 잡을 구성해봄! (conf/test.conf)

  5. 환경설정
    1. conf/flume-env.sh : export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
  6. flume 실행 ex)bin/flume-ng agent -n a1 -c conf -f conf/test.conf


참고 : flume에서 사용해야 하는 3rd party lib은 plugins.d 디렉토리 하나만들어서 넣어주면 custom 별로 정리

  (https://flume.apache.org/FlumeUserGuide.html#installing-third-party-plugins

  • eleasticsearch사용시 관련 external jar 추가해야함!


2. kafka 설치

참고사이트 ( http://kafka.apache.org/ )

  1. Download
    1. wget http://apache.mirror.cdnetworks.com/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
  2. unzip
    1. tar -zxvf kafka_2.10-0.9.0.1.tgz
  3.  기동 및 테스트
    1. bin/zookeeper-server-start.sh config/zookeeper.properties
    2. bin/kafka-server-start.sh config/server.properties


3. elasticsearch 설치

참고사이트 ( https://www.elastic.co/ )

  1. Download
    1. wget  https://www.elastic.co/downloads/past-releases/elasticsearch-1-7-5
  2. unzip
    1. unzip elasticsearch-1.7.5.zip
  3. 편리성을 위한 plugin 설치
    1. head(인덱스 및 데이터조회)
      1. bin/plugin -install mobz/elasticsearch-head
    2. bigdesk(노드 및 클러스터 모니터링)
      1. bin/plugin -install lukas-vlcek/bigdesk
  4. 환경설정
    1. config/elasticsearch.yml
      1. cluster.name: elasticsearch
      2. node.name: host01
  5.  기동 및 테스트
    1. bin/elasticsearch
    2. curl -X GET http://localhost:9200/
  6. plugin을 통한확인

    1. head : http://localhost:9200/_plugin/head/
    2. bigdesk : http://localhost:9200/_plugin/bigdesk/
  7. curl을 이용한 데이터 조회방법
    1. https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
버전 호환성의 문제로 1.7.5로 진행함. 현재 2.2까지 Release


'BigData' 카테고리의 다른 글

Hadoop Securiy for Multi tenant #2  (0) 2017.01.20
Hadoop Securiy for Multi tenant #1  (0) 2017.01.17
Storm특징 및 Spark와의 차이점  (0) 2014.12.12
CAP 정리  (0) 2014.11.28
NoSQL  (0) 2014.11.28

+ Recent posts