大数据技术 – MapReduce 应用的配置和单元测试

Java基础

浏览数:216

2019-8-20

上一章的 MapReduce 应用中,我们使用了自定义配置,并用 GenericOptionsParser 处理命令行输入的配置,这种方式简单粗暴。但不是 MapReduce 应用常见的写法,本章第一部分将介绍 MapReduce 应用常见的写法,并详细介绍自定义配置以及命令行选项,通过自定义配置我们可以灵活的控制 MapReduce 应用而不需要修改代码并打包。第二部分将介绍开发 MapReduce 应用的单元测试,单元测试的重要性不言而喻,是每个程序员必备技能。

带有自定义配置的 MapReduce 应用程序更常见的写法是实现 Tool 接口, 通过 ToolRunner 来运行应用程序。ToolRunner 内部会调用 GenericOptionsParser 类处理命令行选项,包括自定义配置。代码如下:

package com.cnblogs.duma.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
//        Configuration conf = new Configuration();
//        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); //识别命令行参数中的自定义配置
//        String[] remainingArgs = optionParser.getRemainingArgs(); //获取处理自定义配置外的其他参数
        Job job = Job.getInstance(getConf(), "WordCount"); //第二参数为程序的名字
        job.setJarByClass(getClass()); //需要设置类名

        job.setMapperClass(WordCountMapper.class); //设置 map 任务的类
//        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class); // 设置 reduce 任务的类

        job.setOutputKeyClass(Text.class);  //设置输出的 key 类型
        job.setOutputValueClass(IntWritable.class); //设置输出的 value 类型

        FileInputFormat.addInputPath(job, new Path(args[0])); //增加输入文件
        FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置输出目录

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCountDriver(), args); //内部调用 GenericOptionsParser 处理配置
        System.exit(exitCode);
    }
}

在本例子中我们通过 ToolRunner 的 run 方法启动应用程序, run 方法第一个参数就是 Tool 类型的对象, 因此我们在驱动程序实现 Tool 接口,并实现了 Tool 接口的 run 方法, 该方法的处理跟之前版本的驱动程序 main 方法处理的逻辑一样,区别是不需要我们自己初始化 Configuration,也不需要我们自己用 GenericOptionsParser 对象处理配置,只需要调用 getConf 方法即可。getConf 方法是 Configurable 接口中定义的方法,Tool 继承了该接口,因此驱动程序中应该有 getConf 方法的实现, 为什么我们的驱动程序没有实现 getConf 方法呢? 因为我们的驱动程序继承了 Configured 类, 该类实现了 Configurable 接口,因此驱动程序的 getConf 方法是直接从 Configured 类继承过来的。我们可以看下 ToolRunner.run 方法做了什么处理,核心代码如下:

public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
        if (CallerContext.getCurrent() == null) {
            CallerContext ctx = (new Builder("CLI")).build();
            CallerContext.setCurrent(ctx);
        }

        if (conf == null) {
            conf = new Configuration();
        }

        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        tool.setConf(conf);
        String[] toolArgs = parser.getRemainingArgs();
        return tool.run(toolArgs);
    }

可以看到 run 方法里面的处理逻辑跟我们之前自己处理的逻辑是一样的, 都是用 GenericOptionsParser 对象里的方法处理。代码最后调用 tool 对象的 run 方法, 那自然调用的就是驱动程序的 run 方法。我们之前添加配置通过命令行 -D 选项来实现, 除了 -D Tool 还支持以下的命令行选项:

选项名称 描述      
-D <property>=<value> 使用 key -value 方式指定配置
-conf <configuration file> 指定配置文件,格式如 core-site.xml
-fs <file:///> or hdfs://namenode:port 指定文件系统URL,覆盖之前配置的 fs.defaulFS
-files <逗号分隔的文件列表> 指定任务用到的本地文件, 如果多个用逗号分隔
-archive <逗号分隔的档案文件> 指定任务用到的档案文件,如果多个用逗号分隔
-libjars <逗号分隔的jar文件> 指定本地文件jar文件,会把他们加入到 MapReduce 任务的calsspath,如果任务用到第三方jar报可以用该选项

为 MapReduce 应用指定配置需要注意优先级, -conf 选项如果有多个配置文件且多个文件中有相同的配置的 key ,则后面的配置文件的值会覆盖前面的配置文件, 除非该配置的 final 标签的值为 true,表示不能被覆盖。其次需要注意 -D 配置的优先级高于配置文件。

以上便是开发 MapReduce 需要用到的配置和命令行选项相关的知识,接下来我们看看 Hadoop中如何编写单元测试。我之前开发 Hadoop 的时候还没出现 MapReduce 的单元测试框架,并且开发 MapReduce 应用基本都在 Windows 环境下开发,所以测试非常不方便,记得之前网上有一个工具可以在 Windows 平台下运行 Hadoop 任务, 可以勉强用来做单元测试,但终究也不太方便。最近发现 apache 开源了一个 MapReduce 应用的单元测试框架,实际用了一把,比较方便, 可以在本地直接测试 map 或 reduce 任务, 还能进行调试,非常好用。个人认为单元测试在工程实践中重要的一环, 一方面不需要打包,提交任务等一系列繁琐的操作过程就能测试代码,高效且能提高程序健壮性。另一方面,如果代码改动了,只需要增加些测试用例,重跑下单元测试即可。短期看编写测试代码花费了时间, 长期看是节省时间的。今天介绍的 MapReduce 单元测试框架叫 mrunit ,apache 开源项目,目前官网显示该项目已经退役,但并不影响我们使用,首先在 pom 文件中添加依赖

<dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier> <!--选择hadoop1或hadoop2-->
      <scope>test</scope>
</dependency>

我们以 word count map 任务为例,编写 WordCountMapper 的单元测试,代码如下

package com.cnblogs.duma;

import com.cnblogs.duma.mapreduce.WordCount;
import com.cnblogs.duma.mapreduce.WordCountDriver;
import com.cnblogs.duma.mapreduce.WordCountMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class WordCountMapperTest {
    MapDriver<Object, Text, Text, IntWritable> mapDriver = null;

    @Before
    public void setUp() {
        WordCountMapper wdMapper = new WordCountMapper(); //初始化要测试的mapper
        mapDriver = MapDriver.newMapDriver(wdMapper); //初始化 MapDriver
        Configuration conf = mapDriver.getConfiguration();
        conf.set("wordcount.filter.word", "hadoop"); //指定配置信息
    }

    @Test
    public void test() throws IOException {
        mapDriver.setMapper(new WordCountMapper());
        mapDriver.withInput(new IntWritable(1), new Text("a hadoop"))
                .withOutput(new Text("hadoop"), new IntWritable(1))
                .runTest();
    }

}

可以看到 mrunit 使用还是比较方便。有两点需要注意,第一,如果是 hadoop2 的测试用例, 需要要引用 org.apache.hadoop.mrunit.mapreduce.MapDriver。第二,withOutput 可以写多个,且顺序跟期望输出的顺序一致。这是 map 任务的测试用例, reduce 任务用法与之类似, 不再赘述。

总结

本章内容主要介绍了 MapReduce 任务常见的写法、Tool 接口支持的命令行选项、配置相关的知识以及单元测试。尤其是单元测试,方便、易用、高效。有意识编写单元测试可以让开发更规范同时可以提高工程质量。

作者:渡码