Integrating Structured Streaming with Other Technologies
Contents
- 1. Kafka Integration
- 1.2 Environment Preparation
- 2. MySQL Integration
1. Kafka Integration
- Official documentation (Structured Streaming + Kafka Integration Guide)
Creating a Kafka Source for Streaming Queries
```scala
// Subscribe to 1 topic
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern (wildcard topics)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
Creating a Kafka Source for Batch Queries
```scala
// Subscribe to 1 topic,
// defaults to the earliest and latest offsets
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics,
// specifying explicit Kafka offsets
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern (wildcard topics), at the earliest and latest offsets
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
```
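In the explicit-offsets example above, `-2` denotes the earliest offset of a partition and `-1` the latest. Instead of hand-writing the JSON, it can be rendered programmatically; the helper below is a hypothetical name (not part of Spark) sketching how:

```scala
// Hypothetical helper that renders the per-partition JSON accepted by
// startingOffsets/endingOffsets. By convention -2 means "earliest"
// and -1 means "latest".
object OffsetJson {
  def render(offsets: Map[String, Map[Int, Long]]): String =
    offsets.map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => s""""$partition":$offset""" }
        .mkString(",")
      s""""$topic":{$body}"""
    }.mkString("{", ",", "}")
}
```

For example, `OffsetJson.render(Map("topic1" -> Map(0 -> 23L, 1 -> -2L)))` yields the same shape as the literal string passed to `startingOffsets` above.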
Note: the schema of the data read from Kafka is fixed and contains the following columns:
| Column | Type | Description |
|---|---|---|
| key | binary | message key |
| value | binary | message value |
| topic | string | topic name |
| partition | int | partition |
| offset | long | offset |
| timestamp | long | timestamp |
| timestampType | int | timestamp type |
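Because `key` and `value` arrive as binary, the examples above cast them with `CAST(value AS STRING)` before use. That cast amounts to decoding the raw bytes as UTF-8 text, as this plain-Scala sketch shows:

```scala
import java.nio.charset.StandardCharsets

// What CAST(value AS STRING) does to a Kafka record's bytes:
// decode them as UTF-8 text.
def castAsString(bytes: Array[Byte]): String =
  new String(bytes, StandardCharsets.UTF_8)
```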
Note: the following Kafka options cannot be set, or the Kafka source will throw an exception:
- group.id: the Kafka source automatically creates a unique group id for each query
- auto.offset.reset: set startingOffsets instead. Structured Streaming manages consumed offsets internally, which guarantees no data is missed when dynamically subscribed topics change. startingOffsets only applies the first time a streaming query starts; on every restart the query resumes from the saved offsets.
- key.deserializer, value.deserializer, key.serializer, value.serializer: keys and values are always handled as byte arrays (ByteArraySerializer/ByteArrayDeserializer); use DataFrame operations to (de)serialize them explicitly
- enable.auto.commit: the Kafka source does not commit any offsets
1.2 Environment Preparation
Start Kafka:
```shell
/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
```
Produce data to the topic:
```shell
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
```
Code implementation
```scala
package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object KafkaStructuredStreamingDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkSQL")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Connect to Kafka and consume (read) data
    val dataDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "spark_kafka")
      .load()

    // 3. Process the data; dataDF holds Kafka records (key, value)
    import spark.implicits._
    // Note: data read from Kafka arrives as bytes, so cast it to the
    // actual type as the official docs require
    val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    // Aggregate with the DSL
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)

    // 4. Write the results out
    result.writeStream
      .format("console")                  // write to the console
      .outputMode("complete")             // write all rows on every trigger
      .trigger(Trigger.ProcessingTime(0)) // trigger interval; 0 = as fast as possible
      .option("truncate", false)          // do not truncate long columns
      .start()                            // start the query
      .awaitTermination()                 // block until the query stops
  }
}
```
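The streaming pipeline above (flatMap on spaces, groupBy("value"), count, sort descending) is an incremental word count. Its core transformation can be sketched in plain Scala, without Spark, to make the logic easy to check:

```scala
// Plain-Scala equivalent of the Dataset word count:
// split lines into words, count occurrences, sort by count descending.
def wordCount(lines: Seq[String]): Seq[(String, Int)] =
  lines.flatMap(_.split(" "))
    .groupBy(identity)
    .map { case (word, occurrences) => (word, occurrences.size) }
    .toSeq
    .sortBy { case (_, count) => -count }
```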
2. MySQL Integration
Introduction
In development we often need to write a stream's results to an external database such as MySQL. Unfortunately, the Structured Streaming API does not support external databases as sinks out of the box.
If support is added in the future, the API will likely be as simple as:
format("jdbc").option("url", "jdbc:mysql://…").start()
For now, we have to define our own JdbcSink by extending ForeachWriter and implementing its methods.
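For orientation: Spark calls ForeachWriter's open once per partition (and epoch), process once per row, and close once at the end, even on failure. The mock below is a simplified stand-in for that contract (not the real Spark API) that just records rows in memory where a real sink would issue SQL:

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-in for Spark's ForeachWriter contract:
// open is called once per partition/epoch, process once per row,
// and close once at the end (even on failure).
abstract class SimpleForeachWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(row: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// A writer that records rows in memory, where a real sink
// would execute SQL statements against a database.
class RecordingWriter extends SimpleForeachWriter[(String, Long)] {
  val rows = ListBuffer.empty[(String, Long)]
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: (String, Long)): Unit = rows += row
  override def close(errorOrNull: Throwable): Unit = ()
}
```

The JdbcSink below implements the same three methods against MySQL.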
- Reference (Spark Structured Streaming programming guide, foreach sink)
Code demonstration
```scala
package cn.itcast.structedstreaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger

object JDBCSinkDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("SparkSQL")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    // 2. Connect to Kafka and consume data
    val dataDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "spark_kafka")
      .load()

    // 3. Process the data
    // Note: data read from Kafka arrives as bytes, so cast it to the actual type
    val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    // Aggregate with the DSL
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)

    // Create the MySQL sink
    val writer = new JDBCSink("jdbc:mysql://node01:3306/bigdata?characterEncoding=UTF-8", "root", "123456")

    // 4. Write the results out
    result.writeStream
      .foreach(writer)
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()            // start the query
      .awaitTermination() // block until the query stops
  }
}

/**
 * Inserts/updates each result row into MySQL.
 * Target table: t_word (id PK AUTO_INCREMENT, word UNIQUE, count);
 * the DDL is shown in the MySQL session below.
 */
class JDBCSink(url: String, username: String, password: String)
    extends ForeachWriter[Row] with Serializable {
  var connection: Connection = _               // assigned in open()
  var preparedStatement: PreparedStatement = _

  // Open the database connection; returning true means this partition
  // should be processed
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, username, password)
    true
  }

  // Store one row in MySQL (insert or replace)
  override def process(row: Row): Unit = {
    val word: String = row.get(0).toString   // first column: the word
    val count: String = row.get(1).toString  // second column: its count
    println(word + ":" + count)
    // REPLACE INTO inserts the row if absent and replaces it otherwise;
    // it requires a primary key or unique index on the table.
    val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
    preparedStatement = connection.prepareStatement(sql)
    preparedStatement.setString(1, word)
    preparedStatement.setInt(2, Integer.parseInt(count))
    preparedStatement.executeUpdate()
  }

  // Close the database connection
  override def close(errorOrNull: Throwable): Unit = {
    if (preparedStatement != null) preparedStatement.close()
    if (connection != null) connection.close()
  }
}
```
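The sink relies on MySQL's REPLACE INTO, which deletes any row that conflicts on the primary key or a unique index and then inserts the new row, so each word keeps exactly one row. A rough in-memory sketch of that upsert-by-unique-key behavior (plain Scala, no database):

```scala
// Models REPLACE INTO over a map keyed by the unique `word` column:
// an existing entry is overwritten, a missing one is inserted.
def replaceInto(table: Map[String, Int], word: String, count: Int): Map[String, Int] =
  table + (word -> count)
```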
Produce data to the Kafka topic from the cluster
```shell
cd /export/install/kafka_2.11-1.0.0/bin/
[root@node01 bin]# ./kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
```
Connect to MySQL:
```shell
[root@node01 /]# mysql -uroot -p123456
```
Create the database and table:
```sql
-- create the bigdata database
mysql> create database bigdata;
-- list databases
mysql> show databases;
-- switch to the bigdata database
mysql> use bigdata;
-- create the table
mysql> CREATE TABLE `t_word` (
    ->   `id` int(11) NOT NULL AUTO_INCREMENT,
    ->   `word` varchar(255) NOT NULL,
    ->   `count` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`id`),
    ->   UNIQUE KEY `word` (`word`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
-- list tables
mysql> show tables;
-- query the table
mysql> select * from t_word;
```