hadoop实例---多表关联

标签: hadoop 实例 关联 | 发表时间:2013-08-04 16:20 | 作者:a331251021
出处:http://blog.csdn.net

多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。如下

输入的是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出工厂名-地址名表

样本如下:

factory:

factoryname addressed
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1

address:

addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian


结果:

factoryname     addressname
Beijing Red Star        Beijing
Beijing Rising  Beijing
Bank of Beijing         Beijing
Guangzhou Honda         Guangzhou
Guangzhou Development Bank      Guangzhou
Shenzhen Thunder        Shenzhen
Tencent         Shenzhen


代码如下:

import java.io.IOException;

import java.util.*;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class MTjoin {

 

    public static int time = 0;

 

    /*

     * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割,

     * 保存连接列在key值,剩余列和左右表标志在value中,最后输出

     */

    public static class Map extends Mapper<Object, Text, Text, Text> {

 

        // 实现map函数

        public void map(Object key, Text value, Context context)

                throws IOException, InterruptedException {

            String line = value.toString();// 每行文件

            String relationtype = new String();// 左右表标识

 

            // 输入文件首行,不处理

            if (line.contains("factoryname") == true

                    || line.contains("addressed") == true) {

                return;

            }

 

            // 输入的一行预处理文本

            StringTokenizer itr = new StringTokenizer(line);

            String mapkey = new String();

            String mapvalue = new String();

            int i = 0;

            while (itr.hasMoreTokens()) {

                // 先读取一个单词

                String token = itr.nextToken();

                // 判断该地址ID就把存到"values[0]"

                if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {

                    mapkey = token;

                    if (i > 0) {

                        relationtype = "1";

                    } else {

                        relationtype = "2";

                    }

                    continue;

                }

 

                // 存工厂名

                mapvalue += token + " ";

                i++;

            }

 

            // 输出左右表

            context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));

        }

    }

 

    /*

     * reduce解析map输出,将value中数据按照左右表分别保存,

  * 然后求出笛卡尔积,并输出。

     */

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

 

        // 实现reduce函数

        public void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException {

 

            // 输出表头

            if (0 == time) {

                context.write(new Text("factoryname"), new Text("addressname"));

                time++;

            }

 

            int factorynum = 0;

            String[] factory = new String[10];

            int addressnum = 0;

            String[] address = new String[10];

 

            Iterator ite = values.iterator();

            while (ite.hasNext()) {

                String record = ite.next().toString();

                int len = record.length();

                int i = 2;

                if (0 == len) {

                    continue;

                }

 

                // 取得左右表标识

                char relationtype = record.charAt(0);

 

                // 左表

                if ('1' == relationtype) {

                    factory[factorynum] = record.substring(i);

                    factorynum++;

                }

 

                // 右表

                if ('2' == relationtype) {

                    address[addressnum] = record.substring(i);

                    addressnum++;

                }

            }

 

            // 求笛卡尔积

            if (0 != factorynum && 0 != addressnum) {

                for (int m = 0; m < factorynum; m++) {

                    for (int n = 0; n < addressnum; n++) {

                        // 输出结果

                        context.write(new Text(factory[m]),

                                new Text(address[n]));

                    }

                }

            }

 

        }

    }

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // 这句话很关键

  //      conf.set("mapred.job.tracker", "192.168.1.2:9001");

 
	//可使用args
  //      String[] ioArgs = new String[] { "MTjoin_in", "MTjoin_out" };

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {

            System.err.println("Usage: Multiple Table Join <in> <out>");

            System.exit(2);

        }

 

        Job job = new Job(conf, "Multiple Table Join");

        job.setJarByClass(MTjoin.class);

 

        // 设置Map和Reduce处理类

        job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);

 

        // 设置输出类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 设置输入和输出目录

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 javac -classpath hadoop-core-1.1.2.jar:/opt/hadoop-1.1.2/lib/commons-cli-1.2.jar -d firstProject firstProject/MTJoin.java
jar -cvf MTJoin.jar -C firstProject/ .     

删除已经存在的output

hadoop fs -rmr output
hadoop fs -mkdir input
hadoop fs -put factory input
 hadoop fs -put address input

运行

hadoop jar  MTJoin.jar MTJoin input output


查看结果

 hadoop fs -cat output/part-r-00000










 

作者:a331251021 发表于2013-8-4 16:20:52 原文链接
阅读:72 评论:0 查看评论

相关 [hadoop 实例 关联] 推荐:

hadoop实例---多表关联

- - CSDN博客研发管理推荐文章
多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息. 输入的是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列. 要求从输入数据中找出工厂名和地址名的对应关系,输出工厂名-地址名表. factoryname addressed Beijing Red Star 1 Shenzhen Thunder 3 Guangzhou Honda 2 Beijing Rising 1 Guangzhou Development Bank 2 Tencent 3 Back of Beijing 1.

Hadoop序列化机制及实例

- - CSDN博客推荐文章
将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程. 将字节流转换为一系列结构化对象的过程. 1、作为一种 持久化格式. 2、作为一种 通信的数据格式. 3、作为一种数据拷贝、克隆机制. Java序列化和反序列化. 1、创建一个对象实现了 Serializable. 2、序列化: ObjectOutputStream.writeObject(序列化对象).

hadoop实现单表和多表关联

- - CSDN博客云计算推荐文章
转载请注明: http://hanlaiming.freetzi.com/?p=123. 在mapreduce上编写简单应用后,开始学习稍微高级一点的单表关联和多表关联. 在学习过程中我参考了这篇文章,谢谢 http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html,里面很多基本的内容很实用.

关联规则二项集hadoop实现

- - CSDN博客推荐文章
近期看mahout的关联规则源码,颇为头痛,本来打算写一个系列分析关联规则的源码的,但是后面看到有点乱了,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则. 算法的思想还是参考上次的图片:. 针对原始输入计算每个项目出现的次数;. 按出现次数从大到小(排除出现次数小于阈值的项目)生成frequence list file;.

hadoop 处理不同的输入文件,文件关联

- - CSDN博客云计算推荐文章
file1和file2进行关联,想要的结果:. 2、将file1的key、value颠倒 ;file1和file2的key相同,file1的value做key,file2的value做value ,输出. if("file1".equals(fileName)){//加标记. // 设置Map和Reduce处理类.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.

HADOOP安装

- - OracleDBA Blog---三少个人自留地
最近有时间看看hadoop的一些东西,而且在测试的环境上做了一些搭建的工作. 首先,安装前需要做一些准备工作. 使用一台pcserver作为测试服务器,同时使用Oracle VM VirtualBox来作为虚拟机的服务器. 新建了三个虚拟机以后,安装linux,我安装的linux的版本是redhat linux 5.4 x64版本.