간단한 YARN Application 만들기

이 내용은 나중에 참고하기 위해 제가 공부하며 정리한 내용입니다.
의역, 오역, 직역이 있을 수 있음을 알려드립니다.
Original article: Writing YARN Applications
이문서는 아파치 하둡 2.7.2 기준으로 작성되어 있습니다.


Purpose

이 문서는 High level에서 YARN용 Application을 구현하는 방법을 설명합니다.

개념 및 흐름

일반적인 YARN 프로그램은 응용 프로그램을 제출하는 클라이언트(Driver 클래스)가 YARN ResourceManager(RM)에 응용 프로그램을 제출하는 형식입니다. 이 작업은 YarnClient object를 설정하여 수행 할 수 있습니다.
YarnClient가 시작되면 클라이언트는 응용 프로그램 컨텍스트를 설정하고, ApplicationMaster(AM)가 포함 된 응용 프로그램의 첫 번째 컨테이너를 준비한 후 응용 프로그램을 제출할 수 있습니다.

응용 프로그램을 실행하는 데 필요한 로컬 file/jar, 실행 해야하는 실제 명령 (필요한 command line argument 사용), 모든 OS 환경 설정 (선택 사항) 등의 정보를 제공해야합니다.또한 ApplicationMaster를 위해 필요한 유닉스 프로세스를 효과적으로 기술 할 필요가 있습니다.

YARN ResourceManager는 할당 된 컨테이너에서 지정한대로 ApplicationMaster를 시작합니다. 실행된 ApplicationMaster는 YARN 클러스터와 통신하고 응용 프로그램 실행을 처리합니다. 이러한 작업은 Asynchronous 방식으로 수행됩니다.

응용 프로그램 시작 시간 동안 ApplicationMaster의 주요 작업은 다음과 같습니다.
a) 향후 컨테이너에 대한 자원을 협상하고 할당하기 위해 ResourceManager와 통신하고,
b) 컨테이너 할당 후 YARN NodeManager (NMs)와 통신하여 응용 프로그램 컨테이너를 시작합니다 .

a)는 AMRMClientAsyncCallbackHandler 유형의 Event Handler에 지정된 이벤트 핸들링 메소드로 AMRMClientAsync 오브젝트를 통해 비동기 적으로 수행 할 수 있습니다. Event Handler는 클라이언트에 명시적으로 설정해야합니다.

b)는 컨테이너가 할당 될 때 컨테이너를 시작하는 실행 가능한 object를 실행함으로써 수행 될 수 있습니다. 이 컨테이너를 시작하는 과정에서 AM은 command line argument, 환경 등과 같은 실행 정보가있는 ContainerLaunchContext를 지정해야합니다.

응용 프로그램 실행 중에 ApplicationMaster는 NMClientAsync 객체를 통해 NodeManager와 통신합니다. 모든 컨테이너 이벤트는 NMClientAsync와 연결된 NMClientAsync.CallbackHandler에 의해 처리됩니다. 일반적인 Callback handler는 클라이언트 시작, 중지, 상태 업데이트 및 오류를 처리합니다. 또한 ApplicationMaster는 AMRMClientAsync.CallbackHandlergetProgress() 메소드를 호출하여 ResourceManager에 실행 진행률을 보고합니다.

Asynchronous 클라이언트 외에도 특정 워크 플로 (AMRMClient 및 NMClient)에 대한 Synchronous 버전이 있습니다. Asynchronous 클라이언트는 (주관적으로) 보다 간단한 사용법 때문에 권장되며 이 문서에서는 주로 Asynchronous 클라이언트에 대해 다룹니다. Synchronous 클라이언트에 대한 자세한 내용은 AMRMClientNMClient를 참조하십시오.

Interfaces

다음은 중요한 인터페이스입니다.

Client <–> ResourceManager
YarnClient 객체를 사용합니다.

ApplicationMaster <–> ResourceManager
AMRMClientAsync object를사용하여 AMRMClientAsync.CallbackHandler에 의해 이벤트를 비동기적으로 처리합니다.

ApplicationMaster <–> NodeManager
컨테이너들을 실행 합니다. NMClientAsync object를 사용하여 노드 관리자와 통신하고 NMClientAsync.CallbackHandler에 의해 컨테이너 이벤트를 처리합니다.

YARN 응용 프로그램 (ApplicationClientProtocol, ApplicationMasterProtocol 및 ContainerManagementProtocol)의 세가지 주요 프로토콜은 여전히 유효합니다. 세가지 클라이언트는 YARN 응용 프로그램에 대한보다 단순한 프로그래밍 모델을 제공하기 위해 이 세가지 프로토콜을 래핑합니다.

매우 드문 경우지만 프로그래머는 응용 프로그램을 구현하기 위해 3개의 프로토콜을 직접 사용할 수 있습니다. 그러나 일반적인 사용 사례에서는 이러한 동작이 더 이상 권장되지 않습니다.

간단한 Yarn Application 작성

Client 작성

클라이언트가 해야 할 첫 번째 단계는 YarnClient를 초기화하고 시작하는 것입니다.

1
2
3
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();

일단 클라이언트가 설정되면 클라이언트는 application을 작성하고 application ID를 가져와야합니다.

1
2
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();

새 응용 프로그램에 대한 YarnClientApplication의 응답에는 클러스터의 최소/최대 자원 기능과 같은 클러스터에 대한 정보도 들어 있습니다. 이는 ApplicationMaster가 시작될 컨테이너의 스펙을 올바르게 설정할 수 있도록하기 위해 필요합니다. 자세한 내용은 GetNewApplicationResponse를 참조하십시오.

클라이언트의 주된 요지는 RM이 AM을 시작하는 데 필요한 모든 정보를 정의하는 ApplicationSubmissionContext를 설정하는 것입니다. 클라이언트는 컨텍스트에 다음을 설정해야합니다.

  • Application 정보 : ID, 이름
  • Queue, Priority 정보 : 응용 프로그램이 전송 될 대기열, 응용 프로그램에 할당 될 우선 순위입니다.
  • User : 응용 프로그램을 제출하는 사용자
  • ContainerLaunchContext : AM이 시작되고 실행될 컨테이너를 정의하는 정보입니다. 이전에 언급했듯이 ContainerLaunchContext는 로컬 리소스 (바이너리, jar, 파일 등), 환경 설정 (CLASSPATH 등), 실행할 명령 및 보안 토큰 (RECT)과 같은 응용 프로그램을 실행하는 데 필요한 모든 필수 정보를 정의합니다. ).

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    // application 제출 컨텍스트 설정
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();
    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
    appContext.setApplicationName(appName);
    // Application Master에 대한 로컬 리소스 설정.
    // 필요한 경우 로컬 파일 또는 아카이브.
    // 이 시나리오에서 Application Master의 jar 파일은 로컬 자원의 일부.
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    // Application Master jar파일을 hdfs filesystem으로 복사
    // 대상 jar 경로를 가리 키도록 로컬 리소스를 생성.
    FileSystem fs = FileSystem.get(conf);
    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null);
    // 필요한 경우 log4j properties를 설정
    if (!log4jPropFile.isEmpty()) {
    addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), localResources, null);
    }
    // 쉘 스크립트는 실행될 최종 컨테이너에서 사용할 수 있어야 합니다.
    // 이를 위해서는 우선 yarn 프레임 워크에서 볼 수있는 파일 시스템에 복사해야합니다.
    // Shell 스크립트는 Application Master에서 필요하지 않으므로 Application Master를 위해 로컬 리소스로 설정할 필요가 없습니다.
    String hdfsShellScriptLocation = "";
    long hdfsShellScriptLen = 0;
    long hdfsShellScriptTimestamp = 0;
    if (!shellScriptPath.isEmpty()) {
    Path shellSrc = new Path(shellScriptPath);
    String shellPathSuffix = appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
    Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
    fs.copyFromLocalFile(false, true, shellSrc, shellDst);
    hdfsShellScriptLocation = shellDst.toUri().toString();
    FileStatus shellFileStatus = fs.getFileStatus(shellDst);
    hdfsShellScriptLen = shellFileStatus.getLen();
    hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
    }
    if (!shellCommand.isEmpty()) {
    addToLocalResources(fs, null, shellCommandPath, appId.toString(), localResources, shellCommand);
    }
    if (shellArgs.length > 0) {
    addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, StringUtils.join(shellArgs, " "));
    }
    // env 변수를 설정하여 Application Master가 실행될 env에 설정합니다.
    LOG.info("Set the environment for the application master");
    Map<String, String> env = new HashMap<String, String>();
    // shell 스크립트의 위치를 env에 넣는다.
    // env info를 사용하여 Application Master는 Shell 스크립트를 실행하기 위해 실행될 최종 컨테이너에 대한 올바른 로컬 리소스를 만듭니다
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
    // AppMaster.jar 위치를 클래스 경로에 추가.
    // 어떤 점에서 우리는 env에 특정 hadoop 클래스 경로를 추가 할 필요가 없습니다.
    // 이제 classpath를 포함하여 필요한 모든 classpath를 "."로 설정하십시오.
    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
    .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
    for (String c : conf.getStrings(
    YarnConfiguration.YARN_APPLICATION_CLASSPATH,
    YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
    classPathEnv.append(c.trim());
    }
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./log4j.properties");
    // AM을 실행하는 데 필요한 명령어를 설정
    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
    // Java 실행명렁어 설정
    LOG.info("Setting up app master command");
    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
    // Xmx 메모리 설정
    vargs.add("-Xmx" + amMemory + "m");
    // Set class name
    vargs.add(appMasterMainClass);
    // AM paramter 설정
    vargs.add("--container_memory " + String.valueOf(containerMemory));
    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
    vargs.add("--num_containers " + String.valueOf(numContainers));
    vargs.add("--priority " + String.valueOf(shellCmdPriority));
    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
    vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
    }
    if (debugFlag) {
    vargs.add("--debug");
    }
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
    // 최종 명령어 생성
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
    command.append(str).append(" ");
    }
    LOG.info("Completed setting up app master command " + command.toString());
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
    // AM을 위한 Container launch 컨텍스트 설정
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
    // 리소스 유형 요구사항 설정
    // 이제는 메모리와 vCore가 모두 지원되므로 메모리 및 vCore 요구 사항을 설정.
    Resource capability = Resource.newInstance(amMemory, amVCores);
    appContext.setResource(capability);
    // 서비스 데이터는 응용 프로그램에 전달 될 수있는 binary BLOB입니다.
    // 이 시나리오에서는 필요 없음
    // amContainer.setServiceData(serviceData);
    // Setup security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
    // Note: Credentials 클래스는 HDFS 및 MapReduce에 대해 LimitedPrivate로 표시됩니다.
    Credentials credentials = new Credentials();
    String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
    if (tokenRenewer == null | | tokenRenewer.length() == 0) {
    throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
    }
    // 지금은 기본 파일 시스템에 대한 토큰만 얻습니다.
    final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);
    if (tokens != null) {
    for (Token<?> token : tokens) {
    LOG.info("Got dt for " + fs.getUri() + "; " + token);
    }
    }
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    amContainer.setTokens(fsTokens);
    }
    appContext.setAMContainerSpec(amContainer);

설정 프로세스가 완료되면 클라이언트는 지정된 우선 순위 및 대기열을 사용하여 응용 프로그램을 제출할 준비가됩니다.

1
2
3
4
5
6
7
8
9
10
11
// AM의 priority 설정
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// RM에서 application이 제출 될 대기열을 설정.
appContext.setQueue(amQueue);
// Application Mansger에 application 제출
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
yarnClient.submitApplication(appContext);

이 시점에서 RM은 Application을 수락하고 백그라운드에서 필요한 사양의 컨테이너를 할당 한 다음, 할당 된 컨테이너에서 AM을 설정하고 시작하는 프로세스를 진행합니다.
클라이언트가 실제 작업의 진행 상황을 추적 할 수 있는 여러 가지 방법이 있습니다.
RM과 통신하여 YarnClient의 getApplicationReport() 메소드를 통해 애플리케이션의 보고서를 요청할 수 있습니다.

1
2
// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);

RM으로 부터 받은 ApplicationReport는 아래와 같이 구성되어 있습니다.

  • 일반 응용 프로그램 정보 : 응용 프로그램 ID, 응용 프로그램이 제출 된 큐, 응용 프로그램을 제출 한 사용자 및 응용 프로그램의 시작 시간.
  • ApplicationMaster 상세 정보 : AM이 실행되는 호스트, 클라이언트의 요청을 수신하는 rpc 포트 (있는 경우) 및 클라이언트가 AM과 통신해야하는 토큰.
  • 응용 프로그램 추적 정보 : 응용 프로그램에서 진행 추적의 일부 양식을 지원하는 경우 클라이언트가 진행 상황을 모니터링하기 위해 볼 수있는 ApplicationReport의 getTrackingUrl() 메서드를 통해 사용할 수있는 추적 URL을 설정할 수 있습니다.
  • 응용 프로그램 상태 : ResourceManager에서 볼 수있는 응용 프로그램 상태는 ApplicationReport.getYarnApplicationState를 통해 사용할 수 있습니다. YarnApplicationState가 FINISHED로 설정된 경우 클라이언트는 ApplicationReport.getFinalApplicationStatus를 참조하여 응용 프로그램 작업 자체의 실제 성공/실패를 확인해야합니다. 실패한 경우 ApplicationReport.getDiagnostics가 실패에 대해 더 많은 정보를 제공하는 데 유용 할 수 있습니다.
  • ApplicationMaster가 지원하면 클라이언트는 응용 프로그램 보고서에서 얻은 host:rpcport 정보를 통해 AM 자체에 진행 상태 업데이트를 직접 쿼리 할 수 있습니다. 가능한 경우 보고서에서 얻은 추적 URL을 사용할 수도 있습니다.

특정 상황에서 응용 프로그램이 너무 오래 걸리거나 다른 요인으로 인해 클라이언트가 응용 프로그램을 종료하려고 할 수 있습니다. YarnClient는 클라이언트가 ResourceManager를 통해 AM에 kill 신호를 보낼 수있게하는 killApplication 호출을 지원합니다. 그렇게 설계된 경우 ApplicationMaster는 클라이언트가 활용할 수있는 rpc 계층을 통해 중단 호출을 지원할 수도 있습니다.

1
yarnClient.killApplication(appId);

ApplicationMaster (AM) 작성

AM은 실제 작업 소유자입니다. AM은 RM에 의해 시작되며 클라이언트를 통해 AM 감독 및 완료 임무가 주어진 모든 필요한 정보와 자원을 제공 받게됩니다.
멀티 테넌시 특성을 고려할 때 다른 컨테이너와 물리적 호스트를 공유 할 수있는 컨테이너 내에서 AM이 시작될 때 미리 설정된된 포트을 listen 할수 있다는 것과 같은 가정은 할 수 없습니다.

AM이 시작되면 Environment를 통해 여러 parameter를 사용할 수 있습니다. 여기에는 AM 컨테이너의 ContainerId, 응용 프로그램 제출 시간 및 ApplicationMaster를 실행하는 NM(NodeManager) 호스트에 대한 세부 정보가 포함됩니다. ApplicationConstants는 parameter 이름을 참조합니다.

RM과의 모든 상호 작용에는 ApplicationAttemptId가 필요합니다 (실패한 경우 응용 프로그램마다 여러 번 시도 할 수 있음). ApplicationAttemptId는 AM의 컨테이너 ID에서 가져올 수 있습니다. Environment에서 얻은 값을 object로 변환하는 helper API가 있습니다.

1
2
3
4
5
6
7
8
Map<String, String> envs = System.getenv();
String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
// 컨테이너 ID는 항상 프레임 워크에 의해 env에 설정됩니다.
throw new IllegalArgumentException("ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();

AM이 완전히 초기화 된 후에 ResourceManager와 NodeManager에서 두개의 클라이언트를 시작할 수 있습니다. 여기에서는 커스터마이징 된 Event handler를 사용하여 설정하고 이 Event handler는 나중에 대해 자세히 설명합니다.

1
2
3
4
5
6
7
8
9
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();

AM은 AM이 살아 있고 여전히 실행 중임을 알리기 위해 RM에게 하트비트를 Emit 해야합니다. RM의 timeout 만료는 YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS를 통해 액세스 할 수있는 구성 설정에 의해 정의되며 기본값은 YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS로 정의됩니다. ApplicationMaster는 하트비트를 시작하기 위해 ResourceManager에 등록해야합니다.

1
2
3
4
// RM에 자신을 등록
// 이렇게 하면 RM에 heartbeat을 emit 할수 있습니다.
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);

등록 응답에 포함 된 경우 최대 리소스를 확인할수 있습니다. 그리고 이를 사용하여 응용 프로그램의 요청을 확인할 수도 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 자원 관리자가 볼 수있는 클러스터 기능(capability)에 대한 정보를 덤프합니다.
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
// 리소스 요청은 최대 값을 초과 할 수 없습니다.
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max=" + maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max=" + maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration.");

작업 요구 사항에 따라 AM은 해당 작업을 실행할 컨테이너 세트를 요청할 수 있습니다. 이제 얼마나 많은 컨테이너가 필요한지 계산할 수 있고, 계산된 컨테이너를 요청할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration.");
int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
// RM에게 컨테이너를 위한 SETUP 문의 & Request 전송
// 할당량이 완전히 할당 될 때까지 컨테이너에 대한 RM 폴링을 계속
// 모든 컨테이너가 시작되고 쉘 스크립트의 성공/실패 여부와 관계없이 실행될 때까지 루핑.
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}

setupContainerAskForRM()에서 다음 두 가지를 설정해야합니다.

Resource capability : 현재 YARN은 메모리 기반 리소스 요구 사항을 지원하므로 요청에서 필요한 메모리 양을 정의해야합니다. 이 값은 MB 단위로 정의되며 클러스터의 최대 용량보다 작아야하며 최소 용량의 정확한 배수 여야합니다. 메모리 자원은 타스크 컨테이너에 부과 된 실제 메모리 한계에 해당합니다. 또한 코드에 표시된 것처럼 Computation based resource(vCore)도 지원합니다.

Priority : 컨테이너 집합을 요구할 때, AM은 각 집합에 대해 다른 우선 순위를 정의 할 수 있습니다. 예를 들어 Map-Reduce AM은 Map Task에 필요한 컨테이너에 우선 순위를 높이고 Reduce Task 컨테이너에 대해서는 우선 순위를 낮출수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private ContainerRequest setupContainerAskForRM() {
// host들에 대한 요구사항을 setup
// 분산 Shell Application에서 *를 이용하면 어떤 host든 가능
// 요청의 우선순위를 설정
Priority pri = Priority.newInstance(requestPriority);
// 리소스 타입 요구사항 설정
// 이제는, 메모리와 CPU가 지원되므로 메모리와 CPU 요구 사항을 설정합니다.
Resource capability = Resource.newInstance(containerMemory, containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null, pri);
LOG.info("Requested container ask: " + request.toString());
return request;
}

컨테이너 할당 요청이 Application manager에게 보내진 후 컨테이너는 AMRMClientAsync 클라이언트의 Event handler에 의해 Asynchronous하게 시작됩니다. Handler는 AMRMClientAsync.CallbackHandler 인터페이스를 구현 해야합니다.
할당 된 컨테이너가있을 때 Handler는 컨테이너를 시작하는 코드를 실행하는 쓰레드를 설정합니다. 이 글에서는 LaunchContainerRunnable이라는 이름을 사용하여 설명합니다. 이 글의 다음 부분에서 LaunchContainerRunnable 클래스에 대해 설명 할 예정입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
// 따로 유지할 별도의 스레드에서 컨테이너를 실행하고 시작합니다.
// main thread가 unblocked 됩니다.
// 모든 컨테이너는 한 번에 할당되지 않을 수 있습니다.
launchThreads.add(launchThread);
launchThread.start();
}
}

Event handler는 heartbeat에서 프로그램의 진행 상황을 보고합니다.

1
2
3
4
5
6
@Override
public float getProgress() {
// 다음 heartbeat에서 RM에 전달할 진행률 설정
float progress = (float) numCompletedContainers.get()/numTotalContainers;
return progress;
}

컨테이너 실행 스레드는 실제로 NM에서 컨테이너를 시작합니다. 컨테이너가 AM에 할당 된 후에는 할당 된 컨테이너에서 실행될 최종 작업에 대해 ContainerLaunchContext를 설정할 때 클라이언트가 수행 한 것과 비슷한 프로세스를 따라야합니다. ContainerLaunchContext가 정의되면 AM은 NMClientAsync를 통해 시작할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 할당 된 컨테이너에서 실행하기 위해 필요한 명령을 설정
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// 실행 가능한 command를 설정
vargs.add(shellCommand);
// shell script path 설정
if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath : ExecShellStringPath);
}
// shell command의 args를 설정(있는 경우)
vargs.add(shellArgs);
// log redirect params 추가
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// 최종 commmand 생성
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// ContainerLaunchContext를 설정하고 생성자에 대한 로컬 리소스, 환경, 명령 및 토큰을 설정합니다.
// Note for tokens: 컨테이너에 토큰을 설정하십시오.
// 오늘날 일반적인 셸 명령의 경우 distribute-shell의 컨테이너에는 토큰이 필요하지 않습니다.
// NodeManagers에서 distributed file-system의 모든 파일을 다운로드 할 수 있도록 토큰을 설정합니다.
// 토큰은 분산 쉘 내부에서 예를 들어, "hadoop dfs"명령을 실행하는 경우에 유용합니다.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);

NMClientAsync object는 Event handler와 함께 컨테이너 시작, 중지, 상태 업데이트, 오류 발생등의 컨테이너 이벤트를 처리합니다.
ApplicationMaster가 작업이 완료되었다고 판단하면, AM-RM 클라이언트를 통해 자체 등록을 취소하고 클라이언트를 중지 해야합니다.

1
2
3
4
5
6
7
8
try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
}
amRMClient.stop();

FAQ

응용 프로그램의 jar 파일을 필요로하는 YARN 클러스터의 모든 노드에 배포하려면 어떻게합니까?

LocalResource를 사용하여 Application request에 리소스를 추가 할 수 있습니다. 그러면 YARN이 리소스를 ApplicationMaster 노드에 배포하게됩니다. 리소스가 tgz, zip 또는 jar 인 경우 YARN에 압축을 풀 수 있습니다. 그런 다음 압축 해제 된 폴더를 클래스 경로에 추가하기 만하면됩니다. 예를 들어, Application request를 만들때 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
File packageFile = new File(packagePath);
Url packageUrl = ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext.makeQualified(new Path(packagePath)));
packageResource.setResource(packageUrl);
packageResource.setSize(packageFile.length());
packageResource.setTimestamp(packageFile.lastModified());
packageResource.setType(LocalResourceType.ARCHIVE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
resource.setMemory(memory);
containerCtx.setResource(resource);
containerCtx.setCommands(ImmutableList.of(
"java -cp './package/*' some.class.to.Run "
+ "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
+ "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
containerCtx.setLocalResources(Collections.singletonMap("package", packageResource));
appCtx.setApplicationId(appId);
appCtx.setUser(user.getShortUserName);
appCtx.setAMContainerSpec(containerCtx);
yarnClient.submitApplication(appCtx);

위에 보시다시피, setLocalResources는 리소스의 이름을 map으로 가져옵니다. 이 이름은 application의 현재 폴더 (pwd)에서 system link가 되기 때문에 ./package/*를 사용하여 참조 가능합니다.

Note:Java의 classpath (cp) 인수는 매우 민감합니다. 정확한 구문을 정확하게 입력하십시오.

패키지가 AM에 배포되면 AM이 새 컨테이너를 시작할 때마다 동일한 프로세스를 수행해야합니다 (리소스를 컨테이너로 보내려는 경우). 이경우에도 코드는 동일합니다. 컨테이너 ctx와 함께 리소스 URL을 보낼 수 있도록 AM에 패키지 경로 (HDFS 또는 로컬)를 제공하면됩니다.

ApplicationMaster의 ApplicationAttemptId는 어떻게 가져올수 있나요?

ApplicationAttemptId는 Environment를 통해 AM에 전달되며 Environment의 값은 ConverterUtils 도우미 함수를 통해 ApplicationAttemptId object로 변환 할수 있습니다.

왜 내 컨테이너를 NodeManager가 죽이나요?

이는 요청한 컨테이너 메모리 크기를 초과하는 높은 메모리 사용량 때문일 수 있습니다.
이 문제를 일으킬 수있는 여러 가지 이유가 있습니다. 먼저 NodeManager가 컨테이너를 죽일 때 덤프하는 프로세스 트리를 살펴보십시오. 관심있는 두 가지 사항은 실제 메모리와 가상 메모리입니다. 실제 메모리 제한을 초과하면 앱이 너무 많은 실제 메모리를 사용하고 있습니다. Java 응용 프로그램을 실행중인 경우 -hprof를 사용하여 힙에서 차지하는 공간을 확인할 수 있습니다. 가상 메모리를 초과 한 경우에는 클러스터 전체의 구성 변수 인 yarn.nodemanager.vmem-pmem-ratio의 값을 늘려야 할 수 있습니다.

어떻게 native library들을 포함시킬수 있나요?

컨테이너를 실행할때 command에서 -Djava.library.path를 설정하면 Hadoop이 사용하는 기본 라이브러리가 올바르게 로드되지 않고 오류가 발생할 수 있습니다. 대신 LD_LIBRARY_PATH를 사용하는 것이 더 깔끔합니다.

Sample Code

공유하기