2015Hadoop数据处理实战视频教程笔记
2015Hadoop数据处理实战视频教程笔记hadoop_hdfs_分布式文件系统
Hadoop是Google的集群系统的开源实现
-Google集群系统:GFS(Google File System)、MapReduce、BigTable
-Hadoop主要由HDFS(hadoop distributed File System Hadoop分布式文件系统)、MapReduce和Hbase组成
-Hadoop的初衷是为了解决Nutch海量数据爬取和存储的需要
-Hadoop与2005年秋天作为Lucene的子项目Nutch的一部分正式引入Apache基金会。
-名称起源:Doug Cutting儿子的黄色大象玩具的名字
--------------------------------------------------------------------
两大核心设计
MapReduce HDFS
Map:任务分解 NameNode
Reduce:结果的汇总 DataNode
Client
------------------------------------------------------------------------
|Hadoop---大数据vs传统数据 |
| | 传统数据 | 大数据 |
|数据量 | GB-》TB | TB-》PB |
|速度 | 数据量稳定,增长不快 | 持续实时产生数据,年增长率超过60%|
|多样化 | 主要为结构化数据 | 半结构化,非结构化,多维数据 |
|价值 | 统计和报表 | 数据挖掘和预测性分析 |
------------------------------------------------------------------------
"大数据"指数据集的大小超过了现有典型的数据库软件和工具的处理能力。与此同时,及时捕捉、存储、聚合、管理这些大数据以及对数据的深度分析和新技术和新能力,正在快速增长,就像预测计算芯片增长速度的摩尔定律一样。
-------------------------------------------------------------------------------------------------------------------------------------------------------
并行关系数据库 | MPP or Hadoop
1、多个独立的关系数据库服务器,访问共享的存储资源池 |1、由大量独立的服务器通过网络互连形成集群,每个服务器带存储。
优势: |2、优势:计算与存储融合,支持横向扩展,更好的扩展性
采用多个关系数据库服务器,多个存储,与原有的架构相比,扩展了存储容量和计算能力 |3、劣势:解决数据冲突时需要节点间协作
劣势: |4、适用范围:
计算与存储分离,数据访问存在竞争和带宽瓶颈 | 数据仓库和离线数据分析(MPP,Hadoop/HBase)
支持的关系数据库服务器数量有限 | 大规模在线实时应用(单行事务处理能满足的场景)(HBase)
只能向上扩展,不能横向扩展 |
适合复杂的需要事务处理的应用 |
----------------------------------------------------------------------------------------------------------------------------------------------------
Core:一套分布式文件系统以及支持Map-Reduce的计算框架
Avro:定义了一种用于支持大数据应用的数据格式,并为这种格式提供了不同的编程语言的支持
HDFS:Hadoop分布式文件系统
Map/Reduce:是一种使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集
ZooKeeper:是高可用的和可靠的分布式协同系统
Pig:建立于Hadoop Core之上为并行计算环境提供了一套数据工作流语言和执行框架
Hive:是为提供简单的数据操作而设计的下一代分布式数据仓库。它提供了简单的类似SQL的语法的HiveQL语言进行数据查询
Hbase:建立于Hadoop Core之上提供一个可扩展的数据库系统
Flume:一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据
Mahout:是一套具有可扩充能力的机器学习类库
Sqoop:是Apache下用于RDBMS和HDFS互相导数据的工具
------------------------------------------------
HDFS介绍
HDFS为了做到可靠性(reliability)创建了多份数据块(data blocks)的复制(replicas),并将它们放置在服务器群的计算节点中(compute nodes),MapReduce就可以在它们所在的节点上处理这些数据了。
------------------------------------------------------------------------
NameNode |DataNode |
存储元数据 |存储文件内容 |
元数据保存在内存中 |文件内容保存在磁盘 |
保存文件,block,datanode |维护了block id到datanode本地文件的映射关系 |
之间的映射关系 | |
------------------------------------------------------------------------
一个名字节点和多个数据节点
数据复制(冗余机制)
--存放的位置(机架感知策略)
故障检测
--数据节点
心跳包(检测是否宕机)
块报告(安全模式下检测)
数据完整性检测(校验和比较)
--名字节点(日志文件、镜像文件)
空间回收机制
HDFS优点:
--高容错性
数据自动保存多个副本
副本丢失后,自动恢复
--适合批处理
移动计算而非数据
数据位置暴露给计算框架
--适合大数据处理
GB、TB、甚至PB级数据
百万规模以上的文件数量
10k+节点
--可构建在廉价机器上
通过多副本提高可靠性
提供了容错和恢复机制
HDFS缺点:
--低延迟数据访问
比如毫秒级
低延迟与高吞吐率
--小文件存取
占用NameNode大量内存
寻到时间超过读取时间
--并发写入、文件随机修改
一个文件只能有一个写者
仅支持append
--------------------------------------------------------------
HDFS架构
HDFS Cient<----->NameNode<----->Seconddary NameNode
|
|
|
DataNode DataNode DataNode DataNode
--------------------------------------------------------------
HDFS数据存储单元(block)
-文件被切分成固定大小的数据块
默认数据块大小为64MB,可配置
若文件大小不到64MB,则单独存成一个block
一个文件存储方式
按大小被切分成若干个block,存储到不同节点上
默认情况下每个block都有三个副本
Block大小和副本数通过Client端上传文件时设置,文件上传成功后副本数可以变更,Block Size不可变更。
NameNode(NN)
-NameNode主要功能:接受客户端的读写服务
NameNode保存metadate信息包括
文件owership和permissions
文件包含那些块
Block保存在哪个DataNode(由DataNode启动时上报)
-NameNode的metadate信息在启动后会加载到内存
metadata存储到磁盘文件名为“fsimage”
Block的位置信息不会保存到“fsimage”
edits记录对metadata的操作日志
SecondaryNameNode(SNN)
-它不是NN的备份(但可以做备份),它的主要工作是帮助NN合并editslog,减少NN启动时间。
-SNN执行合并时机
根据配置文件设置的时间间隔fs.checkpoint.period默认3600秒
根据配置文件设置edits log大小 fs.checkpoint.size 规定edits文件的最大值默认是64MB
DataNode(DN)
-存储数据(Block)
-启动DN线程的时候会向NN汇报block信息
-通过向NN发送心跳保持与其联系(3秒一次),如果NN 10分钟没有收到DN的心跳,则认为其已经lost,并copy其上的block到其它DN
Block的副本放置策略
-第一副本:放置在上传文件的DN;
如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点。
-第二副本:放置在于第一个副本不同的机架的节点上。
-第三个副本:与第二个副本相同机架的节点。
-更多副本:随机节点
HDFS文件权限
-与linux文件权限类似
r:read,w:write;x:execute,权限x对于文件忽略,对于文件夹表示是否允许访问其内容
-如果Linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS中owner就是zhangsan。
-HDFS的权限目的:阻止好人做错事,而不是阻止坏人做坏事。HDFS相信,你告诉我你是谁,我就认为你是谁。
安全模式
-namenode启动的时候,首先将映射文件(fsimage)载入内存,并执行编辑日志(edits)中的各项操作。
-一旦在内存中成功建立文件系统元数据的映射,则创建一个新的fsimage文件(这个操作不需要SecondaryNameNode)和一个空的编辑日志。
-此刻namenode运行在安全模式。即namenode的文件系统对于客户端来说是只读的。(显示目录,显示文件内容等。写、删除、重命名都会失败)。
-在此阶段Namenode收集各个datanode的报告,当数据块达到最小副本数以上时,会被认为是“安全”的,在一定比例(可设置)的数据块被确定为“安全”后,再过若干时间,安全模式结束。
-当检测到副本数不足的数据块时,该块会被复制直到达到最小副本数,系统中数据的位置并不是由namenode维护的,而是以块列表形式存储在datanode中。
HDFS安装
-伪分布式安装
-完全分布式安装
下载
解压
检查java和ssh的免密码登陆
修改core-site.xml
修改hdfs-site.xml
修改 masters文件和slaves文件
格式化namenode
Start-hdfs.sh启动
***********10 11 12 13 14hadoop1.2.1 安装视频***************************
Hadoop核心组件-MR
-Hadoop分布式计算框架(MapReduce)
MapReduce设计理念
-何为分布式计算
-移动计算,而不是移动数据。
MapR
Hadoop计算框架Shuffler
-在mapper和reducer中间的一个步骤
-可以把mapper的输出按照某种key值重新切分和组合成n份,把key值符合某种范围的输出送到特定的reducer那里去处理
-可以简化reducer过程
Hadoop计算框架shuffle过程详解
-每个map task都有一个内存缓冲区(默认是100MB),存储着map的输出结果
-当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘(Spill)
-溢写是由单独线程来完成,不影响往缓冲区写map结果的线程(spill.percent,默认是0.8)
-当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)
input
|
|
V
|---------------|
|Runningmap task|
|---------------|
|
|
V
partition
|
|
V
|--------------|
|MemoryBuffer |
|--------------|
|
|
V
Spill:Sort & Combiner
|
|
V
|--------------|
|merge |
|--------------|
Hadoop计算框架shuffle过程详解
-假如client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。(reducel、word1、[8]).
-当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并(Merge),对于“word1”就是像这样的:{“world”,[5,8,2,...]},假如有Combiner,{world[15]},最终产生一个文件。
-reduce 从tasktracker copy数据
-copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置
-merge有三种形式:1)内存到内存 2)内存到磁盘3)磁盘到磁盘。merge从不同tasktracker上拿到的数据,{word[15,17,2]}
-参考博客:
MapReduce是怎么解决负载均衡和数据倾斜问题的?
map不可能产生数据倾斜
reduce容易产生数据倾斜,设计好Partitions,就可以避免倾斜。
MapReduce的Split大小
-max.split(100M)
-min.split(10M)
-block(64M)
-max(min.split,min(max.split,block))
MapReduce的架构
-一主多从架构
-主JobTracker:
负责调度分配每一个子任务task运行于TaskTracker上,如果发现有失败的task就重新分配其任务到其他节点。每一个hadoop集群中只一个JobTracker一般它运行在
Master节点上。
-从TaskTracker
TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务,为了减少网络带宽TaskTracker最好运行在HDFS的
DataNode上
程序实例:
WcMapper.java
import org.apache.hadoop.mapreduce.Mapper;
public class WcMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
//每次调用map方法会传入split中一行数据key:该行数据所在文件中的位置下标,value:这行数据
protected void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
String line = value.toString();
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreTokens()){
String world = st.nextToken();
context.write(new Text(world),IntWritable(1));//map的输出
}
}
}
public class WcReduce extends Reduce<Text,IntWritable,Text,IntWritable>{
protected void reduce(Text arg0,Iterable<IntWritable> itertable,Context context) throws IOException,InterruptedException{
int sum = 0;
for(IntWritable i : iterable){
sum = sum + i.get();
}
context.write(key,new IntWritable(sum));
}
}
JobRun.java
public class JobRun{
public static void main(String[] args){
Configuration conf = new Configuration();
conf.set("mapred.job.tracker","node1:9001");
try{
Job job = new Job(conf);
job.setJarByClass(JobRun.class);
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//job.setNumReduceTasks(1);//设置reduce任务的个数
//mapreduce 输入数据所在的目录或者文件
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}catch(Exception e){
e.printStackTrace();
}
}
}
执行:./hadoop jar /root/wc.jar com.bjsxt.mr.JobRun
--------------------------------------------
test.txt
hello hadoop world lucene abc
hdfs mapper reduce hadoop hello
world efg location
-------------------------------------
qq好友推荐
test2
hadoop hello
hdfs world
tom cat
cat dog
hello world
----------------------------
Test2Mapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class Test2Mapper extends Mapper<LongWritable,Text,Text,Text>{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
StringTokenizer st = new StringTokenizer(line,"\t");
while(st.hasMoreTokens()){
String line = value.toString();
String[] ss = line.split("\t");
context.write(new Text(ss[0]),new Text(ss[1]));
context.write(new Text(ss[1]),new Text(ss[0]));
}
}
}
public class Test2Reduce extends Reducer<Text,Text,Text,Text>{
protected void reduce(Text key,Iterable<Text> i,Context arg2) throws IOException,InterruptedException{
Set<String> set = new HashSet<String>();
for(Text t:i){
set.add(t.toString());
}
if(set.size()>1){
for(Iterator j=set.iterator();j.hasNext();){
String name = (String)j.next();
for(int k = 0;k < set.iterator();k.hasNext();){
String other = (String)k.next();
if(!name.equals(other)){
arg2.write(new Text(name),new Text(other));
}
}
}
}
}
}
Hadoop2.0产生背景
-Hadoop 1.0中HDFS和MapReduce在高可用、扩展性等方面存在问题
-HDFS存在的问题
NameNode单点故障,难以应用于在线场景
NameNode压力过大,且内存受限,影响系统扩展性
MapRedue存在的问题
JobTracker访问压力大,影响系统扩展性
难以支持除MapReduce之外的计算框架,比如Spark、Storm等
-------------------------------------------
Hadoop 1.x 和 2.x |
|
HADOOP1.0 HADOOP2.0 |
MapReduce MapReduce Others |
HDFS YARN |
HDFS |
---------------------------------------- |
-Hadoop 2.x由HDFS、MapReduce和YARN三个分支构成;
HDFS:NN Federation、HA;
MapReduce:运行在YARN上的MR;
YARN:资源管理系统
HDFS 2.x
-解决HDFS1.0中单点故障和内存受限问题。
-解决单点故障
HDFS HA:通过主备NameNode解决
如果主NameNode发生故障,则切换到备NameNode上
-解决内存受限问题
HDFS Federation(联邦)
水平扩展,支持多个NameNode
每个NameNode分管一部分目录
所有NameNode共享所有DataNode存储资
-2.x仅是架构上发生了变化,使用方式不变
-对HDFS使用者透明
-HDFS 1.x中的命令和API扔可以使用
HDFS 2.0 HA
-主备NameNode
-解决单点故障
主NameNode对外提供服务,备NameNode同步主NameNode元数据,以待切换
所有DataNode同时向两个NameNode汇报数据块信息
-两种切换选择
手动切换:通过命令实现主备之间的切换,可以用HDFS升级等场合
自动切换:基于Zookeeper实现
-基于Zookeeper自动切换方案
Zookeeper FailoverController:监控NameNode健康状态,
并向Zookeeper注册NameNode
NameNode挂掉后,ZKFC为NameNode竞争锁,获得ZKFC锁的NameNode变为active
HDFS 2.x Federation
-通过多个namenode/namespace把元数据的存储和管理分散到多个节点中,使到namenode/namespace可以通过增加机器来进行水平扩展。
-能把单个namenode的负载分散到多个节点中,在HDFS数据规模较大的时候不会也降低HDFS的性能。可以通过多个namespace来隔离不同类型的应用,把不同类型应用的
HDFS元数据和管理分派到不同的namenode中。
YARN
-YARN:Yet Another Resource Negotiator;
-Hadoop 2.0新引入的资源管理系统,直接从MRv1演化而来的;
核心思想:将MRv1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和ApplicationMaster进程实现
ResourceManager:负责整个集群的资源管理和调度
ApplicationMaster:负责应用程序相关的事务,比如任务调度,任务监控和容错等
-YARN的引入,使得多个计算框架可运行在一个集群中
每个应用程序对应一个ApplicationMaster
目前多个计算框架可以运行在YARN上,比如MapReduce、Spark、Storm等
MapReduce On YARN
-MapReduce On YARN:MRv2
-将MapReduce作业直接运行在YARN上,而不是由JobTracker和TaskTracker构建的MRv1系统中
-基本功能模块
YARN:负责资源管理和调度
MRAppMaster:负责任务切分、任务调度、任务监控和容错等
MapTask/ReduceTask:任务驱动引擎,与MRv1一致
-每个MapReduce作业对应一个MRAppMaster
MRAppMaster任务调度
YARN将资源分配给MRAppMaster
MRAppMaster进一步将资源分配给内部的任务
-MRAppMaster容错
失败后,由YARN重新启动
任务失败后,MRAppMaster重新申请资源
hadoop2.x 安装
vi /etc/hosts
scp /etc/hosts root@node2:/etc/hosts
scp /etc/hosts root@node3:/etc/hosts
scp /etc/hosts root@node4:/etc/hosts
scp ~/.ssh/id_dsa.pub root@node4:~/
免登陆:
ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub>>~/.ssh/authorized_keys
安装JDK
tar -zxvf /hadoop-2.5.1.tar
ln -sf /root/hadoop-2.5.1 /home/hadoop-2.5
./hadoop-daemon.sh start journalnode
jps
ls
cd /opt/hadoop2/
./stop-dfs.sh
./start-dfs.sh
./hdfs dfs -put /root/...
i.修改dataDir=/opt/zookeeper
ii. server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888
c) 在dataDir目录中创建一个myid的文件,文件内容为1、2、3
4、配置hadoop中的slaves
5、启动三个zookeper:./zkServer.sh start
6 启动三个JournalNode:./hadoop-daemon.sh start journalnode
7、在其中一个namenode上格式化:hdfs namenode -format
8、把刚刚格式化之后的元数据拷贝到另外一个namenode上
a)启动刚刚格式化的namenode
b)在没有格式化的namenode上执行:hdfs namenode -bootstrapStandby
c)启动第二个namenode
9、在其中一个namenode上初始化zkfc:hdfs zkfc -formatZK
10、停止上面节点:stop-dfs.sh
11、全面启动:start-dfs.sh
yarn-site.xml
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
-------------------------------
datanode节点
jps
DataNode
Jps
QuorumPeerMain
JournalNode
NodeManager
-------------------------------
data.txt
1949-10-01 14:21:02/t34℃
1949-10-02 14:01:02/t36℃
1950-01-01 11:21:02/t32℃
1950-10-01 12:21:02/t37℃
1951-12-01 12:21:02/t23℃
1950-10-02 12:21:02/t41℃
1950-10-03 12:21:02/t27℃
1951-07-01 12:21:02/t45℃
1951-07-02 12:21:02/t46℃
----------------------------
readme.txt
1、计算在1949-1955年,每年温度最高的时间。
2、计算在1949-1955年,每年温度最高前十天。
思路:
1、按照年份的升序排序,同时每一年中温度降序排序。
2、按照年份分组,每一年对应一个reduce任务
mapper输出:key 为封装对象,
目的:
自定排序
自定义分区
自定义分组
------------------------------------------------------------------------------------------------------------
KeyPair.java
package com.wd;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class KeyPair implements WritableComparable<KeyPair>{
private int year;
private int hot;
public int getYear(){
return year;
}
public void setYear(int year){
this.year = year;
}
public int getHot(){
return hot;
}
public void setHot(int hot){
this.hot = hot;
}
public void readFields(DataInput in) throws IOException{
this.year = in.readInt();
this.hot = in.readInt();
}
public void write(DataOutput out) throws IOException{
out.writeInt(year);
out.writeInt(hot);
}
public int compareTo(KeyPair o){
int res = Integerpare(year,o.getYear());
if(res!=0){
return res;
}
return Integerpare(hot,o.getHot());
}
public String toString(){
return year + "\t" + hot;
}
}
--------------------------------------------------------------------------------------------------
SortHot.java
package com.wd;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SortHot extends WritableComparator{
public SortHot(){
super(KeyPair.class,true);
}
public int compare(WritableComparable a,WritableComparable b){
KeyPair o1 = (KeyPair)a;
KeyPair o2 = (KeyPair)b;
int res = Integerpare(o1.getYear(),o2.getYear());
if(res!=0){
return res;
}
return -Integerpare(o1.getHot(),o2.getHot());//降序排序
}
}
---------------------------------------------------------------------------------------------------------
FirstPartition.java
package com.wd;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FirstPartition extends Partitioner<KeyPair,Text>{
public int getPartition(KeyPair key,Text value,int num){
return (key.getYear()*127)%num;
}
}
-------------------------------------------------------------------------------------------------------------
GroupHot.java
package com.wd;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupHot extends WritableComparator{
public GroupHot(){
super(KeyPair.class,true);
}
public int compare(WritableComparable a,WritableComparable b){
KeyPair o1 = (KeyPair) a;
KeyPair o2 = (KeyPair) b;
return Integerpare(o1.getYear(), o2.getYear());
}
}
---------------------------------------------------------------------------------------------------------------
RunJob.java
package com.wd;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//hadoop jar /home/cloudera/wd.jar com.wd.RunJob
public class RunJob{
public static SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
static class HotMapper extends Mapper<LongWritable,Text,KeyPair,Text>{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
System.out.println("line=" + line);
System.out.println("℃");
System.out.println("---------------------------------------");
String[] ss = line.split("/t");
System.out.println("ss=" + ss.length);
if(ss.length==2){
try{
Date date = SDF.parse(ss[0]);
System.out.println(date);
Calendar c = Calendar.getInstance();
c.setTime(date);
int year = c.get(1);
System.out.println("ss[1]" + ss[1]);
String hot = ss[1].substring(0,ss[1].indexOf("℃"));
System.out.print("hot=" + hot);
KeyPair kp = new KeyPair();
kp.setYear(year);
kp.setHot(Integer.parseInt(hot));
context.write(kp,value);
}catch(Exception e){
e.printStackTrace();
}
}
}
}
static class HotReduce extends Reducer<KeyPair,Text,KeyPair,Text>{
protected void reduce(KeyPair kp,Iterable<Text> value,Context context)throws IOException,InterruptedException{
for(Text v:value)
context.write(kp,v);
}
}
public static void main(String[] args){
Configuration conf = new Configuration();
try{
Job job = new Job(conf);
job.setJobName("hot");
job.setJarByClass(RunJob.class);
job.setMapperClass(HotMapper.class);
job.setReducerClass(HotReduce.class);
job.setMapOutputKeyClass(KeyPair.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(4);//reduce数量
job.setPartitionerClass(FirstPartition.class);
job.setSortComparatorClass(SortHot.class);
job.setGroupingComparatorClass(GroupHot.class);
FileInputFormat.addInputPath(job,new Path("hdfs://192.168.1.198:8020/wd/input/"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.1.198:8020/wd/output3/"));
System.exit(job.waitForCompletion(true)?0:1);
}catch(Exception e){
e.printStackTrace();
}
}
}
-------------------------------------------------------------------------------------------------------
hdfs dfs -cat /usr/output/hot/part-r-00000 | head -n1
1、在新浪微博中给九阳豆浆机打广告。广告精准推送。找到那些关注豆浆机的人,这些用户登录之后弹出九阳广告。并且安装关注度由高到低排序。
关注度权重公式
W = TF * Log(N/DF)
TF:当前关键字在该片微博内容中出现次数
DF:当前的关键字在所有微博中出现微博条数,比如:九阳,在某条微博中出现了4次。指计算为一条。
N:微博总条数。
广告推送实例:
FirstReduce.java
//第一个job输出结果为:
//九阳-001 2
//九阳-002 1
//count 19223
public class FirstReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
protected void reduce(Text arg0,Iterable<IntWritable> arg1,Context arg2) throws IOException,InterruptedException{
int sum = 0;
for(IntWritable i : arg1){
sum = sum + i.get();
}
if(arg0.equals(new Text("count"))){
System.out.println(arg0.toString() + "_____________" + sum)
}
arg2.write(arg0,new IntWritable(sum))
}
}
---------------------------------------------
FirstPartition.java
/**
* 如果key为count 单独一个分区
* 其他的key:平均分配三个区
* @author root
*
*/
public class FirstPartition extends HashPartitioner<Text,IntWritable>{
public int getPartition(Text key,IntWritable value,int reduceCount){
if(key.equals(new Text("count")))
return 3;
else
return super.getPartition(key,value,reduceCount-1);
}
}
---------------------------------------------
TwoMapper.java
//统计每个词的DF
public class TwoMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
FileSplit fs = (FileSplit)context.getInputSplit();
if(!fs.getPath().getName().contains("part-r-00003")){
String[] v = value.toString().trim().split("\t");
if(v.length >= 2){
String[] ss = v[0].split("_");
if(ss.length >= 2){
String w = ss[0];
context.write(new Text(w),new IntWritable(1));
}
}else{
System.out.println(value.toString() + "----------------------");
}
}
}
}
---------------------------------------
TwoJob.java
public class TwoJob{
public static void main(String[] args){
Configuration config = new Configuration();
config.set("yarn.resourcemanager,hostname","192.168.1.198");
try{
Job job = new Job(config);
job.setJarByClass(TwoJob.class);
job.setJobName("webo2");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TwoMapper.class);
job.setCombinerClass(TwoReduce.class);
job.setReduceClass(TwoReduce.class);
FileInputFormat.addInputPath(job,new Path("/usr/weibo/output"));
FileOutputFormat.setOutputPath(job,new Path("/usr/weibo/output"));
boolean f = job.waitForCompletion(true);
if(f){
System.out.println("执行job成功");
}
}catch(Exception e){
e.printStackTrace();
}
}
}
--------------------------------------------------
LastMapper.java
public class LastMapper extends Mapper<LongWritable,Text,Text,Text>{
public static Map<String,Integer> cmap = null;
public static Map<String,Integer> df = null;
//在map方法执行之前
protected void setup(Context context) throws IOException,InterruptedException{
if(cmap == null || cmap.size() == 0 || df == null || df.size())
URI[] ss = context.getCacheFiles();
if(ss != null){
for(int i=0;i<ss.length;i++){
URI uri = ss[i];
if(uri.getPath().endsWith("part-r-00003")){
Path path = new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileRead());
String line = br.readLine();
if(line.startsWith("count")){
String[] ls = line.split("\t");
cmap = new HashMap<String,Integer>();
cmap.put(ls[0],Integer.parseInt(ls[1].trim()))
}
br.close();
}else if(uri.getPath().endsWith("part-r-00000")){
df = new HashMap<String,Integer>();
Path path = new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileReader());
String line;
while((line = br.readLine()) != null){
String[] ls = line.split("\t");
df.put(ls[0],Integer.parseInt(ls[1],trim()));
}
br.close();
}
}
}
}
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
FileSplit fs = (FileSplit)context.getInputSplit();
if(!fs.getPath().getName().contains("part-r-00003")){
String[] v = value.toString().trim().split("\t");
if(v.length >= 2){
int tf = Integer.parseInt(v[1].trim());
String[] ss = v[0].split("_");
if(ss.length >= 2){
String w = ss[0];
String id = ss[1];
double s = tf*Math.log(cmap.get("count")/df.get(w));
NumberFormat nf = NumberFormat.getInstance();
nf.setMaximumFractionDigits(5);
context.write(new Text(id),new Text(w + ":" +nf.format))
}
}else{
System.out.println(value.toString() + "--------------------------");
}
}
}
}
-------------------------------------------
LastReduce.java
//第二个job输出的结果:
九阳 2223
豆浆 2344
LastJob.java
public static void main(String[] args){
Configuration config = new Configuration();
config.set("yarn.resourcemanager","192.168.1.198");
try{
Job job = new Job(config);
job.setJarByClass(LastJob.class);
job.setJobName("weibo3");
job.addCacheFile(new Path("/usr/weibo/output1/part-r-00003"));
job.addCacheFile(new Path("/usr/weibo/output1/part-r-00000"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
}
}
-----------------------hive------------------
Hive
Hive:数据仓库
Hive:解释器、编译器、优化器等
Hive运行时,元数据存储在关系型数据库里面。
1、Hive数据类型
hive -e 直接执行sql
hive -f /home/my/hive-script.sql 执行sql文件
hive -S -e 'select a.col from tab1 a' > a.txt 把结果重定向到一个文件中
create database_name
drop database_name
use database_name
yum install mysql-server
service mysqld start
netstat -nplt | grep 3306
create table t_emp(id int,name string,age int,dept_name string)
row format delimited
fields terminated by ',';
desc tablename;
create table dept_count(num int) partitioned by(dname string);
insert into table dept_count partition(dname='销售部') select dept_name,count(1) from t_emp where dname = '销售部' group by dept_name;
export table t_temp to '/usr/input/emp.txt';
create table t_stu(
userid int,
name string,
age int,
sex int,classid int)
row format delimited fields terminated by ','
stored as textfile;
)
create table t_class(
cid int,
name string,
teacher string)
row format delimited fields terminated by ','
stored as textfile;
load data inpath '/pub/student.txt' into table t_stu;
---------WordCount.java--------------------------------------------------------------------------------------------
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount{
public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int sum = 0;
for(IntWritable val : values){
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//conf.set("mapred.job.tracker", "192.168.1.30:8022");
String[] ioArgs = new String[] { "hdfs://192.168.1.199:8020/input", "hdfs://192.168.1.199:8020/output1"};
String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
if(otherArgs.length !=2){
System.out.println("Usage:wordcount<in><out>");
System.exit(2);
}
Job job = new Job(conf,"word count");
/* job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);*/
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
---------------------------------------开发实战-------------------------------------------------------------------------------------------------
下面介绍MapReduce的主要的六个类,只有了解了这六个类的作用,才能在编写程序中知道哪个类是要自己实现,哪些类可以调用默认的类,才能真正的做到游刃有余,关于需要自己编写的类(用户制定类)可以参考:.html
1、InputFormat类。该类的作用是将输入的文件和数据分割成许多小的split文件,并将split的每个行通过LineRecorderReader解析成<Key,Value>,通过job.setInputFromatClass()函数来设置,默认的情况为类TextInputFormat,其中Key默认为字符偏移量,value是该行的值。
2、Map类。根据输入的<Key,Value>对生成中间结果,默认的情况下使用Mapper类,该类将输入的<Key,Value>对原封不动的作为中间按结果输出,通过job.setMapperClass()实现。实现Map函数。
3、Combine类。实现combine函数,该类的主要功能是合并相同的key键,通过job.setCombinerClass()方法设置,默认为null,不合并中间结果。实现map函数
4、Partitioner类。 该该主要在Shuffle过程中按照Key值将中间结果分成R份,其中每份都有一个Reduce去负责,可以通过job.setPartitionerClass()方法进行设置,默认的使用hashPartitioner类。实现getPartition函数
5、Reducer类。 将中间结果合并,得到中间结果。通过job.setReduceCalss()方法进行设置,默认使用Reducer类,实现reduce方法。
6、OutPutFormat类,该类负责输出结果的格式。可以通过job.setOutputFormatClass()方法进行设置。默认使用TextOUtputFormat类,得到<Key,value>对。
note:hadoop主要是上面的六个类进行mapreduce操作,使用默认的类,处理的数据和文本的能力很有限,具体的项目中,用户通过改写这六个类(重载六个类)。
public static void main(String[] args)throws IOException {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);
job.setCombinerClass(null);
job.setPartitionerClass(HashPartitioner.class);
job.setReducerClass(Reducer.class);
job.setOutputFormatClass(TextOutFormat.class);
}
-------------------MapReduce,组合式,迭代式,链式--------------------------------------------------------------------------------------------------------
一些复杂的任务难以用一次mapreduce处理完成,需要多次mapreduce才能完成任务,例如Pagrank,Kmeans算法都需要多次的迭代,关于mapreduce迭代在mahout中运用较多。有兴趣的可以参考一下mahout的源码。
在map/reduce迭代过程中,思想还是比较简单,就像类似for循环一样,前一个mapreduce的输出结果,作为下一个mapreduce的输入,任务完成后中间结果都可以删除。如代码所以:
Configuration conf1 = new Configuration();
Job job1 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job1,InputPaht1);
FileOutputFromat.setOoutputPath(job1,Outpath1);
job1.waitForCompletion(true);
//sub Mapreduce
Configuration conf2 = new Configuration();
Job job2 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job2,Outpath1);
FileOutputFromat.setOoutputPath(job2,Outpath2);
job2.waitForCompletion(true);
//sub Mapreduce
Configuration conf3 = new Configuration();
Job job3 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job3,Outpath2);
FileOutputFromat.setOoutputPath(job3,Outpath3);
job3.waitForCompletion(true);
.....
下面列举一个mahout怎样运用mapreduce迭代的,下面的代码快就是mahout中kmeans的算法的代码,在main函数中用一个while循环来做mapreduce的迭代,其中:runIteration()是一次mapreduce的过程。
但个人感觉现在的mapreduce迭代设计不太满意的地方。
1. 每次迭代,如果所有Job(task)重复创建,代价将非常高。
2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。
好像Twister和Haloop的模型能过比较好的解决这些问题,但他们抽象度不够高,支持的计算有限。
期待着下个版本hadoop更好的支持迭代算法。
//main function
while (!converged && iteration <= maxIterations) {
log.info("K-Means Iteration {}", iteration);
// point the output to a new directory per iteration
Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);
// now point the input to the old output directory
clustersIn = clustersOut;
iteration++;
}
private static boolean runIteration(Configuration conf,
Path input,
Path clustersIn,
Path clustersOut,
String measureClass,
String convergenceDelta)
throws IOException, InterruptedException, ClassNotFoundException {
conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ClusterObservations.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Cluster.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(KMeansMapper.class);
job.setCombinerClass(KMeansCombiner.class);
job.setReducerClass(KMeansReducer.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, clustersOut);
job.setJarByClass(KMeansDriver.class);
HadoopUtil.delete(conf, clustersOut);
if (!job.waitForCompletion(true)) {
throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
}
FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
return isConverged(clustersOut, conf, fs);
}
----------2.依赖关系组合式MapReduce----------------------------------------------
我们可以设想一下MapReduce有3个子任务job1,job2,job3构成,其中job1和job2相互独立,job3要在job1和job2完成之后才执行。这种关系就叫复杂数据依赖关系的组合时mapreduce。hadoop为这种组合关系提供了一种执行和控制机制,hadoop通过job和jobControl类提供具体的编程方法。Job除了维护子任务的配置信息,还维护子任务的依赖关系,而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,执行JobControl的run()方法即可运行程序。
Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf,"Job1");
.........//job1 其他设置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他设置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他设置
job3.addDepending(job1);//设置job3和job1的依赖关系
job3.addDepending(job2);
JobControl JC = new JobControl("123");
JC.addJob(job1);//把三个job加入到jobcontorl中
JC.addJob(job2);
JC.addJob(job3);
JC.run();
----------3.链式MapReduce---------------------------
首先看一下例子,来说明为什么要有链式MapReduce,假设在统计单词是,会出现这样的词,make,made,making等,他们都属于一个词,在单词累加的时候,都归于一个词。解决的方法为用一个单独的Mapreduce任务可以实现,单增加了多个Mapreduce作业,将增加整个作业处理的周期,还增加了I/O操作,因而处理效率不高。
一个较好的办法就是在核心的MapReduce之外,增加一个辅助的Map过程,然后将这个辅助的Map过程和核心的Mapreudce过程合并为一个链式的Mapreduce,从而完成整个作业。hadoop提供了专门的链式ChainMapper和ChainReducer来处理链式任务,ChainMapper允许一个Map任务中添加多个Map的子任务,ChainReducer可以在Reducer执行之后,在加入多个Map的子任务。其调用形式如下:
ChainMapper.addMapper(...);
ChainReducer.addMapper(...);
//addMapper()调用的方法形式如下:
public static void addMapper(JOb job,
Class<? extends Mapper> mclass,
Class<?> inputKeyClass,
Class<?> inputValueClass,
Class<?> outputKeyClass,
Class<?> outputValueClass,
Configuration conf
){
}
其中,ChainReducer专门提供了一个setRreducer()方法来设置整个作业唯一的Reducer。
note:这些Mapper和Reducer之间传递的键和值都必须保持一致。
下面举个例子:用ChainMapper把Map1加如并执行,然后用ChainReducer把Reduce和Map2加入到Reduce过程中。代码如下:Map1.class 要实现map方法
public void function throws IOException {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("ChianJOb");
// 在ChainMapper里面添加Map1
Configuration map1conf = new Configuration(false);
ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
Text.class, Text.class, true, map1conf);
// 在ChainReduce中加入Reducer,Map2;
Configuration reduceConf = new Configuration(false);
ChainReducer.setReducer(job, Reduce.class, LongWritable.class,
Text.class, Text.class, Text.class, true, map1conf);
Configuration map2Conf = new Configuration();
ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,
Text.class, Text.class, true, map1conf);
job.waitForCompletion(true);
}
===================================================================================================================================
---------------MapReduce,DataJoin,链接多数据源-
主要介绍用DataJoin类来链接多数据源,先看一下例子,假设二个数据源customs和orders
customer ID Name PhomeNumber
1 赵一 025-5455-566
2 钱二 025-4587-565
3 孙三 021-5845-5875
客户的订单号:
Customer ID order ID Price Data
2 1 93 2008-01-08
3 2 43 2012-01-21
1 3 43 2012-05-12
2 4 32 2012-5-14
问题:现在要生成订单
customer ID name PhomeNumber Price Data
2 钱二 025-4587-565 93 2008-01-08
上面是一个例子,下面介绍一下hadoop中DataJoin类具体的做法。
首先,需要为不同数据源下的每个数据定义一个数据标签,这一点不难理解,就是标记数据的出处。
其次,需要为每个待链接的数据记录确定一个链接主键,这一点不难理解。DataJoin类库分别在map阶段和Reduce阶段提供一个处理框架,尽可能帮助程序员完成一些处理的工作,仅仅留下一些必须工作,由程序完成。
Map阶段
DataJoin类库里有一个抽象基类DataJoinMapperBase,该基类实现了map方法,该方法为对每个数据源下的文本的记录生成一个带表见的数据记录对象。但是程序必须指定它是来自于哪个数据源,即Tag,还要指定它的主键是什么即GroupKey。如果指定了Tag和GroupKey,那么map将会生成一下的记录,customer表为例
customers 1 赵一 025-5455-566; customers 2 钱二 025-4587-565;
Map过程中Tag和GroupKey都是程序员给定,所以要肯定要就有接口供程序员去实现,DataJoinMapperBase实现下面3个接口。
abstract Text gernerateInputTag(String inuptFile), 看方法名就知道是设置Tag。
abstract Text generateGroupKey(TaggedMapOutput lineRecord), 该方法是设置GroupKey,其中,lineRecord是数据源中的一行数据,该方法可以在这一行数据上设置任意的GroupKey为主键。
abstract TaggedMapOutput generateMapOutput(object value), 该抽象方法用于把数据源中的原始数据记录包装成一个带标签的数据源。TaggedMapOutputs是一行记录的数据类型。代码如下:
import org.apache.hadoop.contrib.utils.join.*;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
public class MapClass extends DataJoinMapperBase{
@Override
protected Text generateGroupKey(TaggedMapOutput arg0) {
String line = ((Text)arg0.getData()).toString();
String[] tokens = line.split(",");
String groupKey = tokens[0];
return new Text(groupKey);
}
@Override
protected Text generateInputTag(String arg0) {
String dataSource = arg0.split("-")[0];
return new Text(dataSource);
}
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
TaggedWritable tw = new TaggedWritable((Text)arg0);
tw.setTag(this.inputTag);
return tw;
}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class TaggedWritable extends TaggedMapOutput{
private Writable data;
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
@Override
public Writable getData() {
return data;
}
@Override
public void readFields(DataInput arg0) throws IOException {
this.tag.readFields(arg0);
this.data.readFields(arg0);
}
@Override
public void write(DataOutput arg0) throws IOException {
this.tag.write(arg0);
this.data.write(arg0);
}
}
每个记录的数据源标签可以由generateInputTag()产生,通过setTag()方法设置记录的Tag。
note:1.该记录不是关系数据库,是文本文件,2. TaggedMapOutput在import org.apache.hadoop.contrib.utils.join.*头文件中,有的时候在eclipse下,每个这个头文件,这时 只要找到你的hadoop的目录下contrib/datajoin文件加,把jar文件导入eclipse中即可。
Reduce 阶段
DataJoinReduceBase中已经实现reduce()方法,具有同一GroupKey的数据分到同一Reduce中,通过reduce的方法将对来自不同的数据源和据用相同的GroupKey做一次叉积组合。这个比较难懂,举个例子:
customers 2 钱二 025-4587-565;
orders 2 1 93 2008-01-08;
orders 2 4 32 2012-5-14
按照map()结果的数据,就是下表给出的结果(3个记录),他们都有一个共同的GroupKey,带来自于二个数据源,所以叉积的结果为
customers 2 钱二 025-4587-565
orders 2 1 93 2008-01-08
customers 2 钱二 025-4587-565
orders 2 4 32 2012-5-14
如果Reduce阶段看懂了,基本上这个就搞定了,Reduce是系统做的,不需要用户重载,接下来的工作就是要实现一个combine()函数,它的作用是将每个叉积合并起来,形成订单的格式。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceClass extends DataJoinReducerBase{
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if(tags.length<2)return null;
StringBuffer joinData = new StringBuffer();
int count=0;
for(Object value: values){
joinData.append(",");
TaggedWritable tw = (TaggedWritable)value;
String recordLine = ((Text)tw.getData()).toString();
String[] tokens = recordLine.split(",",2);
if(count==0) joinData.append(tokens[0]);
joinData.append(tokens[1]);
}
TaggedWritable rtv = new TaggedWritable(new Text(new String(joinData)));
rtv.setTag((Text)tags[0]);
return rtv;
}
public static void main(String[] args){
Configuration conf = new Configuration();
JobConf job = new JobConf(conf, ReduceClass.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
}
}
=========================================================================================================
---------------------Hadoop,MapReduce操作Mysql----------------------------
这一个博客介绍一下MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码。话不说多了。
hadoop技术推出一首曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle 等几个数据库系统。
1. 从Mysql读出数据
Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。
在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。
MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:.html
DBInputFormat 类中包含以下三个内置类
protected class DBRecordReader implementsRecordReader<LongWritable, T>:用来从一张数据库表中读取一条条元组记录。
2.public static class NullDBWritable implements DBWritable,Writable:主要用来实现 DBWritable 接口。DBWritable接口要实现二个函数,第一是write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:
public void write(PreparedStatement statement) throwsSQLException;
public void readFields(ResultSet resultSet) throws SQLException;
protected static class DBInputSplit implements InputSplit:主要用来描述输入元组集合的范围,包括 start 和 end 两个属性,start 用来表示第一条记录的索引号,end 表示最后一条记录的索引号.
下面对怎样使用 DBInputFormat 读取数据库记录进行详细的介绍,具体步骤如下:
DBConfiguration.configureDB (JobConf job, StringdriverClass, String dbUrl, String userName, String passwd)函数,配置JDBC 驱动,数据源,以及数据库访问的用户名和密码。MySQL 数据库的 JDBC 的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。
DBInputFormat.setInput(JobConf job, Class<?extends DBWritable> inputClass, String tableName, String conditions,String orderBy, String... fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,string tableName表名, conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConf job, Class<?extends DBWritable> inputClass, String inputQuery, StringinputCountQuery)。
编写MapReduce函数,包括Mapper 类、Reducer 类、输入输出文件格式等,然后调用JobClient.runJob(conf)。
上面讲了理论,下面举个例子:假设 MySQL 数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。
第一步要实现DBwrite和write数据接口。代码如下:
复制代码
public class StudentRecord implements Writable, DBWritable{
int id;
String name;
String gender;
String number;
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.id = in.readInt();
this.gender = Text.readString(in);
this.name = in.readString();
this.number = in.readString();
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(this.id);
Text.writeString(out,this.name);
out.writeInt(this.gender);
out.writeInt(this.number);
}
@Override
public void readFields(ResultSet result) throws SQLException {
// TODO Auto-generated method stub
this.id = result.getInt(1);
this.name = result.getString(2);
this.gender = result.getString(3);
this.number = result.getString(4);
}
@Override
public void write(PreparedStatement stmt) throws SQLException{
// TODO Auto-generated method stub
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
stmt.setString(3, this.gender);
stmt.setString(4, this.number);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return new String(this.name + " " + this.gender + " " +this.number);
}
复制代码
第二步,实现Map和Reduce类
复制代码
public class DBAccessMapper extends MapReduceBase implements
Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
@Override
public void map(LongWritable key, TeacherRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
// TODO Auto-generated method stub
new collector.collect(new LongWritable(value.id), new Text(value
.toString()));
}
}
复制代码
第三步:主函数的实现,函数
复制代码
public class DBAccessReader {
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(DBAccessReader.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path("dboutput"));
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/school","root","123456");
String [] fields = {"id", "name", "gender", "number"};
DBInputFormat.setInput(conf, StudentRecord.class,"Student",null "id", fields);
conf.setMapperClass(DBAccessMapper.class);
conf.setReducerClass(IdentityReducer.class);
JobClient.runJob(conf);
}
}
复制代码
2.写数据
往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。
DBOutFormat: 提供数据库写入接口。
DBRecordWriter:提供向数据库中写入的数据记录的接口。
DBConfiguration:提供数据库配置和创建链接的接口。
DBOutFormat提供一个静态方法setOutput(job,String table,String ...filedNames);该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为
复制代码
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
JobConf conf = new JobConf();
conf.setOutputFormat(DBOutputFormat.class);
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/school","root","123456");
DBOutputFormat.setOutput(conf,"Student", 456, "liqizhou", "man", "20004154578");
JobClient.runJob(conf);
===================================================================================================================
-------------MapReduce操作HBase--------------------------------------------------------
运行HBase时常会遇到个错误,我就有这样的经历。
ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times
检查日志:org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)
如果是这个错误,说明RPC协议不一致所造成的,解决方法:将hbase/lib目录下的hadoop-core的jar文件删除,将hadoop目录下的hadoop-0.20.2-core.jar拷贝到hbase/lib下面,然后重新启动hbase即可。第二种错误是:没有启动hadoop,先启用hadoop,再启用hbase。
在Eclipse开发中,需要加入hadoop所有的jar包以及HBase二个jar包(hbase,zooKooper)。
HBase基础可见帖子:.html
建表,通过HBaseAdmin类中的create静态方法来创建表。
HTable类是操作表,例如,静态方法put可以插入数据,该类初始化时可以传递一个行键,静态方法getScanner()可以获得某一列上的所有数据,返回Result类,Result类中有个静态方法getFamilyMap()可以获得以列名为key,值为value,这刚好与hadoop中map结果是一样的。
复制代码
package test;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
public class Htable {
/**
* @param args
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Configuration hbaseConf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(hbaseConf);
HTableDescriptor htableDescriptor = new HTableDescriptor("table"
.getBytes()); //set the name of table
htableDescriptor.addFamily(new HColumnDescriptor("fam1")); //set the name of column clusters
admin.createTable(htableDescriptor); //create a table
HTable table = new HTable(hbaseConf, "table"); //get instance of table.
for (int i = 0; i < 3; i++) { //for is number of rows
Put putRow = new Put(("row" + i).getBytes()); //the ith row
putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1"
.getBytes()); //set the name of column and value.
putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2"
.getBytes());
putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3"
.getBytes());
table.put(putRow);
}
for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters
for(Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result
String column = new String(entry.getKey());
String value = new String(entry.getValue());
System.out.println(column+","+value);
}
}
admin.disableTable("table".getBytes()); //disable the table
admin.deleteTable("table".getBytes()); //drop the tbale
}
}
复制代码
以上代码不难看懂。
下面介绍一下,用mapreduce怎样操作HBase,主要对HBase中的数据进行读取。
现在有一些大的文件,需要存入HBase中,其思想是先把文件传到HDFS上,利用map阶段读取<key,value>对,可在reduce把这些键值对上传到HBase中。
复制代码
package test;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MapperClass extends Mapper<LongWritable,Text,Text,Text>{
public void map(LongWritable key,Text value,Context context)thorws IOException{
String[] items = value.toString().split(" ");
String k = items[0];
String v = items[1];
context.write(new Text(k), new Text(v));
}
}
复制代码
Reduce类,主要是将键值传到HBase表中
复制代码
package test;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
public class ReducerClass extends TableReducer<Text,Text,ImmutableBytesWritable>{
public void reduce(Text key,Iterable<Text> values,Context context){
String k = key.toString();
StringBuffer str=null;
for(Text value: values){
str.append(value.toString());
}
String v = new String(str);
Put putrow = new Put(k.getBytes());
putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());
}
}
复制代码
由上面可知ReducerClass继承TableReduce,在hadoop里面ReducerClass继承Reducer类。它的原型为:TableReducer<KeyIn,Values,KeyOut>可以看出,HBase里面是读出的Key类型是ImmutableBytesWritable。
Map,Reduce,以及Job的配置分离,比较清晰,mahout也是采用这种构架。
复制代码
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
public class Driver extends Configured implements Tool{
@Override
public static void run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", "localhost");
Job job = new Job(conf,"Hbase");
job.setJarByClass(TxtHbase.class);
Path in = new Path(arg0[0]);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, in);
job.setMapperClass(MapperClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);
job.waitForCompletion(true);
}
}
复制代码
Driver中job配置的时候没有设置 job.setReduceClass(); 而是用 TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job); 来执行reduce类。
主函数
复制代码
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
public class TxtHbase {
public static void main(String [] args) throws Exception{
Driver.run(new Configuration(),new THDriver(),args);
}
}
复制代码
读取数据时比较简单,编写Mapper函数,读取<key,value>值就行了。
复制代码
package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MapperClass extends MapReduceBase implements
TableMap<Text, Text> {
static final String NAME = "GetDataFromHbaseTest";
private Configuration conf;
public void map(ImmutableBytesWritable row, Result values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
StringBuilder sb = new StringBuilder();
for (Entry<byte[], byte[]> value : values.getFamilyMap(
"fam1".getBytes()).entrySet()) {
String cell = value.getValue().toString();
if (cell != null) {
sb.append(new String(value.getKey())).append(new String(cell));
}
}
output.collect(new Text(row.get()), new Text(sb.toString()));
}
复制代码
要实现这个方法 initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars)。
复制代码
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
public class Driver extends Configured implements Tool{
@Override
public static void run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", "localhost");
Job job = new Job(conf,"Hbase");
job.setJarByClass(TxtHbase.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TableMapReduceUtilinitTableMapperJob("table", args0[0],MapperClass.class, job);
job.waitForCompletion(true); }
}
复制代码
主函数
复制代码
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
public class TxtHbase {
public static void main(String [] args) throws Exception{
Driver.run(new Configuration(),new THDriver(),args);
}
}
=====================================================================================================================================
发布评论