Flink_DataStream 的 Transformation

文章目录

  • DataStream 的 Transformation
    • 1. KeyBy
    • 2. Connect
    • 3. Split 和 select
  • 数据输出 Data Sinks
    • 1.sink 到 kafka
    • 2. sink 到 mysql

DataStream 的 Transformation

1. KeyBy

逻辑上将一个流分成不相交的分区,每个分区包含具有相同键的元素。在内部,这是通过散列(hash)分区来实现的。

package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

/**
 * keyBy example: logically partitions the stream into disjoint partitions so that all
 * elements with the same key land in the same partition, then sums the per-word counts
 * with a rolling `reduce`.
 */
object StreamKeyBy {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set; this import brings the implicit TypeInformation into scope
    import org.apache.flink.api.scala._
    val elementSource: DataStream[String] = env.fromElements("hadoop hadoop spark hive flink flink")
    // 3. Turn every word into a (word, 1) tuple
    val wordAndOne: DataStream[(String, Int)] = elementSource.flatMap(x => x.split(" ")).map((_, 1))
    // 4. Group by the word. A typed key selector replaces the positional keyBy(0), which is
    //    deprecated in newer Flink releases and only yields an untyped Java Tuple key.
    val keyedStream: KeyedStream[(String, Int), String] = wordAndOne.keyBy(_._1)
    // 5. Rolling aggregation: sum the counts of elements that share the same word
    val result: DataStream[(String, Int)] = keyedStream.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    // 6. Print the result with sink parallelism 1
    result.print().setParallelism(1)
    // 7. Submit the job
    env.execute("StreamKeyBy")
  }
}

2. Connect

用来将两个 DataStream 组装成一个 ConnectedStreams,而且 ConnectedStreams 会保留原有两个 DataStream 各自的结构(元素类型);这样我们就可以把不同类型的数据组装到同一个结构中进行处理。

代码示例:

package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * connect example: joins two DataStreams into one ConnectedStreams. Each side keeps its own
 * element type and is handled by its own function in the co-map below.
 */
object StreamConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source1: DataStream[Long] = env.addSource(new NoParallelSource()).setParallelism(1)
    val source2: DataStream[Long] = env.addSource(new NoParallelSource()).setParallelism(1)
    val connectedStreams: ConnectedStreams[Long, Long] = source1.connect(source2)
    // The first function handles elements from source1, the second those from source2
    val result: DataStream[String] = connectedStreams.map(
      item1 => "item1: " + item1,
      item2 => "item2: " + item2
    )
    result.print()
    env.execute("StreamConnect")
  }

  /**
   * Single-threaded source that emits the increasing values 1..5 (sleeping 1 ms after each)
   * and then stops itself.
   */
  class NoParallelSource extends SourceFunction[Long] {
    var number: Long = 1L
    // Flink calls cancel() from a different thread than run(); without @volatile the
    // running loop is not guaranteed to ever observe the flag change.
    @volatile var isRunning: Boolean = true

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
      while (isRunning) {
        ctx.collect(number)
        number += 1
        Thread.sleep(1)
        if (number > 5) {
          cancel()
        }
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }
}

3. Split 和 select


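split 按规则给每个元素打上一个或多个名称,得到一个 SplitStream;select 再按名称从 SplitStream 中选出对应的数据流。注意:split/select 已被废弃,并在 Flink 1.12 中移除,新版本请改用侧输出(Side Output)。

代码实现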

package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * Task: given the data 1, 2, 3, 4, 5, 6, 7, use split and select to separate the odd and
 * even numbers, and print the odd ones.
 *
 * NOTE: `split`/`select` are deprecated and were removed in Flink 1.12; on newer versions use
 * side outputs (`OutputTag` with a `ProcessFunction`) instead.
 */
object StreamSplit {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)
    // Tag each element with the output name it belongs to: "偶数" (even) or "奇数" (odd).
    // Testing `== 0` instead of matching on 0/1 keeps negative odd numbers, for which
    // x % 2 == -1, from throwing a MatchError.
    val splitStream: SplitStream[Int] = source.split { x =>
      if (x % 2 == 0) List("偶数") else List("奇数")
    }
    // Select only the elements tagged as odd
    val result: DataStream[Int] = splitStream.select("奇数")
    result.print()
    env.execute("StreamSplit")
  }
}

数据输出 Data Sinks

  • 3、将数据 sink 到本地文件(参考批处理)
  • 4、Sink 到本地集合(参考批处理)
  • 5、Sink 到 HDFS(参考批处理)

1.sink 到 kafka

代码示例

package com.czxy.flink.stream.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

/** Writes a small stream of CSV strings to the Kafka topic "test". */
object StreamKafkaSink {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set
    import org.apache.flink.api.scala._
    val source: DataStream[String] = env.fromElements("1,小丽,北京,女")
    // 3. Kafka producer configuration.
    //    SimpleStringSchema comes from org.apache.flink.api.common.serialization; the copy in
    //    org.apache.flink.streaming.util.serialization is deprecated.
    val topic = "test"
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "node01:9092")
    val flinkKafkaProducer: FlinkKafkaProducer011[String] =
      new FlinkKafkaProducer011[String](topic, new SimpleStringSchema(), properties)
    // 4. Attach the Kafka sink
    source.addSink(flinkKafkaProducer)
    // 5. Submit the job; without execute() nothing is ever written to Kafka
    env.execute("StreamKafkaSink")
  }
}

2. sink 到 mysql

package com.czxy.flink.stream.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

import scala.util.control.NonFatal

/** Writes Student records into the MySQL table `student` through a custom RichSinkFunction. */
object StreamMysqlSink {

  case class Student(stuId: Int, stuName: String, stuAddr: String, stuSex: String)

  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Prepare the data
    val source: DataStream[Student] = env.fromElements(
      //Student(8, "小青", "广州", "女"),
      Student(9, "wangman", "beijing", "nv")
    )
    source.addSink(new MysqlSink())
    env.execute("StreamMysqlSink")
  }

  /**
   * Sink that inserts one row per Student. A JDBC connection and prepared statement are
   * opened in open(), reused by every invoke(), and released in close().
   */
  class MysqlSink extends RichSinkFunction[Student] {
    var connection: Connection = null
    var ps: PreparedStatement = null

    override def open(parameters: Configuration): Unit = {
      // NOTE(review): connection settings and credentials are hard-coded for the demo
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&useSSL=false"
      val username = "root"
      val password = "root"
      // 1: load the driver
      Class.forName(driver)
      // 2: open the connection
      connection = DriverManager.getConnection(url, username, password)
      val sql =
        """
          |insert into student(id , name , addr , sex)values(?,?,?,?);
          |""".stripMargin
      // 3: prepare the statement once; it is reused for every record
      ps = connection.prepareStatement(sql)
    }

    // Release the JDBC resources when the task shuts down. The statement is closed before the
    // connection that owns it, and the connection is closed even if closing the statement fails.
    override def close(): Unit = {
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
      super.close()
    }

    // Called once per element: bind the fields and execute the insert.
    override def invoke(value: Student): Unit = {
      try {
        // 4: bind the record's fields and run the insert
        ps.setInt(1, value.stuId)
        ps.setString(2, value.stuName)
        ps.setString(3, value.stuAddr)
        ps.setString(4, value.stuSex)
        ps.executeUpdate()
      } catch {
        // Best-effort: a failed insert is reported and skipped instead of failing the job.
        // NonFatal lets fatal errors and InterruptedException (task cancellation) propagate.
        case NonFatal(e) => println(e.getMessage)
      }
    }
  }
}

Flink_DataStream 的 Transformation

文章目录

  • DataStream 的 Transformation
    • 1. KeyBy
    • 2. Connect
    • 3. Split 和 select
  • 数据输出 Data Sinks
    • 1.sink 到 kafka
    • 2. sink 到 mysql

DataStream 的 Transformation

1. KeyBy

逻辑上将一个流分成不相交的分区,每个分区包含具有相同键的元素。在内部,这是通过散列(hash)分区来实现的。

package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

/**
 * keyBy example: logically partitions the stream into disjoint partitions so that all
 * elements with the same key land in the same partition, then sums the per-word counts
 * with a rolling `reduce`.
 */
object StreamKeyBy {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set; this import brings the implicit TypeInformation into scope
    import org.apache.flink.api.scala._
    val elementSource: DataStream[String] = env.fromElements("hadoop hadoop spark hive flink flink")
    // 3. Turn every word into a (word, 1) tuple
    val wordAndOne: DataStream[(String, Int)] = elementSource.flatMap(x => x.split(" ")).map((_, 1))
    // 4. Group by the word. A typed key selector replaces the positional keyBy(0), which is
    //    deprecated in newer Flink releases and only yields an untyped Java Tuple key.
    val keyedStream: KeyedStream[(String, Int), String] = wordAndOne.keyBy(_._1)
    // 5. Rolling aggregation: sum the counts of elements that share the same word
    val result: DataStream[(String, Int)] = keyedStream.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    // 6. Print the result with sink parallelism 1
    result.print().setParallelism(1)
    // 7. Submit the job
    env.execute("StreamKeyBy")
  }
}

2. Connect

用来将两个 DataStream 组装成一个 ConnectedStreams,而且 ConnectedStreams 会保留原有两个 DataStream 各自的结构(元素类型);这样我们就可以把不同类型的数据组装到同一个结构中进行处理。

代码示例:

package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * connect example: joins two DataStreams into one ConnectedStreams. Each side keeps its own
 * element type and is handled by its own function in the co-map below.
 */
object StreamConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source1: DataStream[Long] = env.addSource(new NoParallelSource()).setParallelism(1)
    val source2: DataStream[Long] = env.addSource(new NoParallelSource()).setParallelism(1)
    val connectedStreams: ConnectedStreams[Long, Long] = source1.connect(source2)
    // The first function handles elements from source1, the second those from source2
    val result: DataStream[String] = connectedStreams.map(
      item1 => "item1: " + item1,
      item2 => "item2: " + item2
    )
    result.print()
    env.execute("StreamConnect")
  }

  /**
   * Single-threaded source that emits the increasing values 1..5 (sleeping 1 ms after each)
   * and then stops itself.
   */
  class NoParallelSource extends SourceFunction[Long] {
    var number: Long = 1L
    // Flink calls cancel() from a different thread than run(); without @volatile the
    // running loop is not guaranteed to ever observe the flag change.
    @volatile var isRunning: Boolean = true

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
      while (isRunning) {
        ctx.collect(number)
        number += 1
        Thread.sleep(1)
        if (number > 5) {
          cancel()
        }
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }
}

3. Split 和 select


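split 按规则给每个元素打上一个或多个名称,得到一个 SplitStream;select 再按名称从 SplitStream 中选出对应的数据流。注意:split/select 已被废弃,并在 Flink 1.12 中移除,新版本请改用侧输出(Side Output)。

代码实现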

package com.czxy.flink.stream.transformation

import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * Task: given the data 1, 2, 3, 4, 5, 6, 7, use split and select to separate the odd and
 * even numbers, and print the odd ones.
 *
 * NOTE: `split`/`select` are deprecated and were removed in Flink 1.12; on newer versions use
 * side outputs (`OutputTag` with a `ProcessFunction`) instead.
 */
object StreamSplit {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)
    // Tag each element with the output name it belongs to: "偶数" (even) or "奇数" (odd).
    // Testing `== 0` instead of matching on 0/1 keeps negative odd numbers, for which
    // x % 2 == -1, from throwing a MatchError.
    val splitStream: SplitStream[Int] = source.split { x =>
      if (x % 2 == 0) List("偶数") else List("奇数")
    }
    // Select only the elements tagged as odd
    val result: DataStream[Int] = splitStream.select("奇数")
    result.print()
    env.execute("StreamSplit")
  }
}

数据输出 Data Sinks

  • 3、将数据 sink 到本地文件(参考批处理)
  • 4、Sink 到本地集合(参考批处理)
  • 5、Sink 到 HDFS(参考批处理)

1.sink 到 kafka

代码示例

package com.czxy.flink.stream.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

/** Writes a small stream of CSV strings to the Kafka topic "test". */
object StreamKafkaSink {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set
    import org.apache.flink.api.scala._
    val source: DataStream[String] = env.fromElements("1,小丽,北京,女")
    // 3. Kafka producer configuration.
    //    SimpleStringSchema comes from org.apache.flink.api.common.serialization; the copy in
    //    org.apache.flink.streaming.util.serialization is deprecated.
    val topic = "test"
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "node01:9092")
    val flinkKafkaProducer: FlinkKafkaProducer011[String] =
      new FlinkKafkaProducer011[String](topic, new SimpleStringSchema(), properties)
    // 4. Attach the Kafka sink
    source.addSink(flinkKafkaProducer)
    // 5. Submit the job; without execute() nothing is ever written to Kafka
    env.execute("StreamKafkaSink")
  }
}

2. sink 到 mysql

package com.czxy.flink.stream.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

import scala.util.control.NonFatal

/** Writes Student records into the MySQL table `student` through a custom RichSinkFunction. */
object StreamMysqlSink {

  case class Student(stuId: Int, stuName: String, stuAddr: String, stuSex: String)

  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Prepare the data
    val source: DataStream[Student] = env.fromElements(
      //Student(8, "小青", "广州", "女"),
      Student(9, "wangman", "beijing", "nv")
    )
    source.addSink(new MysqlSink())
    env.execute("StreamMysqlSink")
  }

  /**
   * Sink that inserts one row per Student. A JDBC connection and prepared statement are
   * opened in open(), reused by every invoke(), and released in close().
   */
  class MysqlSink extends RichSinkFunction[Student] {
    var connection: Connection = null
    var ps: PreparedStatement = null

    override def open(parameters: Configuration): Unit = {
      // NOTE(review): connection settings and credentials are hard-coded for the demo
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&useSSL=false"
      val username = "root"
      val password = "root"
      // 1: load the driver
      Class.forName(driver)
      // 2: open the connection
      connection = DriverManager.getConnection(url, username, password)
      val sql =
        """
          |insert into student(id , name , addr , sex)values(?,?,?,?);
          |""".stripMargin
      // 3: prepare the statement once; it is reused for every record
      ps = connection.prepareStatement(sql)
    }

    // Release the JDBC resources when the task shuts down. The statement is closed before the
    // connection that owns it, and the connection is closed even if closing the statement fails.
    override def close(): Unit = {
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
      super.close()
    }

    // Called once per element: bind the fields and execute the insert.
    override def invoke(value: Student): Unit = {
      try {
        // 4: bind the record's fields and run the insert
        ps.setInt(1, value.stuId)
        ps.setString(2, value.stuName)
        ps.setString(3, value.stuAddr)
        ps.setString(4, value.stuSex)
        ps.executeUpdate()
      } catch {
        // Best-effort: a failed insert is reported and skipped instead of failing the job.
        // NonFatal lets fatal errors and InterruptedException (task cancellation) propagate.
        case NonFatal(e) => println(e.getMessage)
      }
    }
  }
}