Flink_Flink 的分布式缓存
文章目录
- Flink 的分布式缓存
- Flink Accumulators & Counters(了解)
Flink 的分布式缓存
操作步骤
- 将 distribute_cache_student 文件上传到 HDFS / 目录下
- 获取批处理运行环境
- 创建成绩数据集
- 对 成绩 数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
a. RichMapFunction 的 open 方法中,获取分布式缓存数据
b. 在 map 方法中进行转换 - 实现 open 方法
a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
b. 使用 Scala.fromFile 读取文件,并获取行
c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List - 实现 map 方法
a. 从分布式缓存中根据学生 ID 过滤出来学生
b. 获取学生姓名
c. 构建最终结果元组 - 打印测试
代码实现
package com.czxy.flink.batchimport java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source/*** 需求:* 创建一个 成绩 数据集* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))* 请通过分布式缓存获取到学生姓名, 将数据转换为* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))* 注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名*/
object BatchDisCachedFile {def main(args: Array[String]): Unit = {/*** 实现步骤:* 1) 将 distribute_cache_student 文件上传到 HDFS / 目录下* 2) 获取批处理运行环境* 3) 创建成绩数据集* 4) 对 成绩 数据集进行 map 转换, 将( 学生 ID, 学科, 分数) 转换为( 学生姓名, 学科,* 分数)* a. RichMapFunction 的 open 方法中, 获取分布式缓存数据* b. 在 map 方法中进行转换* 5) 实现 open 方法* a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件* b. 使用 Scala.fromFile 读取文件, 并获取行* c. 将文本转换为元组( 学生 ID, 学生姓名) , 再转换为 List* 6) 实现 map 方法* a. 从分布式缓存中根据学生 ID 过滤出来学生b. 获取学生姓名* c. 构建最终结果元组* 7) 打印测试*/// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//注册分布式缓存文件env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student", "student")//创建成绩数据集import org.apache.flink.api.scala._val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var stuMap: Map[Int, String] = null//初始化的时候只被执行一次override def open(parameters: Configuration): Unit = {//获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()stuMap = linesIter.map(item => {val itemArr: Array[String] = item.split(",")(itemArr(0).toInt, itemArr(1))}).toMap}//每条数据都会执行一次override def map(value: (Int, String, Int)): (String, String, Int) = {val name: String = stuMap.getOrElse(value._1, "")(name, value._2, value._3)}})//输出打印测试resultDataSet.print()}
}
Flink Accumulators & Counters(了解)
代码实现
package com.czxy.flink.batchimport org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 需求:* 给定一个数据源* "a","b","c","d"* 通过累加器打印出多少个元素*/
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1.创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.构建数据集import org.apache.flink.api.scala._val sourceDataSet: DataSet[String] = env.fromElements("a", "b", "c", "d")//3.数据处理val counterDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {//1) 创建累加器val counter: IntCounter = new IntCounter()override def open(parameters: Configuration): Unit = {//2) 注册累加器getRuntimeContext.addAccumulator("myAccumulator", counter)}//每条数据都会执行一次override def map(value: String): String = {//3) 使用累加器counter.add(1)value}})
// counterDataSet.print()counterDataSet.writeAsText("day02/data/output/Accumulator").setParallelism(1)val result: JobExecutionResult = env.execute("BatchCounterDemo")val myAccumulatorValue: Int = result.getAccumulatorResult[Int]("myAccumulator")println(myAccumulatorValue)}
}
- Flink Broadcast 和 Accumulators 的区别:
Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间 传递变量。
广播变量可以进行共享,但是不可以进行修改 Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。
Flink_Flink 的分布式缓存
文章目录
- Flink 的分布式缓存
- Flink Accumulators & Counters(了解)
Flink 的分布式缓存
操作步骤
- 将 distribute_cache_student 文件上传到 HDFS / 目录下
- 获取批处理运行环境
- 创建成绩数据集
- 对 成绩 数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
a. RichMapFunction 的 open 方法中,获取分布式缓存数据
b. 在 map 方法中进行转换 - 实现 open 方法
a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
b. 使用 Scala.fromFile 读取文件,并获取行
c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List - 实现 map 方法
a. 从分布式缓存中根据学生 ID 过滤出来学生
b. 获取学生姓名
c. 构建最终结果元组 - 打印测试
代码实现
package com.czxy.flink.batchimport java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source/*** 需求:* 创建一个 成绩 数据集* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))* 请通过分布式缓存获取到学生姓名, 将数据转换为* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))* 注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名*/
object BatchDisCachedFile {def main(args: Array[String]): Unit = {/*** 实现步骤:* 1) 将 distribute_cache_student 文件上传到 HDFS / 目录下* 2) 获取批处理运行环境* 3) 创建成绩数据集* 4) 对 成绩 数据集进行 map 转换, 将( 学生 ID, 学科, 分数) 转换为( 学生姓名, 学科,* 分数)* a. RichMapFunction 的 open 方法中, 获取分布式缓存数据* b. 在 map 方法中进行转换* 5) 实现 open 方法* a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件* b. 使用 Scala.fromFile 读取文件, 并获取行* c. 将文本转换为元组( 学生 ID, 学生姓名) , 再转换为 List* 6) 实现 map 方法* a. 从分布式缓存中根据学生 ID 过滤出来学生b. 获取学生姓名* c. 构建最终结果元组* 7) 打印测试*/// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//注册分布式缓存文件env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student", "student")//创建成绩数据集import org.apache.flink.api.scala._val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var stuMap: Map[Int, String] = null//初始化的时候只被执行一次override def open(parameters: Configuration): Unit = {//获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()stuMap = linesIter.map(item => {val itemArr: Array[String] = item.split(",")(itemArr(0).toInt, itemArr(1))}).toMap}//每条数据都会执行一次override def map(value: (Int, String, Int)): (String, String, Int) = {val name: String = stuMap.getOrElse(value._1, "")(name, value._2, value._3)}})//输出打印测试resultDataSet.print()}
}
Flink Accumulators & Counters(了解)
代码实现
package com.czxy.flink.batchimport org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 需求:* 给定一个数据源* "a","b","c","d"* 通过累加器打印出多少个元素*/
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1.创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.构建数据集import org.apache.flink.api.scala._val sourceDataSet: DataSet[String] = env.fromElements("a", "b", "c", "d")//3.数据处理val counterDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {//1) 创建累加器val counter: IntCounter = new IntCounter()override def open(parameters: Configuration): Unit = {//2) 注册累加器getRuntimeContext.addAccumulator("myAccumulator", counter)}//每条数据都会执行一次override def map(value: String): String = {//3) 使用累加器counter.add(1)value}})
// counterDataSet.print()counterDataSet.writeAsText("day02/data/output/Accumulator").setParallelism(1)val result: JobExecutionResult = env.execute("BatchCounterDemo")val myAccumulatorValue: Int = result.getAccumulatorResult[Int]("myAccumulator")println(myAccumulatorValue)}
}
- Flink Broadcast 和 Accumulators 的区别:
Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间 传递变量。
广播变量可以进行共享,但是不可以进行修改 Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。
发布评论