Applying the Snappy Compression Codec in MapReduce Java Code
Procedure
The Snappy codec is not available in local mode, so this code has to be run on the cluster.
First, the session transcript:
[root@hadoop01 home]# hadoop fs -mkdir /aaaaa
Create an /aaaaa directory on the cluster.
[root@hadoop01 home]# hadoop fs -put a.txt /aaaaa/
Create an a.txt with arbitrary content and upload it to the cluster's /aaaaa directory.
[root@hadoop01 home]# hadoop fs -ls /aaaaa
Check what is inside:
Found 1 items
-rw-r--r-- 2 root supergroup 50 2019-11-18 09:34 /aaaaa/a.txt
[root@hadoop01 home]# rz
Upload the jar built from our code to Linux.
rz waiting to receive. 100% 26014 KB 26014 KB/s 00:00:01 0 Errors
[root@hadoop01 home]# ll
// Check that the upload succeeded; test5.jar is the uploaded jar.
total 52020
-rw-r--r-- 1 root root 50 Nov 18 09:33 a.txt
-rw-r--r-- 1 root root 26623563 Nov 16 14:48 test4.jar
-rw-r--r-- 1 root root 26639104 Nov 18 09:27 test5.jar
[root@hadoop01 home]# hadoop jar test5.jar com.czxy.day20191118.demo02.MoreFileDriver
Run the jar:
19/11/18 09:35:22 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.100.201:8032
19/11/18 09:35:23 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/11/18 09:35:24 INFO input.FileInputFormat: Total input paths to process : 1
19/11/18 09:35:24 INFO mapreduce.JobSubmitter: number of splits:1
19/11/18 09:35:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1574040817860_0001
19/11/18 09:35:24 INFO impl.YarnClientImpl: Submitted application application_1574040817860_0001
19/11/18 09:35:25 INFO mapreduce.Job: The url to track the job: http://hadoop01:8088/proxy/application_1574040817860_0001/
19/11/18 09:35:25 INFO mapreduce.Job: Running job: job_1574040817860_0001
19/11/18 09:35:32 INFO mapreduce.Job: Job job_1574040817860_0001 running in uber mode : true
19/11/18 09:35:32 INFO mapreduce.Job: map 100% reduce 0%
19/11/18 09:35:34 INFO mapreduce.Job: map 100% reduce 100%
19/11/18 09:35:34 INFO mapreduce.Job: Job job_1574040817860_0001 completed successfully
19/11/18 09:35:34 INFO mapreduce.Job: Counters: 52
	File System Counters
		FILE: Number of bytes read=320
		FILE: Number of bytes written=496
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=400
		HDFS: Number of bytes written=165860
		HDFS: Number of read operations=35
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=9
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Other local map tasks=1
		Total time spent by all maps in occupied slots (ms)=0
		Total time spent by all reduces in occupied slots (ms)=0
		TOTAL_LAUNCHED_UBERTASKS=2
		NUM_UBER_SUBMAPS=1
		NUM_UBER_SUBREDUCES=1
		Total time spent by all map tasks (ms)=292
		Total time spent by all reduce tasks (ms)=1475
		Total vcore-milliseconds taken by all map tasks=0
		Total vcore-milliseconds taken by all reduce tasks=0
		Total megabyte-milliseconds taken by all map tasks=0
		Total megabyte-milliseconds taken by all reduce tasks=0
	Map-Reduce Framework
		Map input records=11
		Map output records=11
		Map output bytes=116
		Map output materialized bytes=144
		Input split bytes=97
		Combine input records=0
		Combine output records=0
		Reduce input groups=1
		Reduce shuffle bytes=144
		Reduce input records=11
		Reduce output records=11
		Spilled Records=22
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=0
		CPU time spent (ms)=1340
		Physical memory (bytes) snapshot=766976000
		Virtual memory (bytes) snapshot=6122287104
		Total committed heap usage (bytes)=556793856
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=50
	File Output Format Counters
		Bytes Written=889
[root@hadoop01 home]# hadoop fs -ls /ccccc
After the job finishes, check the results in the generated /ccccc directory: the output file ends in .snappy, so compression is enabled.
Found 2 items
-rw-r--r-- 2 root supergroup 0 2019-11-18 09:35 /ccccc/_SUCCESS
-rw-r--r-- 2 root supergroup 84 2019-11-18 09:35 /ccccc/part-r-00000.snappy
[root@hadoop01 home]# hadoop fs -cat /ccccc/part-r-00000.snappy
Check whether the contents were compressed: it is all unreadable binary, so the compression clearly worked.
tLta.txt f
324 e
dr fa cd
xz r234reaFvzcxv
asf:ihello.
[root@hadoop01 home]#
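The gibberish printed by `hadoop fs -cat` is exactly what compressed bytes look like. Snappy itself is not bundled with the JDK, so as a stand-in for illustration only (an assumption; the job above used Hadoop's SnappyCodec, not GZIP), the JDK's built-in GZIP codec shows the same effect: the compressed output is unprintable binary, and only decompressing with the matching codec recovers the original text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedBytesDemo {

    // Compress a string; the result is binary, which is why catting a
    // compressed file prints gibberish.
    static byte[] compress(String text) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Decompressing with the matching codec restores the text exactly.
    static String decompress(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String record = "a.txt\thello";  // shaped like one reducer output record
        byte[] packed = compress(record);
        System.out.println("compressed length: " + packed.length);
        System.out.println("round trip ok: " + decompress(packed).equals(record));
    }
}
```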
Java code
MoreFileDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MoreFileDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Enable compression through the Configuration
        Configuration conf = new Configuration();
        // Compress the map output with Snappy
        // (the enable flag and the codec class use two distinct keys)
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec",
                "org.apache.hadoop.io.compress.SnappyCodec");
        // Compress the reduce (job) output with Snappy
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                "org.apache.hadoop.io.compress.SnappyCodec");

        Job job = Job.getInstance(conf, "MoreFile");
        job.setJarByClass(MoreFileDriver.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("/aaaaa"));

        job.setMapperClass(MoreFileMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MoreFileReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("/ccccc"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new MoreFileDriver(), args);
    }
}
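One detail worth calling out in the driver: the enable flag and the codec class must live under two distinct configuration keys (the "...compress" boolean and the "...compress.codec" class name). Hadoop's Configuration is last-write-wins per key, so calling set twice with the same key silently discards the first value. A minimal JDK-only sketch, using java.util.Properties as a stand-in for Configuration (an assumption; only the string-map semantics are being demonstrated here):

```java
import java.util.Properties;

public class OverwriteDemo {
    // Properties, like Hadoop's Configuration, keeps one value per key:
    // the last write wins.
    static String lastValue() {
        Properties conf = new Properties();
        conf.setProperty("mapreduce.map.output.compress", "true");
        // Reusing the SAME key replaces "true" with the codec name,
        // so the enable flag would be lost:
        conf.setProperty("mapreduce.map.output.compress",
                "org.apache.hadoop.io.compress.SnappyCodec");
        return conf.getProperty("mapreduce.map.output.compress");
    }

    public static void main(String[] args) {
        // Only the second value survives; the boolean flag is gone.
        System.out.println(lastValue());
    }
}
```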
MoreFileMapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MoreFileMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input split obtained from the context tells us which file this line came from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String filename = inputSplit.getPath().getName();
        context.write(new Text(filename), value);
    }
}
MoreFileReducer
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MoreFileReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Emit every value under its key
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
Results
[root@hadoop01 home]# hadoop fs -ls /ccccc
Found 2 items
-rw-r--r-- 2 root supergroup 0 2019-11-18 09:35 /ccccc/_SUCCESS
-rw-r--r-- 2 root supergroup 84 2019-11-18 09:35 /ccccc/part-r-00000.snappy
[root@hadoop01 home]# hadoop fs -cat /ccccc/part-r-00000.snappy
tLta.txt f
324 e
dr fa cd
xz r234reaFvzcxv
asf:ihello.
[root@hadoop01 home]#
Revisit the walkthrough in the first section alongside the code to understand the Snappy compression.
Summary
The most important code is below.
Setting up the Configuration in the Driver class is all it takes to enable compression.
That is why the codec's fully qualified Java class name is worth memorizing.
// Enable compression through the Configuration
Configuration conf = new Configuration();
// Compress the map output with Snappy (flag and codec are two distinct keys)
conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
// Compress the reduce (job) output with Snappy
conf.set("mapreduce.output.fileoutputformat.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");