Flink_DataStream Transformations
Table of Contents
- DataStream Transformations
- 1. KeyBy
- 2. Connect
- 3. Split and Select
- Data Sinks
- 1. Sink to Kafka
- 2. Sink to MySQL
DataStream Transformations
1. KeyBy
KeyBy logically splits a stream into disjoint partitions, with all elements that share the same key going to the same partition. Internally this is implemented with hash partitioning.
package com.czxy.flink.stream.transformation

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

//keyBy grouping operator
object StreamKeyBy {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Build the data set
    import org.apache.flink.api.scala._
    val elementSource: DataStream[String] = env.fromElements("hadoop hadoop spark hive flink flink")
    //3. Turn the data into (word, 1) tuples
    val wordAndOne: DataStream[(String, Int)] = elementSource.flatMap(x => x.split(" ")).map((_, 1))
    //4. Group by the first tuple field
    val keyedStream: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    //5. Aggregate: sum the counts per key
    val result: DataStream[(String, Int)] = keyedStream.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    //6. Print the result
    result.print().setParallelism(1)
    //7. Execute the job
    env.execute("StreamKeyBy")
  }
}
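As a usage note, newer Flink releases deprecate positional keys such as keyBy(0); the same grouping can be written with a key selector function, which also yields a concrete key type instead of the generic Tuple. A minimal sketch, assuming the same wordAndOne stream from step 3 above (the value names keyedByWord and summedResult are chosen here for illustration):

//Group by the word itself; the key type becomes String instead of Tuple
val keyedByWord: KeyedStream[(String, Int), String] = wordAndOne.keyBy(_._1)
//sum(1) is a built-in shorthand for the reduce shown above
val summedResult: DataStream[(String, Int)] = keyedByWord.sum(1)
summedResult.print().setParallelism(1)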
2. Connect
Connect combines two DataStreams into one ConnectedStreams while preserving each stream's original element type. This lets us take streams of different types and later map them into a single common structure.
Code example:
package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * Connect: combine two streams
 */
object StreamConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source1: DataStream[Long] = env.addSource(new NoParallelSource()).setParallelism(1)
    val source2: DataStream[Long] = env.addSource(new NoParallelSource()).setParallelism(1)
    val connectedStreams: ConnectedStreams[Long, Long] = source1.connect(source2)
    //One map function per input stream; both produce the same output type
    val result: DataStream[String] = connectedStreams.map(
      item1 => "item1: " + item1,
      item2 => "item2: " + item2
    )
    result.print()
    env.execute("StreamConnect")
  }

  //A non-parallel source that emits an incrementing counter starting at 1
  class NoParallelSource extends SourceFunction[Long]() {
    var number: Long = 1L
    var isRunning: Boolean = true

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
      while (isRunning) {
        ctx.collect(number)
        number += 1
        Thread.sleep(1)
        if (number > 5) {
          cancel()
        }
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
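For more involved per-stream logic, the same merge can also be expressed with a CoMapFunction, which provides one method per input stream. A minimal sketch under the same setup (two Long streams); the class name FormatCoMap is chosen here for illustration:

import org.apache.flink.streaming.api.functions.co.CoMapFunction

//Does the same formatting as the two lambdas above
class FormatCoMap extends CoMapFunction[Long, Long, String] {
  override def map1(value: Long): String = "item1: " + value // elements from source1
  override def map2(value: Long): String = "item2: " + value // elements from source2
}

//Usage: connectedStreams.map(new FormatCoMap)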
3. Split and Select
Implementation:
package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * Requirement:
 * Given the data 1, 2, 3, 4, 5, 6, 7,
 * use split and select to separate the odd and even numbers, and print the odd ones.
 */
object StreamSplit {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)
    //Tag each element as "even" or "odd"
    val splitStream: SplitStream[Int] = source.split(x => {
      (x % 2) match {
        case 0 => List("even")
        case 1 => List("odd")
      }
    })
    //Select only the elements tagged "odd"
    val result: DataStream[Int] = splitStream.select("odd")
    result.print()
    env.execute("StreamSplit")
  }
}
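Note that split/select is deprecated in newer Flink versions in favor of side outputs. A minimal sketch of the same odd/even separation using a ProcessFunction and an OutputTag, assuming the same source stream as in StreamSplit above (the names evenTag, oddStream, and evenStream are illustrative):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

//Odd numbers stay on the main output, even numbers go to a side output
val evenTag = OutputTag[Int]("even")

val oddStream: DataStream[Int] = source.process(new ProcessFunction[Int, Int] {
  override def processElement(value: Int,
                              ctx: ProcessFunction[Int, Int]#Context,
                              out: Collector[Int]): Unit = {
    if (value % 2 == 0) ctx.output(evenTag, value) else out.collect(value)
  }
})

val evenStream: DataStream[Int] = oddStream.getSideOutput(evenTag)
oddStream.print()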
Data Sinks
- 3. Sink to a local file (see the batch processing section; a minimal streaming sketch follows this list)
- 4. Sink to a local collection (see the batch processing section)
- 5. Sink to HDFS (see the batch processing section)
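For reference, writing a stream to a local file can look like the following minimal sketch. The output path is only an example; in Flink 1.x, writeAsText is available on DataStream, though it is mainly intended for debugging:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object LocalFileSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.fromElements("hadoop", "spark", "flink")
    //Write each element as a line of text; parallelism 1 gives a single output file
    data.writeAsText("file:///tmp/flink-out").setParallelism(1)
    env.execute("LocalFileSink")
  }
}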
1. Sink to Kafka
Code example:
package com.czxy.flink.stream.sink

import java.util.Properties

import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object StreamKafkaSink {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Build the data set
    import org.apache.flink.api.scala._
    val source: DataStream[String] = env.fromElements("1,小丽,北京,女")
    //3. Kafka producer configuration
    val topic = "test"
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "node01:9092")
    val flinkKafkaProducer: FlinkKafkaProducer011[String] =
      new FlinkKafkaProducer011[String](topic, new SimpleStringSchema(), properties)
    //4. Write the stream to Kafka
    val result: DataStreamSink[String] = source.addSink(flinkKafkaProducer)
    // source.print()
    env.execute("StreamKafkaSink")
  }
}
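To verify what was written, the matching consumer from the same connector can read the topic back. A minimal sketch, assuming the same topic and broker address as above (the object name and group.id value are chosen here for illustration):

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object StreamKafkaSourceCheck {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node01:9092")
    properties.setProperty("group.id", "flink-check") // example consumer group id
    //Read the "test" topic as plain strings and print it
    val consumer = new FlinkKafkaConsumer011[String]("test", new SimpleStringSchema(), properties)
    val stream: DataStream[String] = env.addSource(consumer)
    stream.print()
    env.execute("StreamKafkaSourceCheck")
  }
}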
2. Sink to MySQL
package com.czxy.flink.stream.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object StreamMysqlSink {

  case class Student(stuId: Int, stuName: String, stuAddr: String, stuSex: String)

  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Prepare the data
    val source: DataStream[Student] = env.fromElements(
      //Student(8, "小青", "广州", "女"),
      Student(9, "wangman", "beijing", "nv")
    )
    val result: DataStreamSink[Student] = source.addSink(new MysqlSink())
    env.execute("StreamMysqlSink")
  }

  class MysqlSink extends RichSinkFunction[Student]() {
    var connection: Connection = null
    var ps: PreparedStatement = null

    override def open(parameters: Configuration): Unit = {
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&useSSL=false"
      val username = "root"
      val password = "root"
      //1: Load the JDBC driver
      Class.forName(driver)
      //2: Open the connection
      connection = DriverManager.getConnection(url, username, password)
      val sql =
        """
          |insert into student(id, name, addr, sex) values(?,?,?,?);
          |""".stripMargin
      //3: Prepare the statement
      ps = connection.prepareStatement(sql)
    }

    //Release the JDBC resources when the sink is closed
    override def close(): Unit = {
      if (ps != null) {
        ps.close()
      }
      if (connection != null) {
        connection.close()
      }
    }

    //invoke is called once per element: bind the fields and execute the insert
    override def invoke(value: Student): Unit = {
      try {
        //4: Bind the data and run the insert
        ps.setInt(1, value.stuId)
        ps.setString(2, value.stuName)
        ps.setString(3, value.stuAddr)
        ps.setString(4, value.stuSex)
        ps.executeUpdate()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }
  }
}
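The sink above assumes a student table already exists in the test database. The column names come from the INSERT statement; the column types below are assumptions chosen to match the Student case class. A minimal one-off Scala sketch that creates the table over the same JDBC connection settings:

import java.sql.DriverManager

object CreateStudentTable {
  def main(args: Array[String]): Unit = {
    //Same connection settings as in MysqlSink (adjust to your environment)
    val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&useSSL=false"
    val connection = DriverManager.getConnection(url, "root", "root")
    val ddl =
      """
        |create table if not exists student(
        |  id   int primary key,
        |  name varchar(32),
        |  addr varchar(64),
        |  sex  varchar(8)
        |)
        |""".stripMargin
    try {
      val stmt = connection.createStatement()
      stmt.execute(ddl)
      stmt.close()
    } finally {
      connection.close()
    }
  }
}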