Writing Spark Programs in IDEA
Create a Maven project, complete the source directory structure, and configure pom.xml.
Import the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Repositories, in order: aliyun, cloudera and jboss -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Plugin that builds the fat jar submitted with spark-submit -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Difference between maven-assembly-plugin and maven-shade-plugin: both can build a fat jar containing all dependencies, but maven-shade-plugin can additionally exclude signature files (META-INF/*.SF and the like), merge conflicting resource files through transformers, and relocate classes to avoid dependency conflicts, which is why it is used here.
- Running locally
package cn.itcast.sparkhello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkContext
    val config = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(config)
    // Set the log level
    sc.setLogLevel("WARN")
    //2. Read the file
    // A Resilient Distributed Dataset (RDD) can be thought of as a distributed collection;
    // Spark wraps it so that working with it feels much like working with a local collection.
    val fileRDD: RDD[String] = sc.textFile("D:\\授课\\190429\\资料\\data\\words.txt")
    //3. Process the data
    //3.1 Split each line on spaces and flatten the result into a collection of individual words
    // flatMap applies the function to every element and then flattens the results
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
    //3.2 Map each word to 1
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    //3.3 Aggregate by key to count the occurrences of each word
    //wordAndOneRDD.reduceByKey((a, b) => a + b)
    // first _: the accumulated result so far
    // second _: the incoming value
    val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    //4. Collect the results
    val result: Array[(String, Int)] = wordAndCount.collect()
    result.foreach(println)
  }
}
IDEA prints the result to the console.
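If you do not have a words.txt at hand, the following minimal sketch (my own, not part of the original code) produces the same counts from an in-memory collection instead of a file, reusing the sc created above; the sample lines are purely hypothetical.

// Same word count, but on an in-memory collection instead of a file
val lines = sc.parallelize(Seq("hello spark", "hello hadoop hello scala"))
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.collect().foreach(println) // e.g. (hello,3), (spark,1), (hadoop,1), (scala,1)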
- Running on a cluster
package cn.itcast.sparkhello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkContext
    val config = new SparkConf().setAppName("wc")//.setMaster("local[*]")
    val sc = new SparkContext(config)
    // Set the log level
    sc.setLogLevel("WARN")
    //2. Read the file
    // A Resilient Distributed Dataset (RDD) can be thought of as a distributed collection;
    // Spark wraps it so that working with it feels much like working with a local collection.
    val fileRDD: RDD[String] = sc.textFile(args(0)) // input path
    //3. Process the data
    //3.1 Split each line on spaces and flatten the result into a collection of individual words
    // flatMap applies the function to every element and then flattens the results
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
    //3.2 Map each word to 1
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    //3.3 Aggregate by key to count the occurrences of each word
    //wordAndOneRDD.reduceByKey((a, b) => a + b)
    // first _: the accumulated result so far
    // second _: the incoming value
    val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    wordAndCount.saveAsTextFile(args(1)) // output path
    //4. Collect the results
    //val result: Array[(String, Int)] = wordAndCount.collect()
    //result.foreach(println)
  }
}
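As a side note, the local and cluster variants above can be merged into one program. The sketch below is my own (not part of the original course code): it falls back to a local master and a default input path when no arguments are passed, and only writes to HDFS when an output path is supplied. The default path is hypothetical.

package cn.itcast.sparkhello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCountFlexible {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc")
    // Run locally only when no explicit paths are passed on the command line
    if (args.length < 2) conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val input = if (args.nonEmpty) args(0) else "data/words.txt" // hypothetical default path
    val counts: RDD[(String, Int)] = sc.textFile(input)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    if (args.length >= 2) counts.saveAsTextFile(args(1)) // cluster: write to HDFS
    else counts.collect().foreach(println)               // local: print to the console
    sc.stop()
  }
}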
- Package the jar in IDEA
- Upload the jar to the cluster
- Run the following command to submit to the Spark HA cluster (--class takes the fully qualified main class, i.e. package plus class name):
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output4
- Run the following command to submit to the YARN cluster (--class again takes the fully qualified main class):
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output5
After the job finishes on the YARN cluster, view the result in Hue.
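Besides Hue, the output can also be checked from spark-shell. A small sketch (my own, assuming the output path used in the YARN command above):

// Read the job output back from HDFS; each line is the text form of a (word,count) tuple
val out = sc.textFile("hdfs://node01:8020/wordcount/output5")
out.take(10).foreach(println)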
- Java 8 version [for reference only]
Spark is implemented in Scala, and Scala, as a JVM-based language, integrates well with Java. Writing the example above in Java is just as simple, only a little more verbose.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount_Java {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> fileRDD = jsc.textFile("D:\\授课\\190429\\资料\\data\\words.txt");
        // Split each line on spaces and flatten into individual words
        JavaRDD<String> wordRDD = fileRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        // Map each word to (word, 1)
        JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(w -> new Tuple2<>(w, 1));
        // Aggregate by key to count the occurrences of each word
        JavaPairRDD<String, Integer> wordAndCount = wordAndOne.reduceByKey((a, b) -> a + b);
        //wordAndCount.collect().forEach(t -> System.out.println(t));
        wordAndCount.collect().forEach(System.out::println);
        // The core idea of functional programming: parameterize behavior!
    }
}
The result of running it is the same.