Scala与大数据

2017/11/20 Scala

Scala与大数据

用Scala改善MapReduce

MapReduce 的 Java API 是非常底层的但很难使用,它需要特别的专业知识来实现一些算法,才能获得良好的性能。MapReduce 模型包含 map 步骤,在该步骤中我们通过读入文件,将数据转为 key 值对的形式,来满足算法的要求。key 值对在 shuffle 阶段被混洗,安排相同的 key 在一起,然后执行最后的 reduce 处理步骤。许多算法都需要好多个 MapReduce 的 job 串在一起。不幸的是,MapReduce 在每个 job 结束后,会将数据刷到磁盘中,即使该序列中的下一个作业需要将数据读回内存。反复执行磁盘 I/O 是 MapReduce 处理大型数据集时效率不高的主要原因。
在 MapReduce 中,map 其实表示平坦映射 (flat map),这是因为对于每一个输入(比如,文本文件中的一个)会产生零到多个输出的 key 值对。 Reduce 的含义与通常认为的相同。但是,试想一下,如果 Scala 的容器只有 flatMap 和 reduce 这两个组合子,会如何呢?许多你想完成的转换会很难实现。另外,你需要了解如何在大型数据集上有效地实现转换。其结果如下:在原则上,你可以在 MapReduce 实现几乎所有的算法,但在实践 中,这需要特殊的专业知识和具有挑战性的编程工作。 Cascading(http://cascading.org )是 Java 中最有名的 API,它支持对 Hadoop MapReduce 中的典型数据问题进行抽象,也隐藏了许多底层的 MapReduce 细节。Twitter 发明了 Scalding(https://github.com/twitter/scalding )。Scalding 是基于 Cascading 的 Scala API,已经变得非常流行。 下面我们来看一个典型的算法 Word Count,它是 Hadoop 的“Hello World”。
例一,MR版本:

class WordCountMapper extends MapReduceBase
    implements Mapper<IntWritable, Text, Text, IntWritable> {
    static final IntWritable one = new IntWritable(1); 
    static final Text word = new Text;

    @Override 
    public void map(IntWritable key, Text valueDocContents, OutputCollector<Text, IntWritable> output, Reporter reporter) {
        String[] tokens = valueDocContents.toString.split("\\s+"); 
        for (String wordString: tokens) {
            if (wordString.length > 0) {
                 word.set(wordString.toLowerCase); 
                 output.collect(word, one);
            } 
        }
    } 
}
class WordCountReduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text keyWord, 
    java.util.Iterator<IntWritable> counts, 
    OutputCollector<Text, IntWritable> output, 
    Reporter reporter) {
        int totalCount = 0; while (counts.hasNext) {
            totalCount += counts.next.get; 
        }
    output.collect(keyWord, new IntWritable(totalCount)); }
}

例二,Cascading 提供了一个直观的管道 (pipe)模型:

public class CascadingWordCount {
    public static void main( String[] args ) {
        String input = args[0]; String output = args[1];

        Tap docTap = new Hfs( new TextDelimited( true, "\t" ), input ); Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), output );

        Properties properties = new Properties();
        AppProps.setApplicationJarClass( properties, Main.class ); 
        HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

        Fields token = new Fields( "token" ); 
        Fields text = new Fields( "text" ); 
        RegexSplitGenerator splitter =
        new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" ); 

        Pipe docPipe =new Each( "token", text, splitter, Fields.RESULTS );
        Pipe wcPipe = new Pipe( "wc", docPipe );

        wcPipe = new GroupBy( wcPipe, token );
        wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );
        // 将所有的tap、pipe等连接到flow。 
        FlowDef flowDef = FlowDef.flowDef()
        .setName( "wc" ).addSource( docPipe, docTap ) .addTailSink( wcPipe, wcTap );
// 运行flow。
        Flow wcFlow = flowConnector.connect( flowDef ); wcFlow.complete();
    } 
}

例三,Scalding 的版本:

class WordCount(args : Args) extends Job(args) {

        TextLine(args("input")) 
        .flatMap('line -> 'word) {
            line: String => line.trim.toLowerCase.split("""\s+""")
        }
        .groupBy('word){ group => group.size('count) } 
        .write(Tsv(args("output")))
}

我们从冗长、繁琐的程序演变为一个简单的脚本。当你可以写出如此简洁的程序的时候,整个软件开发过程都改变了。

备注: 今天先更新这么多。

Show Disqus Comments

Search

    Table of Contents