MapReduce Reduce-Side Join and Map-Side Join Implementations
Table of contents
- 1. Reduce-side join implementation
- 2. Map-side join implementation
1. Reduce-side join implementation
1. Requirement:
Order table t_order:
id | date | pid | amount |
---|---|---|---|
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0001 | 3 |
1002 | 20150710 | P0002 | 3 |
Product table t_product:
id | pname | category_id | price |
---|---|---|---|
P0001 | 小米5 | 1000 | 2000 |
P0002 | 锤子T1 | 1000 | 3000 |
Assume the data volume is huge and both tables are stored as files on HDFS; we need a MapReduce program to implement the following
SQL query:
select a.id,a.date,b.pname,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
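For reference, a minimal sketch of what the two input files might look like, assuming the tables are exported as comma-separated text files; the file names orders.txt and product.txt are hypothetical, but the order file's name must contain "orders", because the mapper below uses the file name to tell the two inputs apart:

```text
# orders.txt (hypothetical name; must contain "orders")
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3

# product.txt (hypothetical name)
P0001,小米5,1000,2000
P0002,锤子T1,1000,3000
```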
- Step 1: Define the OrderJoinBean
```java
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderJoinBean implements Writable {
    private String id;
    private String date;
    private String pid;
    private String amount;
    private String name;
    private String categoryId;
    private String price;

    @Override
    public String toString() {
        return id + "\t" + date + "\t" + pid + "\t" + amount + "\t" + name + "\t" + categoryId + "\t" + price;
    }

    public OrderJoinBean() {
    }

    public OrderJoinBean(String id, String date, String pid, String amount, String name, String categoryId, String price) {
        this.id = id;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.name = name;
        this.categoryId = categoryId;
        this.price = price;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public String getAmount() { return amount; }
    public void setAmount(String amount) { this.amount = amount; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getCategoryId() { return categoryId; }
    public void setCategoryId(String categoryId) { this.categoryId = categoryId; }
    public String getPrice() { return price; }
    public void setPrice(String price) { this.price = price; }

    @Override
    public void write(DataOutput out) throws IOException {
        // null fields are serialized as the string "null" (field + "");
        // the reducer relies on this to tell order records from product records
        out.writeUTF(id + "");
        out.writeUTF(date + "");
        out.writeUTF(pid + "");
        out.writeUTF(amount + "");
        out.writeUTF(name + "");
        out.writeUTF(categoryId + "");
        out.writeUTF(price + "");
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.date = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readUTF();
        this.name = in.readUTF();
        this.categoryId = in.readUTF();
        this.price = in.readUTF();
    }
}
```
- Step 2: Define the Mapper class
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class OrderJoinMap extends Mapper<LongWritable, Text, Text, OrderJoinBean> {

    private OrderJoinBean orderJoinBean = new OrderJoinBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Use the input file name to tell the two data sets apart
        String[] split = value.toString().split(",");
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        System.out.println("input file name: " + name);
        if (name.contains("orders")) {
            // order record: emit with the product id (pid) as the join key
            orderJoinBean.setId(split[0]);
            orderJoinBean.setDate(split[1]);
            orderJoinBean.setPid(split[2]);
            orderJoinBean.setAmount(split[3]);
            context.write(new Text(split[2]), orderJoinBean);
        } else {
            // product record: emit with the product id as the join key
            orderJoinBean.setName(split[1]);
            orderJoinBean.setCategoryId(split[2]);
            orderJoinBean.setPrice(split[3]);
            context.write(new Text(split[0]), orderJoinBean);
        }
    }
}
```
- Step 3: Define the Reducer class
```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OrderJoinReduce extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<OrderJoinBean> values, Context context) throws IOException, InterruptedException {
        // All records with the same pid arrive here; separate the order records
        // from the product record, then stitch the data together.
        List<OrderJoinBean> orders = new ArrayList<OrderJoinBean>();
        String name = null;
        String categoryId = null;
        String price = null;
        for (OrderJoinBean value : values) {
            System.out.println(value.getId());
            if (null != value.getId() && !value.getId().equals("null")) {
                // order record: copy it, because Hadoop reuses the value object across iterations
                orders.add(new OrderJoinBean(value.getId(), value.getDate(), value.getPid(),
                        value.getAmount(), null, null, null));
            } else {
                // product record: remember its fields
                name = value.getName();
                categoryId = value.getCategoryId();
                price = value.getPrice();
            }
        }
        // Emit one joined record per order, so that several orders for the same
        // product (e.g. P0001 above) all appear in the output
        for (OrderJoinBean order : orders) {
            order.setName(name);
            order.setCategoryId(categoryId);
            order.setPrice(price);
            context.write(order, NullWritable.get());
        }
    }
}
```
- Step 4: Write the driver (main entry point)
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class OrderJoinMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), OrderJoinMain.class.getSimpleName());
        job.setJarByClass(OrderJoinMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///E:\\cache\\mapReduceTestCache\\Test.txt"));

        job.setMapperClass(OrderJoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderJoinBean.class);

        job.setReducerClass(OrderJoinReduce.class);
        job.setOutputKeyClass(OrderJoinBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///E:\\cache\\mapReduceResultCache\\TestResult01"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new OrderJoinMain(), args);
    }
}
```
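With the sample files sketched above, the joined output written by TextOutputFormat would look roughly like the following (one tab-separated line per order, produced by OrderJoinBean.toString(); the exact row order may differ):

```text
1001	20150710	P0001	2	小米5	1000	2000
1002	20150710	P0001	3	小米5	1000	2000
1002	20150710	P0002	3	锤子T1	1000	3000
```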
Drawback: with this approach the join is performed entirely in the reduce phase, so the reducers carry most of the processing load while the map side does little work; resource utilization is poor, and the reduce phase is very prone to data skew (a hot join key sends all of its records to a single reducer).
Solution: the map-side join.
2. Map-side join implementation
1. How it works
This approach applies when one of the joined tables is small.
The small table can be distributed to every map node, so each mapper joins the large-table records it reads locally and emits the final result directly; this greatly increases the parallelism of the join and speeds up processing.
2. Implementation example
– Pre-load the small table inside the Mapper class and perform the join there.
– In a real scenario, load the small table once up front (e.g. from a database, or via the distributed cache as the code below does); a sketch of the cached file follows.
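For reference, the cached small-table file (the pdts.txt referenced in the driver below) is assumed to have the same comma-separated layout as t_product, one product per line in the order id,pname,category_id,price — the setup() method below depends on this:

```text
P0001,小米5,1000,2000
P0002,锤子T1,1000,3000
```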
- Step 1: Define the map-join Mapper class
```java
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class JoinMap extends Mapper<LongWritable, Text, Text, Text> {

    private HashMap<String, String> b_tab = new HashMap<String, String>();
    private String line = null;

    /**
     * In the map-side setup() method, read the cached file and load it
     * into an in-memory HashMap in one go.
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        // Alternative: get the localized copies of the cache files
        // Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
        FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
        while ((line = bufferedReader.readLine()) != null) {
            String[] split = line.split(",");
            // key: product id, value: pname \t category_id \t price
            b_tab.put(split[0], split[1] + "\t" + split[2] + "\t" + split[3]);
        }
        IOUtils.closeStream(bufferedReader);
        fileSystem.close();
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Here we read the split of the big (order) table that this map task is responsible for (on HDFS)
        String[] fields = value.toString().split(",");
        String orderId = fields[0];
        String date = fields[1];
        String pdId = fields[2];
        String amount = fields[3];
        // Look up the product details in the in-memory map
        String productInfo = b_tab.get(pdId);
        context.write(new Text(orderId), new Text(date + "\t" + productInfo + "\t" + amount));
    }
}
```
- Step 2: Define the driver main method
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;

public class MapSideJoin extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        // Note: the cache file must live on HDFS; a local path cannot be loaded from here
        DistributedCache.addCacheFile(new URI("hdfs://192.168.100.201:8020/cachefile/pdts.txt"), conf);

        Job job = Job.getInstance(conf, MapSideJoin.class.getSimpleName());
        job.setJarByClass(MapSideJoin.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///E:\\cache\\mapReduceTestCache\\Test.txt"));

        job.setMapperClass(JoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // map-side join: the join finishes in the mappers, so no reduce phase is needed
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///E:\\cache\\mapReduceResultCache\\TestResult01"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        ToolRunner.run(configuration, new MapSideJoin(), args);
    }
}
```
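Note that DistributedCache is deprecated in newer Hadoop releases. A minimal sketch of the same mapper using the current Job/Context cache-file API is shown below; the class name JoinMapNewApi is hypothetical, and it assumes the same pdts.txt layout as above:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

// Same logic as JoinMap above, but using the non-deprecated cache-file API.
public class JoinMapNewApi extends Mapper<LongWritable, Text, Text, Text> {

    private final HashMap<String, String> b_tab = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Files registered in the driver with job.addCacheFile(uri) show up here
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFiles[0]))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] split = line.split(",");
                b_tab.put(split[0], split[1] + "\t" + split[2] + "\t" + split[3]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String productInfo = b_tab.get(fields[2]);
        context.write(new Text(fields[0]), new Text(fields[1] + "\t" + productInfo + "\t" + fields[3]));
    }
}
```

In the driver, the DistributedCache.addCacheFile(...) call would then be replaced by job.addCacheFile(new URI("hdfs://192.168.100.201:8020/cachefile/pdts.txt")) after the Job has been created.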