kangfoo's blog

工作学习笔记,生活掠影。

Springbatch入门

| 评论

  1. 批处理 在企业级应用系统当中,面对日益复杂的义务及一定规模的数据量,频繁的人机操作会引入一定的时间成本和管理风险。可以采取定时读取大批数据,在执行相应的工作流程,并归档。我首先想到的就是直接使用批处理进行解决。以解决我可能要面对的与特定时间周期相关、数据量大、尽量少人工干涉、自动完成、事后督察等工作。这些工作可以用 存储过程 + shell 等方式实现,但作为应用程序而言,我倾向于使用JAVA api. jdk 7 才有原生的API支持(这是 JAVA 7 Batch Processing Tutorial)。貌似6要费费劲。我一直是spring 的粉丝。SpringBatch 自然会上场的。

  2. spring batch 一般的批处理都分有三个阶段 读数据(我的数据目前大部分来自于文件) 处理数据(业务逻辑) 写数据(将业务结果写入数据库) 这些过程又必选考虑效率、事物的粒度、监控、资源开销。读和写、业务处理一般都是独立的模块可直接解耦。谷歌了一番,看中了 spring batch。

那么 spring batch 可以给我们带来什么好处? Spring Batch作为Spring的一个顶级子项目,是一款优秀的大数据量并行处理框架。通过Spring Batch可以构建出轻量级的健壮的并行处理应用,支持事务、并发、监控,提供统一的接口管理和任务管理。

谷歌文档一堆呀。先列举下我认为不错的。

好吧工具算是找的差不多了。 也要开始我的第一个 demo 了。具体的概念先放放。东西弄出来了,在慢慢细嚼。

  1. 首先 git clone maven-springbatch-archetype maven 插件。同时请确保你自己的版本,我当前使用的 1.4-SNAPSHOT ,按照 readme 一步步执行吧。

  2. 生成我们自己的样板工程代码结构

    mvn archetype:generate \
    -DarchetypeGroupId=com.dtzq \
    -DarchetypeArtifactId=maven-springbatch-archetype \
    -DarchetypeVersion=1.4-SNAPSHOT \
    -DgroupId=com.kangfoo.study.hygeia \
    -DartifactId=springbatch.test \
    -Dversion=1.0-SNAPSHOT \
    -Dpackage=com.kangfoo.study.hygeia.springbatch.test
    
  3. 题外话。第一次在 github 向他人维护的项目提交代码,弄了会儿,玩转了。 主要借鉴花20分钟写的-大白话讲解如何给github上项目贡献代码 先记录在案。

  4. spring batch + quartz

Hadoop Pipes & Streaming

| 评论

申明:本文大部分出自于 开源力量 LouisT 老师的开源力量培训课-Hadoop Development课件 和 Apache 官方文档。

Streaming

  • Streaming 是 hadoop 里面提供的一个工具
  • Streaming 框架允许任何程序语言实现的程序在 Hadoop MapReduce 中使用,方便任何程序向 Hadoop 平台移植,具有很强的扩展性;
  • mapper 和 reducer 会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming 工具会创建 MapReduce 作业,发送给各个 tasktracker,同时监控整个作业的执行过程;
  • 如果一个文件(可执行或者脚本)作为 mapper,mapper 初始化时,每一个 mapper 任务会把该文件作为一个单独进程启动,mapper 任务运行时,它把输入切法成行并把每一行提供给可执行文件进程的标准输入。同 时,mapper 收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成 key/value,作为 mapper的输出。默认情况下,一行中第一个 tab 之前的部分作为 key,之后的(不包括)作为value。如果没有 tab,整行作为 key 值,value值为null。对于reducer,类似;

Streaming 优点

  1. 开发效率高,便于移植。Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口。在单机上可按照 cat input | mapper | sort | reducer > output 进行测试,若单机上测试通过,集群上一般控制好内存也可以很好的执行成功。

  2. 提高运行效率。对内存要求较高,可用C/C++控制内存。比纯java实现更好。

Streaming缺点

  1. Hadoop Streaming 默认只能处理文本数据,(0.21.0之后可以处理二进制数据)。

  2. Steaming 中的 mapper 和 reducer 默认只能想标准输出写数据,不能方便的多路输出。

更详细内容请参考于: http://hadoop.apache.org/docs/r1.2.1/streaming.html

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

streaming示例

perl 语言的streaming示例 代码

-rw-rw-r--. 1 hadoop hadoop     48 2月  22 10:47 data
-rw-rw-r--. 1 hadoop hadoop 107399 2月  22 10:41 hadoop-streaming-1.2.1.jar
-rw-rw-r--. 1 hadoop hadoop    186 2月  22 10:45 mapper.pl
-rw-rw-r--. 1 hadoop hadoop    297 2月  22 10:55 reducer.pl
##
$ ../bin/hadoop jar hadoop-streaming-1.2.1.jar -mapper mapper.pl -reducer reducer.pl -input /test/streaming -output /test/streamingout1 -file mapper.pl -file reducer.pl 

Hadoop pipes

  1. Hadoop pipes 是 Hadoop MapReduce 的 C++ 的接口代称。不同于使用标准输入和输出来实现 map 代码和 reduce 代码之间的 Streaming。
  2. Pipes 使用套接字 socket 作为 tasktracker 与 C++ 版本函数的进程间的通讯,未使用 JNI。
  3. 与 Streaming 不同,Pipes 是 Socket 通讯,Streaming 是标准输入输出。

编译 Hadoop Pipes

编译c++ pipes( 确保操作系统提前安装好了 openssl,zlib,glib,openssl-devel) Hadoop更目录下执行 ant -Dcompile.c++=yes examples

具体请参见《Hadoop Pipes 编译》

Hadoop官方示例:

hadoop/src/examples/pipes/impl
 config.h.in
 sort.cc
wordcount-nopipe.cc
wordcount-part.cc
wordcount-simple.cc

运行前需要把可执行文件和输入数据上传到 hdfs:

$ ./bin/hadoop fs -mkdir /test/pipes/input
$ ./bin/hadoop fs -put a.txt /test/pipes/input 
$ ./bin/hadoop fs -cat /test/pipes/input/a.txt 
hello hadoop hello hive hello hbase hello zk

上传执行文件,重新命名为/test/pipes/exec

$ ./bin/hadoop fs -put ./build/c++-examples/Linux-amd64-64/bin/wordcount-simple /test/pipes/exec

在编译好的文件夹目录下执行

$ cd hadoop/build/c++-examples/Linux-amd64-64/bin
$ ../../../../bin/hadoop pipes -Dhadoop.pipes.java.recordreader=true -Dhadoop.pipes.java.recordwriter=true -reduces 4 -input /test/pipes/input -output /test/pipes/input/output1 -program /test/pipes/execs

执行结果如下:

$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00000 hbase 1 
$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00001 hello 4 hive 1 
$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00002 hadoop 1 zk 1 
$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00003

参考博客:

Hadoop MapReduce Sort

| 评论

排序是 MapReduce 的核心。排序可分为四种排序:普通排序、部分排序、全局排序、辅助排序

普通排序

Mapreduce 本身自带排序功能;Text 对象是不适合排序的;IntWritable,LongWritable 等实现了WritableComparable 类型的对象都是可以排序的。

部分排序

map 和 reduce 处理过程中包含了默认对 key 的排序,那么如果不要求全排序,可以直接把结果输出,每个输出文件中包含的就是按照key执行排序的结果。

控制排序顺序

键的排序是由 RawComparator 控制的,规则如下:

  1. 若属性 mapred.output.key.comparator.class 已设置,则使用该类的实例。调用 JobConf 的 setOutputKeyComparatorClass() 方法进行设置。
  2. 否则,键必须是 WritableComparable 的子类,并使用针对该键类的已登记的 comparator.
  3. 如果没有已登记的 comparator ,则使用 RawComparator 将字节流反序列化为一个对象,再由 WritableComparable 的 compareTo() 方法进行操作。

全局排序(对所有数据排序)

Hadoop 没有提供全局数据排序,而全局排序是非常普遍的需求。

实现方案

  • 首先,创建一系列的排好序的文件;
  • 其次,串联这些文件;
  • 最后,生成一个全局排序的文件。

主要思路是使用一个partitioner来描述全局排序的输出。该方法关键在于如何划分各个分区。

例,对整数排序,[0,10000] 的在 partition 0 中,(10000,20000] 在 partition 1 中… …即第n个reduce 所分配到的数据全部大于第 n-1 个 reduce 中的数据。每个 reduce 的结果都是有序的。
然后再将所有的输出文件顺序合并成一个大的文件,那么就实现了全局排序。

在比较理想的数据分布均匀的情况下,每个分区内的数据量要基本相同。

但实际中数据往往分布不均匀,出现数据倾斜,这时按照此方法进行的分区划分数据就不适用,可对数据进行采样。

采样器

通过对 key 空间进行采样,可以较为均匀的划分数据集。采样的核心思想是只查看一小部分键,获取键的相似分布,并由此构建分区。采样器是在 map 阶段之前进行的, 在提交 job 的 client 端完成的。

Sampler接口

Sampler 接口是 Hadoop 的采样器,它的 getSample() 方法返回一组样本。此接口一般不由客户端调用,而是由 InputSampler 类的静态方法 writePartitionFile() 调用,以创建一个顺序文件来存储定义分区的键。

Sampler接口声明如下:

  public interface Sampler<K,V> {
   K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
  }

继承 Sample 的类还有 IntervalSampler 间隔采样器,RandomSampler 随机采样器,SplitSampler 分词采样器。它们都是 InputSampler 的静态内部类。

getSample() 方法根据 job 的配置信息以及输入格式获得抽样结果,三个采样类各自有不同的实现。

IntervalSampler 根据一定的间隔从 s 个分区中采样数据,非常适合对排好序的数据采样。

public static class IntervalSampler<K,V> implements Sampler<K,V> {
    private final double freq;// 哪一条记录被选中的概率
    private final int maxSplitsSampled;// 采样的最大分区数
    /**
     * For each split sampled, emit when the ratio of the number of records
     * retained to the total record count is less than the specified
     * frequency.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());// 1. 得到输入分区数组
      ArrayList<K> samples = new ArrayList<K>();
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample; // 2. 分区采样时的间隔splitStep = 输入分区总数 除以 splitsToSample的 商;
      long records = 0;
      long kept = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep], // 3. 采样下标为i * splitStep的数据
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {// 6. 循环读取下一条记录
          ++records;
          if ((double) kept / records < freq) { // 4. 如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合
            ++kept;
            samples.add(key);// 5. 将记录添加到样本集合中
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }
… … 
}

RandomSampler 是常用的采样器,它随机地从输入数据中抽取 Key

  public static class RandomSampler<K,V> implements Sampler<K,V> {
    private double freq;// 一个Key被选中的 概率
    private final int numSamples;// 从所有被选中的分区中获得的总共的样本数目
    private final int maxSplitsSampled;// 需要检查扫描的最大分区数目
/**
     * Randomize the split order, then take the specified number of keys from
     * each split sampled, where each key is selected with the specified
     * probability and possibly replaced by a subsequently selected key when
     * the quota of keys from that split is satisfied.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());// 1. 获取所有的输入分区
      ArrayList<K> samples = new ArrayList<K>(numSamples);// 2. 确定需要抽样扫描的分区数目
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);// 3. 取最小的为采样的分区数
      Random r = new Random();
      long seed = r.nextLong();
      r.setSeed(seed);
      LOG.debug("seed: " + seed);
      // shuffle splits 4. 对输入分区数组shuffle排序
      for (int i = 0; i < splits.length; ++i) {
        InputSplit tmp = splits[i];
        int j = r.nextInt(splits.length);// 5. 打乱其原始顺序
        splits[i] = splits[j];
        splits[j] = tmp;
      }
      // our target rate is in terms of the maximum number of sample splits,
      // but we accept the possibility of sampling additional splits to hit
      // the target sample keyset
// 5. 然后循环逐 个扫描每个分区中的记录进行采样,
      for (int i = 0; i < splitsToSample ||
                     (i < splits.length && samples.size() < numSamples); ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
            Reporter.NULL);
       // 6. 取出一条记录
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          if (r.nextDouble() <= freq) {
            if (samples.size() < numSamples) {// 7. 判断当前的采样数是否小于最大采样数
              samples.add(key); //8. 小于则这条记录被选中,放进采样集合中,
            } else {
              // When exceeding the maximum number of samples, replace a
              // random element with this one, then adjust the frequency
              // to reflect the possibility of existing elements being
              // pushed out
              int ind = r.nextInt(numSamples);// 9. 从[0,numSamples]中选择一个随机数
              if (ind != numSamples) {
                samples.set(ind, key);// 10. 替换掉采样集合随机数对应位置的记录,
              }
              freq *= (numSamples - 1) / (double) numSamples;// 11. 调小频率
            }
            key = reader.createKey();// 12. 下一条纪录的key
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();// 13. 返回
    }
  }
… … 
}

SplitSampler 从 s 个分区中采样前 n 个记录,是采样随机数据的一种简便方式。

  public static class SplitSampler<K,V> implements Sampler<K,V> {
    private final int numSamples;// 最大采样数
    private final int maxSplitsSampled;// 最大分区数
    … … 
    /**
     * From each split sampled, take the first numSamples / numSplits records.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);// 1. 采样的分区数
      int splitStep = splits.length / splitsToSample; // 2. 分区采样时的间隔 = 分片的长度 与 输入分片的总数的 商
      int samplesPerSplit = numSamples / splitsToSample; // 3. 每个分区的采样数 
      long records = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep], // 4.采样下标为i * splitStep的数据
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          samples.add(key);// 5. 将记录添加到样本集合中
          key = reader.createKey();
          ++records;
          if ((i+1) * samplesPerSplit <= records) { // 6. 当前样本数大于当前的采样分区所需要的样本数,则停止对当前分区的采样。
            break;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }

Hadoop为顺序文件提供了一个 TotalOrderPartitioner 类,可以用来实现全局排序;TotalOrderPartitioner 源代码理解。TotalOrderPartitioner 内部定义了多个字典树(内部类)。

interface Node<T> 
// 特里树,利用字符串的公共前缀来节约存储空间,最大限度地减少无谓的字符串比较,查询效率比哈希表高
static abstract class TrieNode implements Node<BinaryComparable> 
static class InnerTrieNode extends TrieNode 
static class LeafTrieNode extends TrieNode
… … 

由 TotalOrderPartitioner 调用 getPartition() 方法返回分区,由 buildTrieRec() 构建特里树.

 private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
… … 
}

采样器使用示例

  1. 新建文件,名为 random.txt,里面每行存放一个数据。可由 RandomGenerator 类生成准备数据
  2. 执行 TestTotalOrderPartitioner.java

辅助排序

先按 key 排序,在按 相同的 key 不同的 value 再排序。可实现对值分组的效果。

Hadoop MapReduce Join

| 评论

在 Hadoop 中可以通过 MapReduce,Pig,hive,Cascading编程进行大型数据集间的连接操作。连接操作如果由 Mapper 执行,则称为“map端连接”;如果由 Reduce 执行,则称为“Reduce端连接”。

连接操作的具体实现技术取决于数据集的规模以及分区方式。
若一个数据集很大而另一个数据集很小,以至于可以分发到集群中的每一个节点之中,则可以执行一个 MapReduce 作业,将各个数据集的数据放到一起,从而实现连接。
若两个数据规模均很大,没有哪个数据集可以完全复制到集群的每个节点,可以使用 MapReduce 作业进行连接,使用 Map 端连接还是 Reduce 端连接取决于数据的组织方式。

Map端连接将所有的工作在 map 中操作,效率高但是不通用。而 Reduce 端连接利用了 shuff 机制,进行连接,效率不高。

DistributedCache 能够在任务运行过程中及时地将文件和存档复制到任务节点进行本地缓存以供使用。各个文件通常只复制到一个节点一次。可用 api 或者命令行在需要的时候将本地文件添加到 hdfs 文件系统中。

本文中的示例 出自于 开源力量 LouisT 老师的开源力量培训课-Hadoop Development课件。

Map端连接

Map 端联接是指数据到达 map 处理函数之前进行合并的。它要求 map 的输入数据必须先分区并以特定的方式排序。各个输入数据集被划分成相同数量的分区,并均按相同的键排序(连接键)。同一键的所有输入纪录均会放在同一个分区。以满足 MapReduce 作业的输出。

若作业的 Reduce 数量相同、键相同、输入文件是不可切分的,那么 map 端连接操作可以连接多个作业的输出。

在 Map 端连接效率比 Reduce 端连接效率高(Reduce端Shuff耗时),但是要求比较苛刻。

基本思路

  1. 将需要 join 的两个文件,一个存储在 HDFS 中,一个使用 DistributedCache.addCacheFile() 将需要 join 另一个文件加入到所有 Map 的缓存里(DistributedCache.addCacheFile() 需要在作业提交前设置);
  2. 在 Map 函数里读取该文件,进行 Join;
  3. 将结果输出到 reduce 端;

使用步骤

  1. 在 HDFS 中上传文件(文本文件、压缩文件、jar包等);
  2. 调用相关API添加文件信息;
  3. task运行前直接调用文件读写API获取文件;

Reduce端Join

reduce 端联接比 map 端联接更普遍,因为输入的数据不需要特定的结构;效率低(所有数据必须经过shuffle过程)。

基本思路

  1. Map 端读取所有文件,并在输出的内容里加上标识代表数据是从哪个文件里来的;
  2. 在 reduce 处理函数里,对按照标识对数据进行保存;
  3. 然后根据 Key 的 Join 来求出结果直接输出;

示例程序

使用 MapReduce map 端join 或者 reduce 端 join 实现如下两张表 emp, dep 中的 SQL 联合查询的数据效果。

Table EMP:(新建文件EMP,第一行属性名不要)
----------------------------------------
Name      Sex      Age     DepNo
zhang      male     20           1     
li              female  25           2
wang       female  30           3
zhou        male     35           2
----------------------------------------
Table Dep:(新建文件DEP,第一行属性名不要)
DepNo     DepName
     1            Sales
     2            Dev
     3            Mgt
------------------------------------------------------------     
SQL:
select name,sex ,age, depName from emp inner join DEP on EMP.DepNo = Dep.DepNo
----------------------------------------
实现效果:
$ ./bin/hadoop fs -cat /reduceSideJoin/output11/part-r-00000
zhang male 20 sales
li female 25 dev
wang female 30 dev
zhou male 35 dev

Map 端 Join 的例子:TestMapSideJoin
Reduce 端 Join 的例子:TestReduceSideJoin

Hadoop MapReduce 计数器

| 评论

计数器是一种收集系统信息有效手段,用于质量控制或应用级统计。可辅助诊断系统故障。计数器可以比日志更方便的统计事件发生次数。

内置计数器

Hadoop 为每个作业维护若干内置计数器,主要用来记录作业的执行情况。

内置计数器包括

  • MapReduce 框架计数器(Map-Reduce Framework)
  • 文件系统计数器(FielSystemCounters)
  • 作业计数器(Job Counters)
  • 文件输入格式计数器(File Output Format Counters)
  • 文件输出格式计数器(File Input Format Counters)

计数器由其关联的 task 进行维护,定期传递给 tasktracker,再由 tasktracker 传给 jobtracker。因此,计数器能够被全局地聚集。内置计数器实际由 jobtracker 维护,不必在整个网络发送。

一个任务的计数器值每次都是完整传输的,仅当一个作业执行成功之后,计数器的值才完整可靠的。

自定义Java计数器

MapReduce 允许用户自定义计数器,MapReduce 框架将跨所有 map 和 reduce 聚集这些计数器,并在作业结束的时候产生一个最终的结果。

计数器的值可以在 mapper 或者 reducer 中添加。多个计数器可以由一个 java 枚举类型来定义,以便对计数器分组。一个作业可以定义的枚举类型数量不限,个个枚举类型所包含的数量也不限。

枚举类型的名称即为组的名称,枚举类型的字段即为计数器名称。

在 TaskInputOutputContext 中的 counter

 public Counter getCounter(Enum<?> counterName) {
    return reporter.getCounter(counterName);
  }
  public Counter getCounter(String groupName, String counterName) {
    return reporter.getCounter(groupName, counterName);
  }

计数器递增

org.apache.hadoop.mapreduce.Counter类

  public synchronized void increment(long incr) {
    value += incr;
  }

计数器使用

  • WebUI 查看(50030);
  • 命令行方式:hadoop job [-counter ];
  • 使用Hadoop API。 通过job.getCounters()得到Counters,而后调用counters.findCounter()方法去得到计数器对象;可参见《Hadoop权威指南》第8章 示例 8-2 MissingTemperaureFields.java

命令行方式示例

$ ./bin/hadoop job -counter  job_201402211848_0004 FileSystemCounters HDFS_BYTES_READ
177

自定义计数器

统计词汇行中词汇数超过2个或少于2个的行数。 源代码: TestCounter.javaTestCounter.java

输入数据文件值 counter.txt:

hello world
hello
hello world 111
hello world 111 222

执行参数

hdfs://master11:9000/counter/input/a.txt hdfs://master11:9000/counter/output1

计数器统计(hadoop eclipse 插件执行)结果:

2014-02-21 00:03:38,676 INFO  mapred.JobClient (Counters.java:log(587)) -   ERROR_COUNTER
2014-02-21 00:03:38,677 INFO  mapred.JobClient (Counters.java:log(589)) -     Above_2=2
2014-02-21 00:03:38,677 INFO  mapred.JobClient (Counters.java:log(589)) -     BELOW_2=1