Flink_Flink 的分布式缓存

文章目录

  • Flink 的分布式缓存
  • Flink Accumulators & Counters(了解)

Flink 的分布式缓存


操作步骤

  1. 将 distribute_cache_student 文件上传到 HDFS / 目录下
  2. 获取批处理运行环境
  3. 创建成绩数据集
  4. 对 成绩 数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
    a. RichMapFunction 的 open 方法中,获取分布式缓存数据
    b. 在 map 方法中进行转换
  5. 实现 open 方法
    a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
    b. 使用 Scala.fromFile 读取文件,并获取行
    c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List
  6. 实现 map 方法
    a. 从分布式缓存中根据学生 ID 过滤出来学生
    b. 获取学生姓名
    c. 构建最终结果元组
  7. 打印测试

代码实现

package com.czxy.flink.batchimport java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source/*** 需求:* 创建一个 成绩 数据集* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))* 请通过分布式缓存获取到学生姓名, 将数据转换为* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))* 注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名*/
object BatchDisCachedFile {def main(args: Array[String]): Unit = {/*** 实现步骤:* 1) 将 distribute_cache_student 文件上传到 HDFS / 目录下* 2) 获取批处理运行环境* 3) 创建成绩数据集* 4) 对 成绩 数据集进行 map 转换, 将( 学生 ID, 学科, 分数) 转换为( 学生姓名, 学科,* 分数)*  a. RichMapFunction 的 open 方法中, 获取分布式缓存数据*  b. 在 map 方法中进行转换* 5) 实现 open 方法*  a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件*  b. 使用 Scala.fromFile 读取文件, 并获取行*  c. 将文本转换为元组( 学生 ID, 学生姓名) , 再转换为 List* 6) 实现 map 方法*  a. 从分布式缓存中根据学生 ID 过滤出来学生b. 获取学生姓名*  c. 构建最终结果元组* 7) 打印测试*/// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//注册分布式缓存文件env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student", "student")//创建成绩数据集import org.apache.flink.api.scala._val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var stuMap: Map[Int, String] = null//初始化的时候只被执行一次override def open(parameters: Configuration): Unit = {//获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()stuMap = linesIter.map(item => {val itemArr: Array[String] = item.split(",")(itemArr(0).toInt, itemArr(1))}).toMap}//每条数据都会执行一次override def map(value: (Int, String, Int)): (String, String, Int) = {val name: String = stuMap.getOrElse(value._1, "")(name, value._2, value._3)}})//输出打印测试resultDataSet.print()}
}

Flink Accumulators & Counters(了解)



代码实现

package com.czxy.flink.batchimport org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 需求:* 给定一个数据源* "a","b","c","d"* 通过累加器打印出多少个元素*/
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1.创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.构建数据集import org.apache.flink.api.scala._val sourceDataSet: DataSet[String] = env.fromElements("a", "b", "c", "d")//3.数据处理val counterDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {//1) 创建累加器val counter: IntCounter = new IntCounter()override def open(parameters: Configuration): Unit = {//2) 注册累加器getRuntimeContext.addAccumulator("myAccumulator", counter)}//每条数据都会执行一次override def map(value: String): String = {//3) 使用累加器counter.add(1)value}})
//    counterDataSet.print()counterDataSet.writeAsText("day02/data/output/Accumulator").setParallelism(1)val result: JobExecutionResult = env.execute("BatchCounterDemo")val myAccumulatorValue: Int = result.getAccumulatorResult[Int]("myAccumulator")println(myAccumulatorValue)}
}
  • Flink Broadcast 和 Accumulators 的区别:
    Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间 传递变量。
    广播变量可以进行共享,但是不可以进行修改 Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。

Flink_Flink 的分布式缓存

文章目录

  • Flink 的分布式缓存
  • Flink Accumulators & Counters(了解)

Flink 的分布式缓存


操作步骤

  1. 将 distribute_cache_student 文件上传到 HDFS / 目录下
  2. 获取批处理运行环境
  3. 创建成绩数据集
  4. 对 成绩 数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
    a. RichMapFunction 的 open 方法中,获取分布式缓存数据
    b. 在 map 方法中进行转换
  5. 实现 open 方法
    a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
    b. 使用 Scala.fromFile 读取文件,并获取行
    c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List
  6. 实现 map 方法
    a. 从分布式缓存中根据学生 ID 过滤出来学生
    b. 获取学生姓名
    c. 构建最终结果元组
  7. 打印测试

代码实现

package com.czxy.flink.batchimport java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.io.Source/*** 需求:* 创建一个 成绩 数据集* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))* 请通过分布式缓存获取到学生姓名, 将数据转换为* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))* 注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名*/
object BatchDisCachedFile {def main(args: Array[String]): Unit = {/*** 实现步骤:* 1) 将 distribute_cache_student 文件上传到 HDFS / 目录下* 2) 获取批处理运行环境* 3) 创建成绩数据集* 4) 对 成绩 数据集进行 map 转换, 将( 学生 ID, 学科, 分数) 转换为( 学生姓名, 学科,* 分数)*  a. RichMapFunction 的 open 方法中, 获取分布式缓存数据*  b. 在 map 方法中进行转换* 5) 实现 open 方法*  a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件*  b. 使用 Scala.fromFile 读取文件, 并获取行*  c. 将文本转换为元组( 学生 ID, 学生姓名) , 再转换为 List* 6) 实现 map 方法*  a. 从分布式缓存中根据学生 ID 过滤出来学生b. 获取学生姓名*  c. 构建最终结果元组* 7) 打印测试*/// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//注册分布式缓存文件env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student", "student")//创建成绩数据集import org.apache.flink.api.scala._val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var stuMap: Map[Int, String] = null//初始化的时候只被执行一次override def open(parameters: Configuration): Unit = {//获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()stuMap = linesIter.map(item => {val itemArr: Array[String] = item.split(",")(itemArr(0).toInt, itemArr(1))}).toMap}//每条数据都会执行一次override def map(value: (Int, String, Int)): (String, String, Int) = {val name: String = stuMap.getOrElse(value._1, "")(name, value._2, value._3)}})//输出打印测试resultDataSet.print()}
}

Flink Accumulators & Counters(了解)



代码实现

package com.czxy.flink.batchimport org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 需求:* 给定一个数据源* "a","b","c","d"* 通过累加器打印出多少个元素*/
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1.创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.构建数据集import org.apache.flink.api.scala._val sourceDataSet: DataSet[String] = env.fromElements("a", "b", "c", "d")//3.数据处理val counterDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {//1) 创建累加器val counter: IntCounter = new IntCounter()override def open(parameters: Configuration): Unit = {//2) 注册累加器getRuntimeContext.addAccumulator("myAccumulator", counter)}//每条数据都会执行一次override def map(value: String): String = {//3) 使用累加器counter.add(1)value}})
//    counterDataSet.print()counterDataSet.writeAsText("day02/data/output/Accumulator").setParallelism(1)val result: JobExecutionResult = env.execute("BatchCounterDemo")val myAccumulatorValue: Int = result.getAccumulatorResult[Int]("myAccumulator")println(myAccumulatorValue)}
}
  • Flink Broadcast 和 Accumulators 的区别:
    Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间 传递变量。
    广播变量可以进行共享,但是不可以进行修改 Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。