Flink Exercise: Call Duration Statistics

Flink Practice: Phone Call Statistics

Environment

1. Software
  • Flink: flink-1.9.1-bin-scala_2.11.tgz (downloaded from the official website)
  • Kafka: kafka_2.11-2.3.1.tgz (downloaded from the official website)
  • ZooKeeper: 3.4.14
  • MySQL: 8.0.18
  • OS: Windows 8.1
  • Redis
  • IDE: IntelliJ IDEA

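This article builds phone-usage statistics. The business scenario: Java code simulates call records between two people, each with the structure {phone number 1, phone number 2, call start time, call duration}.
The records are sent through Kafka to Flink, which uses a sliding window (configured with slide equal to size, so it behaves as a tumbling window) over the stream to compute the total number of calls and the total call duration within a given period.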
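A sliding window assigns each element to every window whose interval covers the element's timestamp. When the slide equals the size, each element lands in exactly one window, which is the tumbling case used in this exercise. The following is a minimal plain-Java sketch of that assignment rule. The class `SlidingWindowSketch` is hypothetical. It assumes window starts aligned to multiples of the slide with zero offset and non-negative timestamps, and it is not Flink's own code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: which windows [start, start + size) contain a timestamp,
 * for windows whose starts are aligned to multiples of `slide` (zero offset).
 */
public class SlidingWindowSketch {

    /** Returns the start of every window that contains ts (assumes ts >= 0). */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start that is <= ts
        long lastStart = ts - (ts % slide);
        // Walk back one slide at a time while the window still covers ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 3 s, slide 1 s: the element belongs to 3 overlapping windows
        System.out.println(windowStarts(7_500, 3_000, 1_000)); // [7000, 6000, 5000]
        // size 3 s, slide 3 s: exactly one window, i.e. a tumbling window
        System.out.println(windowStarts(7_500, 3_000, 3_000)); // [6000]
    }
}
```

With size and slide both set to 3 seconds, as in this exercise, each call record is counted once. A smaller slide would make the same record contribute to several overlapping results.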

2. Kafka Setup
1. Start ZooKeeper

Switch to the ZooKeeper installation directory and run:

D:\big-data-software\zookeeper-3.4.14>bin\zkServer.cmd
2. Start Kafka

Switch to the Kafka installation directory and run:

D:\big-data-software\kafka_2.11-2.3.1>.\bin\windows\kafka-server-start.bat .\config\server.properties
3. Create the topic

In the Kafka installation directory, create the topic phoneLogTopic:

D:\big-data-software\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic phoneLogTopic
Created topic phoneLogTopic.
3. Redis Setup

Download Redis, extract it, and start the server:

F:\software\flink\redisbin_x64>.\bin\redis-server.exe

Exercise

1. Configuration and Data Source

Project files:

1. contact.log
15369468720 李雁
19920860202 卫艺
18411925860 仰莉
14473548449 陶欣悦
18749966182 施梅梅
19379884788 金虹霖
19335715448 魏明艳
18503558939 华贞
13407209608 华啟倩
15596505995 仲采绿
17519874292 卫丹
15178485516 戚丽红
19877232369 何翠柔
18706287692 钱溶艳
18944239644 钱琳
17325302007 缪静欣
18839074540 焦秋菊
19879419704 吕访琴
16480981069 沈丹
18674257265 褚美丽
18302820904 孙怡
15133295266 许婵
17868457605 曹红恋
15490732767 吕柔
15064972307 冯怜云

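Notes: the phone numbers and names in contact.log are test data, and any resemblance to real people is purely coincidental. Each line holds a phone number and a name separated by a single space, 25 lines in total.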
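Each line contains only a phone number, a space, and a name, so it can be parsed into a lookup map. The sketch below assumes the lines have already been read into memory. The class `ContactParser` is hypothetical and only illustrates the line format. The project itself keeps just the phone numbers.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: parse "phone name" lines of contact.log into a phone -> name map. */
public class ContactParser {

    static Map<String, String> parse(List<String> lines) {
        // LinkedHashMap keeps the file order
        Map<String, String> contacts = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines, e.g. a trailing newline at the end of the file
            }
            // Split on the first run of whitespace: phone number first, then the name
            String[] parts = trimmed.split("\\s+", 2);
            contacts.put(parts[0], parts.length > 1 ? parts[1] : "");
        }
        return contacts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("15369468720 李雁", "19920860202 卫艺", "");
        System.out.println(parse(sample).keySet()); // [15369468720, 19920860202]
    }
}
```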

2. kafka.properties
bootstrap.servers=localhost:9092
group.id=phoneLog
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. log4j.properties

The Flink framework logs a lot, so I set the root level to WARN and also log at WARN in the Java code. You can switch to another level if you prefer.

log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
4. pom.xml
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.feng</groupId>
  <artifactId>flink-phone</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart Job</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.9.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>

      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.feng.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.0.0,)</versionRange>
                    <goals>
                      <goal>shade</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>add-dependencies-for-IDEA</id>
      <activation>
        <property>
          <name>idea.version</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
    <!-- kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.3.1</version>
    </dependency>
    <!-- lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.10</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
    <!-- Redis connection pool -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.7.0</version>
    </dependency>
    <!-- Redis client (Jedis) -->
    <dependency>
      <groupId>com.github.sazzad16</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.2</version>
    </dependency>
  </dependencies>
</project>

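2. Java Code

1. Package org.feng.datasource

Reads contact.log and randomly generates records of calls between two people.

The record format is: call1,call2,call start time,call duration (in seconds, zero-padded to four digits):

For example: 18411925860,19920860202,2018-05-29:08:54:04,0728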
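Each record can be split on commas into its four fields. The consumer then turns the call time into a `yyyyMMdd` date key. The sketch below shows that parsing step. The class `CallRecordSketch` is hypothetical and follows the `flatMap` logic in `ConsumerLog` further down.

```java
/** Hypothetical sketch: parse one generated call record into its fields. */
public class CallRecordSketch {

    final String caller;
    final String callee;
    final String date;     // yyyyMMdd, the form used to look up ct_date
    final int durationSec; // "0728" -> 728

    CallRecordSketch(String caller, String callee, String date, int durationSec) {
        this.caller = caller;
        this.callee = callee;
        this.date = date;
        this.durationSec = durationSec;
    }

    static CallRecordSketch parse(String line) {
        String[] f = line.split(",");
        if (f.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields: " + line);
        }
        // "2018-05-29:08:54:04" -> "2018-05-29" -> "20180529"
        String date = f[2].substring(0, 10).replace("-", "");
        // Leading zeros are fine: parseInt("0728") is 728 (decimal)
        return new CallRecordSketch(f[0], f[1], date, Integer.parseInt(f[3]));
    }

    public static void main(String[] args) {
        CallRecordSketch r = parse("18411925860,19920860202,2018-05-29:08:54:04,0728");
        System.out.println(r.caller + " -> " + r.callee + " on " + r.date + ", " + r.durationSec + " s");
        // 18411925860 -> 19920860202 on 20180529, 728 s
    }
}
```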

package org.feng.datasource;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by Feng on 2019/12/12 14:25
 * CurrentProject's name is flink-phone
 * When running {@link ReadAndProductLog} on Linux:<br>
 * <p>1. Specify the input path, i.e. where contact.log is stored:
 *  <ul>
 *      <li>on Windows, use an absolute path, e.g. {@code D://contact.log}</li>
 *      <li>on Linux, a path starting from "/", e.g. {@code /usr/local/log/contact.log}</li>
 *  </ul>
 * </p>
 * <p>2. Specify the output path, i.e. the file the generated call records are appended to.</p>
 * <p>3. Time range: 2018-01-01 to 2018-12-31</p>
 * @author Feng
 */
public class ProductLogClient {

    /**
     * Entry point for testing on Windows.
     * @param args args[0] = path of contact.log, e.g. D:\jee-2019-7-idea-maven-workspace\flink-phone\src\main\resources\contact.log
     */
    public static void main(String[] args) {
        if (args == null || args.length < 1) {
            System.out.println("Path of contact.log is not defined");
            System.exit(1);
        }
        ReadAndProductLog readAndProductLog = new ReadAndProductLog();
        // Start date
        String startDate = "2018-01-01";
        // End date
        String endDate = "2018-12-31";
        // Location of the raw data: contact.log
        ReadAndProductLog.phoneList = ReadAndProductLog.readLog(args[0]);
        // Target topic
        String topic = "phoneLogTopic";
        KafkaProducer<String, String> producer = getProducer();
        //noinspection InfiniteLoopStatement
        while (true) {
            String data = readAndProductLog.productLog(startDate, endDate);
            producer.send(new ProducerRecord<>(topic, data));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates the Kafka producer.
     * @return KafkaProducer
     */
    private static KafkaProducer<String, String> getProducer() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(kafkaProps);
    }

    /**
     * Runs on Linux.
     * @param args args[0] = input path (contact.log), args[1] = output path
     */
    @Deprecated
    static void runInLinux(String[] args) {
        ReadAndProductLog readAndProductLog = new ReadAndProductLog();
        // Number of required arguments
        int len = 2;
        if (args == null || args.length != len) {
            System.out.println("File path of input or output is not defined");
            System.exit(1);
        }
        // Start date
        String startDate = "2018-01-01";
        // End date
        String endDate = "2018-12-31";
        // Location of the raw data: contact.log
        ReadAndProductLog.phoneList = ReadAndProductLog.readLog(args[0]);
        readAndProductLog.writeToFile(args[1], startDate, endDate);
    }
}
package org.feng.datasource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by Feng on 2019/12/12 14:11
 * CurrentProject's name is flink-phone
 * <table>
 *     <tr><td>1. Read the log and collect all phone numbers</td></tr>
 *     <tr><td>2. Write the generated call records to the specified file</td></tr>
 *     <tr><td><b>Note: both the input path and the output path must be specified</b></td></tr>
 * </table>
 * @author Feng
 */
class ReadAndProductLog {
    /** Logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReadAndProductLog.class);
    /** One shared random source instead of a new Random per call */
    private static final Random RANDOM = new Random();
    /**
     * Phone numbers loaded from the raw data: the names are filtered out
     * and only the phone numbers are kept.
     */
    static List<String> phoneList;

    /**
     * Parses the contact log file and returns all phone numbers.
     * @return list of phone numbers
     */
    static List<String> readLog(String inPath) {
        try (Stream<String> lines = Files.lines(Paths.get(inPath), Charset.defaultCharset())) {
            return lines.filter(line -> !line.trim().isEmpty())   // skip blank lines
                    .map(line -> line.split(" ")[0])
                    .collect(Collectors.toList());
        } catch (IOException e) {
            LOGGER.error("Read log error", e);
        }
        throw new RuntimeException("Read log Fail");
    }

    /**
     * Generates one call record by concatenating caller, callee, random call time and random call duration.
     * @return formatted string call1,call2,random-date,call-duration
     */
    public String productLog(String start, String end) {
        String callOne = phoneList.get(RANDOM.nextInt(phoneList.size()));
        String callTwo;
        // Pick a different number: a phone cannot call itself
        do {
            callTwo = phoneList.get(RANDOM.nextInt(phoneList.size()));
        } while (Objects.equals(callOne, callTwo));
        // Call duration in seconds, 1 to 1800
        int callTime = RANDOM.nextInt(60 * 30) + 1;
        // Format the duration: left-pad with zeros to four digits
        String formatCallTime = new DecimalFormat("0000").format(callTime);
        Long randomDate = randomDate(start, end);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");
        String formatRandomDate = simpleDateFormat.format(randomDate);
        // Build the record
        StringBuilder sb = new StringBuilder(callOne);
        sb.append(",").append(callTwo).append(",").append(formatRandomDate).append(",").append(formatCallTime);
        System.out.println("Call Log is Build Success [" + sb.toString() + "]");
        // Throttle: this method alone produces at most 2 records per second
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }

    /**
     * Generates a random call time.
     * @param start start date, yyyy-MM-dd
     * @param end end date, yyyy-MM-dd
     * @return random call time in epoch milliseconds, or null if the range is invalid
     */
    private Long randomDate(String start, String end) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        try {
            Date parseStart = simpleDateFormat.parse(start);
            Date parseEnd = simpleDateFormat.parse(end);
            if (parseStart.getTime() > parseEnd.getTime()) {
                return null;
            }
            return parseStart.getTime() + (long) ((parseEnd.getTime() - parseStart.getTime()) * Math.random());
        } catch (ParseException e) {
            LOGGER.error("Random call time create error");
        }
        return null;
    }

    /**
     * Appends generated records to a file.
     * <p>
     *     Strategy 1: on Windows, write the data directly to Kafka;<br>
     *     Strategy 2: on Linux, use this method to keep appending to a file and let Flume
     *     tail the file in real time and sink the data to Kafka.
     * </p>
     * @param outPath output file path
     */
    @Deprecated
    void writeToFile(String outPath, String start, String end) {
        try {
            //noinspection InfiniteLoopStatement
            while (true) {
                // CREATE + APPEND: create the file on the first write, append afterwards
                Files.write(Paths.get(outPath),
                        (productLog(start, end) + "\r\n").getBytes(Charset.defaultCharset()),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
        } catch (IOException e) {
            LOGGER.error("Write log error", e);
        }
    }
}
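Without Kafka, file I/O, and the `Thread.sleep` throttling, the generator comes down to a few random draws. This sketch shows that core logic. It takes an injected `Random` so a run can be reproduced with a fixed seed. The class `CallLogGeneratorSketch` and its `generate` method are hypothetical. The bounds passed in `main` are the epoch milliseconds of 2018-01-01 and 2019-01-01 UTC, and they are formatted in the JVM's default time zone.

```java
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical sketch of the record generator's core logic:
 * distinct caller/callee, duration 1..1800 s padded to 4 digits,
 * timestamp drawn uniformly from [startMs, endMs).
 */
public class CallLogGeneratorSketch {

    static String generate(List<String> phones, long startMs, long endMs, Random rnd) {
        String caller = phones.get(rnd.nextInt(phones.size()));
        String callee;
        do { // a phone cannot call itself (needs at least two distinct numbers)
            callee = phones.get(rnd.nextInt(phones.size()));
        } while (callee.equals(caller));
        int duration = rnd.nextInt(60 * 30) + 1;
        long ts = startMs + (long) ((endMs - startMs) * rnd.nextDouble());
        String time = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss").format(new Date(ts));
        return caller + "," + callee + "," + time + "," + new DecimalFormat("0000").format(duration);
    }

    public static void main(String[] args) {
        List<String> phones = Arrays.asList("15369468720", "19920860202", "18411925860");
        Random rnd = new Random(42); // fixed seed for a reproducible run
        for (int i = 0; i < 3; i++) {
            System.out.println(generate(phones, 1514764800000L, 1546300800000L, rnd));
        }
    }
}
```

A single shared `Random` (or a seeded one in tests) is enough. Creating a new instance per draw gives no extra randomness.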
2. Package org.feng.entity

Entity class used as the key for keyBy() in the Flink job.

package org.feng.entity;

import lombok.*;

import java.io.Serializable;
import java.util.Objects;

/**
 * Created by Feng on 2019/12/13 8:32
 * CurrentProject's name is flink-phone
 * @author Feng
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Key implements Serializable {
    private static final long serialVersionUID = -822704949459066874L;
    private String tel1;
    private String tel2;
    private String date;

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Key)) {
            return false;
        }
        Key key = (Key) o;
        if (!Objects.equals(tel1, key.tel1)) {
            return false;
        }
        if (!Objects.equals(tel2, key.tel2)) {
            return false;
        }
        return Objects.equals(date, key.date);
    }

    @Override
    public int hashCode() {
        int result = tel1 != null ? tel1.hashCode() : 0;
        result = 31 * result + (tel2 != null ? tel2.hashCode() : 0);
        result = 31 * result + (date != null ? date.hashCode() : 0);
        return result;
    }
}
3. Package org.feng.redis

Redis helps count the number of calls. This package provides a Jedis connection pool.

package org.feng.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Created by Feng on 2019/12/13 13:50
 * CurrentProject's name is flink-phone
 * Provides Jedis instances from a shared pool.
 * Callers must close() the returned Jedis to return the connection to the pool.
 * @author Feng
 */
public class RedisSupport {
    private static final JedisPool jedisPool;
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSupport.class);

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of connections
        config.setMaxTotal(30);
        // Maximum number of idle connections
        config.setMaxIdle(10);
        // Create the pool with host and port
        jedisPool = new JedisPool(config, "localhost", 6379);
    }

    /**
     * Borrows a Jedis instance from the pool.
     * @return Jedis
     */
    public static Jedis getJedis() {
        LOGGER.info("get jedis");
        return jedisPool.getResource();
    }
}

Testing Jedis:

package org.feng.redis;

import redis.clients.jedis.Jedis;

/**
 * Created by Feng on 2019/12/13 18:29
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public class SupportTest {
    public static void main(String[] args) {
        // try-with-resources returns the connection to the pool
        try (Jedis jedis = RedisSupport.getJedis()) {
            jedis.set("fengsoshaui", "123456");
            // Read back the key that was just written; expected to print 123456
            System.out.println(jedis.get("fengsoshaui"));
        }
    }
}
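`MysqlSink` uses Redis as a per-key counter keyed by `<callerId>_<calleeId>_date_<dateId>`. The Redis `INCR` command does this in one atomic step. A missing key is treated as 0 and incremented to 1, and Jedis exposes the command as `jedis.incr(key)`. The sketch below is a hypothetical in-memory stand-in called `CallCounterSketch`, with `HashMap.merge` playing the role of `INCR`. You can use it to check the key layout without a running Redis server. Unlike Redis, it is neither shared between processes nor thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for the Redis counter used by MysqlSink.
 * HashMap.merge mimics INCR: a missing key starts at 1, an existing one is increased by 1.
 */
public class CallCounterSketch {
    private final Map<String, Long> store = new HashMap<>();

    /** Same key layout as MysqlSink: callerId_calleeId_date_dateId */
    static String counterKey(int callerId, int calleeId, int dateId) {
        return callerId + "_" + calleeId + "_date_" + dateId;
    }

    /** Increments the counter for the key and returns the new value, like INCR. */
    long incr(String key) {
        return store.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        CallCounterSketch counter = new CallCounterSketch();
        String key = counterKey(3, 2, 150);
        counter.incr(key);
        System.out.println(key + " -> " + counter.incr(key)); // 3_2_date_150 -> 2
    }
}
```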
4. Package org.feng.util.kafka

The KafkaConsumerLog class builds the consumer object that connects Flink to Kafka.

package org.feng.util.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Obtains the Kafka messages through a {@link FlinkKafkaConsumer}.
 * Created by Feng on 2019/12/12 14:40
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public class KafkaConsumerLog {
    /** Logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerLog.class);
    /** Kafka consumer configuration file on the classpath */
    private static final String KAFKA_CONFIG = "/kafka.properties";
    /** Default topic */
    private static final String DEFAULT_TOPIC = "phoneLogTopic";

    /**
     * Creates the Kafka consumer and sets its start position:
     * 1. setStartFromGroupOffsets:
     *    start from the offsets committed by the consumer group (the default)
     * 2. setStartFromEarliest:
     *    start from the earliest record, ignoring committed offsets
     * 3. setStartFromLatest:
     *    start from the latest record, ignoring committed offsets
     * 4. setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>):
     *    start from manually specified offsets
     */
    public static FlinkKafkaConsumer<String> getKafka() {
        Properties properties = new Properties();
        // try-with-resources closes the stream after loading
        try (InputStream in = KafkaConsumerLog.class.getResourceAsStream(KAFKA_CONFIG)) {
            if (in == null) {
                throw new IllegalStateException("Cannot find " + KAFKA_CONFIG + " on the classpath");
            }
            properties.load(in);
        } catch (IOException e) {
            LOGGER.error("FlinkKafkaConsumer get error", e);
            throw new IllegalStateException("Failed to load " + KAFKA_CONFIG, e);
        }
        FlinkKafkaConsumer<String> instance =
                new FlinkKafkaConsumer<>(DEFAULT_TOPIC, new SimpleStringSchema(), properties);
        instance.setStartFromGroupOffsets();
        return instance;
    }
}

Test: start the data-producing class to write data into Kafka, then run this class to check the data, which is printed to the console.

package org.feng.util.kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Feng on 2019/12/13 18:21
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public class KafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(KafkaConsumerLog.getKafka()).print();
        env.execute();
    }
}
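5. Package org.feng.mysql

The core package. It defines the Flink sink, which writes the final results into MySQL and uses Jedis to help compute the call counts.

MysqlEnum holds the MySQL connection settings: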

package org.feng.mysql;

/**
 * Created by Feng on 2019/12/12 18:08
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public enum MysqlEnum {
    /** JDBC connection URL */
    URL("jdbc:mysql://localhost:3306/flink_phone?characterEncoding=UTF-8&serverTimezone=UTC"),
    /** User name */
    USERNAME("root"),
    /** Password */
    PASSWORD("root");

    /** Value */
    private final String value;

    MysqlEnum(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}

MysqlSourceSupport obtains MySQL connections with plain JDBC. It reads the two lookup tables (ct_user and ct_date) into maps for later use in the sink.

package org.feng.mysql;

import com.mysql.cj.jdbc.Driver;

import java.sql.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by Feng on 2019/12/12 18:40
 * CurrentProject's name is flink-phone
 * Loads table data from MySQL with JDBC and stores it in maps.
 */
class MysqlSourceSupport {
    /**
     * tel -> ct_user.id (25 users). A ConcurrentHashMap is used because several
     * sink subtasks in the same JVM may fill these static maps at the same time.
     */
    static Map<String, Integer> userMap = new ConcurrentHashMap<>(34);
    /** yyyyMMdd / yyyyMM / yyyy -> ct_date.id (one row per day, month and year) */
    static Map<String, Integer> dateInfoMap = new ConcurrentHashMap<>(487);

    MysqlSourceSupport() {
        try {
            // Register the driver
            new Driver();
            initUserMap();
            initDateInfoMap();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Opens a connection */
    static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(MysqlEnum.URL.getValue(), MysqlEnum.USERNAME.getValue(),
                MysqlEnum.PASSWORD.getValue());
    }

    /** Loads the user map; try-with-resources closes the result set, statement and connection */
    private void initUserMap() throws SQLException {
        try (Connection connection = getConnection();
             PreparedStatement pstm = connection.prepareStatement("select id, tel from ct_user");
             ResultSet res = pstm.executeQuery()) {
            while (res.next()) {
                userMap.put(res.getString("tel"), res.getInt("id"));
            }
        }
    }

    /** Loads the date map; month and day are zero-padded so keys look like 20180105 */
    private void initDateInfoMap() throws SQLException {
        try (Connection connection = getConnection();
             PreparedStatement pstm = connection.prepareStatement("select id, year, month, day from ct_date");
             ResultSet res = pstm.executeQuery()) {
            while (res.next()) {
                int id = res.getInt(1);
                String year = res.getString(2);
                String month = res.getString(3);
                if (month.length() == 1) {
                    month = "0" + month;
                }
                String day = res.getString(4);
                if (day.length() == 1) {
                    day = "0" + day;
                }
                dateInfoMap.put(year + month + day, id);
            }
        }
    }
}
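The padding in `initDateInfoMap` matters because the consumer builds its lookup key by removing the dashes from `yyyy-MM-dd`, which gives strings like `20180105`. The sketch below applies the same normalization. The class `DateKeySketch` is hypothetical. It also covers the year-only and month-only rows of `ct_date`, whose empty columns stay empty.

```java
/** Hypothetical sketch: build the dateInfoMap key from ct_date columns. */
public class DateKeySketch {

    /** Zero-pads a one-digit month/day; empty strings (year or month rows) stay empty. */
    static String pad(String part) {
        return part.length() == 1 ? "0" + part : part;
    }

    static String dateKey(String year, String month, String day) {
        return year + pad(month) + pad(day);
    }

    public static void main(String[] args) {
        System.out.println(dateKey("2018", "1", "5")); // 20180105
        System.out.println(dateKey("2018", "3", ""));  // 201803 (month row)
        System.out.println(dateKey("2018", "", ""));   // 2018 (year row)
        // Same key as the consumer derives from a record's call time
        System.out.println("2018-01-05:10:00:00".substring(0, 10).replace("-", "")); // 20180105
    }
}
```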

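Core launcher: ConsumerLog is the consumer entry point. It uses Flink stream processing with a time window over the most recent period of data and computes the total call duration per key.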

package org.feng.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.feng.entity.Key;
import org.feng.util.kafka.KafkaConsumerLog;

/**
 * Created by Feng on 2019/12/13 15:05
 * CurrentProject's name is flink-phone
 * This is the final launcher class: it starts the consumer side. Note the window parameters:
 * in practice the window should fire once a day or once a month.<br>
 * Only then does it give each caller's total number of calls and total call duration for that period.
 * @author Feng
 */
public class ConsumerLog {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Record format: 17336673697,19602240179,2018-03-08:11:32:12,1159
        DataStream<Tuple2<Key, Integer>> dataStream =
                env.addSource(KafkaConsumerLog.getKafka())
                        .flatMap(new FlatMapFunction<String, Tuple2<Key, Integer>>() {
                            private static final long serialVersionUID = 8550835415812760334L;

                            @Override
                            public void flatMap(String data, Collector<Tuple2<Key, Integer>> out) {
                                String[] split = data.split(",");
                                // "2018-03-08:11:32:12" -> "20180308"
                                String date = split[2].substring(0, 10).replace("-", "");
                                Key key = new Key(split[0], split[1], date);
                                out.collect(new Tuple2<>(key, Integer.valueOf(split[3])));
                            }
                        });
        // Partition by field f0 of the Tuple2, i.e. by the Key object.
        // timeWindow on the KeyedStream keeps one window per key; timeWindowAll would ignore
        // the keyBy and sum the durations of all keys in a single window.
        // Every 3 seconds, aggregate the previous 3 seconds of data (non-overlapping windows).
        dataStream.keyBy("f0")
                .timeWindow(Time.seconds(3))
                .sum(1)
                .addSink(new MysqlSink());
        env.execute("ConsumerLogTest");
    }
}
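`sum(1)` only adds durations, so the sink derives the number of calls afterwards from Redis. Redis counts emitted window results, which means two calls with the same key inside one window show up as a single result. To get both numbers from the window itself, you could emit a count of 1 alongside each duration and add both fields, for example with a `reduce` on the windowed stream. The plain-Java simulation below uses no Flink. The class `KeyedTumblingWindowSketch` and its string keys are hypothetical. It shows what a keyed tumbling window computes for each window and key:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java simulation of a keyed tumbling window that,
 * per (window, key), counts calls and sums their durations.
 */
public class KeyedTumblingWindowSketch {

    /** One call event: processing timestamp (ms), key, duration (s). */
    static final class Call {
        final long ts;
        final String key;
        final int duration;

        Call(long ts, String key, int duration) {
            this.ts = ts;
            this.key = key;
            this.duration = duration;
        }
    }

    /** Returns "windowStart|key" -> {callCount, totalDuration}, sorted by that string. */
    static Map<String, long[]> aggregate(List<Call> calls, long windowSizeMs) {
        Map<String, long[]> result = new TreeMap<>();
        for (Call c : calls) {
            // Tumbling window: each event belongs to exactly one window (assumes ts >= 0)
            long windowStart = c.ts - (c.ts % windowSizeMs);
            long[] acc = result.computeIfAbsent(windowStart + "|" + c.key, k -> new long[2]);
            acc[0] += 1;          // number of calls
            acc[1] += c.duration; // total duration in seconds
        }
        return result;
    }

    public static void main(String[] args) {
        List<Call> calls = new ArrayList<>();
        calls.add(new Call(1_000, "A,B,20180308", 728));
        calls.add(new Call(2_500, "A,B,20180308", 100));
        calls.add(new Call(3_200, "A,B,20180308", 60));
        for (Map.Entry<String, long[]> e : aggregate(calls, 3_000).entrySet()) {
            System.out.println(e.getKey() + " calls=" + e.getValue()[0] + " duration=" + e.getValue()[1]);
        }
        // 0|A,B,20180308 calls=2 duration=828
        // 3000|A,B,20180308 calls=1 duration=60
    }
}
```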

MysqlSink is a custom Flink sink that persists the data to MySQL and Redis. Redis is only used to help count the calls.

package org.feng.mysql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.feng.entity.Key;
import org.feng.redis.RedisSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * Created by Feng on 2019/12/13 15:10
 * CurrentProject's name is flink-phone
 * Custom sink: persists the data to MySQL.
 * @author Feng
 */
public class MysqlSink extends RichSinkFunction<Tuple2<Key, Integer>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MysqlSink.class);
    private static final long serialVersionUID = 7410586813590080807L;
    private PreparedStatement pstm;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Load the lookup maps once per sink instance instead of once per record
        new MysqlSourceSupport();
        LOGGER.warn("userMap:" + MysqlSourceSupport.userMap);
        LOGGER.warn("dateInfoMap:" + MysqlSourceSupport.dateInfoMap);
        // Open the connection
        connection = MysqlSourceSupport.getConnection();
        // Prepare the statement
        String sql = "insert into ct_call (telid, dateid, sumcall, sumduration) values (?,?,?,?)";
        pstm = connection.prepareStatement(sql);
        LOGGER.warn("sql:" + sql);
        LOGGER.warn("connection:" + connection);
        LOGGER.warn("pstm:" + pstm);
    }

    @Override
    public void invoke(Tuple2<Key, Integer> value, Context context) throws Exception {
        LOGGER.warn(value.toString());
        Integer callerId = MysqlSourceSupport.userMap.get(value.f0.getTel1());
        Integer calleeId = MysqlSourceSupport.userMap.get(value.f0.getTel2());
        Integer dateId = MysqlSourceSupport.dateInfoMap.get(value.f0.getDate());
        pstm.setInt(1, callerId);
        pstm.setInt(2, dateId);
        // Count, via Redis, how often this caller has called this callee on this date.
        // try-with-resources returns the connection to the pool; without it the pool
        // (maxTotal = 30) is exhausted after 30 records and getJedis() blocks.
        try (Jedis jedis = RedisSupport.getJedis()) {
            String key = callerId + "_" + calleeId + "_date_" + dateId;
            // INCR sets a missing key to 1 and otherwise increments it atomically
            pstm.setInt(3, jedis.incr(key).intValue());
        }
        pstm.setInt(4, value.f1);
        // Log the statement with its parameters
        LOGGER.warn("pstm by param:" + pstm.toString());
        pstm.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (pstm != null) {
            pstm.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

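3. MySQL Table Design

Database: flink_phone

Tables:

  1. ct_call: aggregated results (caller id, date id, number of calls, total duration)
  2. ct_date: date dimension (one row per year, per month and per day)
  3. ct_user: user dimension (phone number and name)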
/*
 Navicat Premium Data Transfer

 Source Server         : MySql
 Source Server Type    : MySQL
 Source Server Version : 80018
 Source Host           : localhost:3306
 Source Schema         : flink_phone

 Target Server Type    : MySQL
 Target Server Version : 80018
 File Encoding         : 65001

 Date: 14/12/2019 14:50:00
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for ct_call
-- ----------------------------
DROP TABLE IF EXISTS `ct_call`;
CREATE TABLE `ct_call`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `telid` int(11) NULL DEFAULT NULL,
  `dateid` int(11) NULL DEFAULT NULL,
  `sumcall` int(11) NULL DEFAULT 0,
  `sumduration` int(11) NULL DEFAULT 0,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 664 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for ct_date
-- ----------------------------
DROP TABLE IF EXISTS `ct_date`;
CREATE TABLE `ct_date`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `year` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `month` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `day` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 379 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for ct_user
-- ----------------------------
DROP TABLE IF EXISTS `ct_user`;
CREATE TABLE `ct_user`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `tel` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 26 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

Initial table data (note: all data here is fictitious, and any resemblance is purely coincidental):

INSERT INTO `ct_user` VALUES (1, '15369468720', '李雁');
INSERT INTO `ct_user` VALUES (2, '19920860202', '卫艺');
INSERT INTO `ct_user` VALUES (3, '18411925860', '仰莉');
INSERT INTO `ct_user` VALUES (4, '14473548449', '陶欣悦');
INSERT INTO `ct_user` VALUES (5, '18749966182', '施梅梅');
INSERT INTO `ct_user` VALUES (6, '19379884788', '金虹霖');
INSERT INTO `ct_user` VALUES (7, '19335715448', '魏明艳');
INSERT INTO `ct_user` VALUES (8, '18503558939', '华贞');
INSERT INTO `ct_user` VALUES (9, '13407209608', '华啟倩');
INSERT INTO `ct_user` VALUES (10, '15596505995', '仲采绿');
INSERT INTO `ct_user` VALUES (11, '17519874292', '卫丹');
INSERT INTO `ct_user` VALUES (12, '15178485516', '戚丽红');
INSERT INTO `ct_user` VALUES (13, '19877232369', '何翠柔');
INSERT INTO `ct_user` VALUES (14, '18706287692', '钱溶艳');
INSERT INTO `ct_user` VALUES (15, '18944239644', '钱琳');
INSERT INTO `ct_user` VALUES (16, '17325302007', '缪静欣');
INSERT INTO `ct_user` VALUES (17, '18839074540', '焦秋菊');
INSERT INTO `ct_user` VALUES (18, '19879419704', '吕访琴');
INSERT INTO `ct_user` VALUES (19, '16480981069', '沈丹');
INSERT INTO `ct_user` VALUES (20, '18674257265', '褚美丽');
INSERT INTO `ct_user` VALUES (21, '18302820904', '孙怡');
INSERT INTO `ct_user` VALUES (22, '15133295266', '许婵');
INSERT INTO `ct_user` VALUES (23, '17868457605', '曹红恋');
INSERT INTO `ct_user` VALUES (24, '15490732767', '吕柔');
INSERT INTO `ct_user` VALUES (25, '15064972307', '冯怜云');
INSERT INTO `ct_date` VALUES (1, '2018', '', '');
INSERT INTO `ct_date` VALUES (2, '2018', '1', '');
INSERT INTO `ct_date` VALUES (3, '2018', '1', '1');
INSERT INTO `ct_date` VALUES (4, '2018', '1', '2');
INSERT INTO `ct_date` VALUES (5, '2018', '1', '3');
INSERT INTO `ct_date` VALUES (6, '2018', '1', '4');
INSERT INTO `ct_date` VALUES (7, '2018', '1', '5');
INSERT INTO `ct_date` VALUES (8, '2018', '1', '6');
INSERT INTO `ct_date` VALUES (9, '2018', '1', '7');
INSERT INTO `ct_date` VALUES (10, '2018', '1', '8');
INSERT INTO `ct_date` VALUES (11, '2018', '1', '9');
INSERT INTO `ct_date` VALUES (12, '2018', '1', '10');
INSERT INTO `ct_date` VALUES (13, '2018', '1', '11');
INSERT INTO `ct_date` VALUES (14, '2018', '1', '12');
INSERT INTO `ct_date` VALUES (15, '2018', '1', '13');
INSERT INTO `ct_date` VALUES (16, '2018', '1', '14');
INSERT INTO `ct_date` VALUES (17, '2018', '1', '15');
INSERT INTO `ct_date` VALUES (18, '2018', '1', '16');
INSERT INTO `ct_date` VALUES (19, '2018', '1', '17');
INSERT INTO `ct_date` VALUES (20, '2018', '1', '18');
INSERT INTO `ct_date` VALUES (21, '2018', '1', '19');
INSERT INTO `ct_date` VALUES (22, '2018', '1', '20');
INSERT INTO `ct_date` VALUES (23, '2018', '1', '21');
INSERT INTO `ct_date` VALUES (24, '2018', '1', '22');
INSERT INTO `ct_date` VALUES (25, '2018', '1', '23');
INSERT INTO `ct_date` VALUES (26, '2018', '1', '24');
INSERT INTO `ct_date` VALUES (27, '2018', '1', '25');
INSERT INTO `ct_date` VALUES (28, '2018', '1', '26');
INSERT INTO `ct_date` VALUES (29, '2018', '1', '27');
INSERT INTO `ct_date` VALUES (30, '2018', '1', '28');
INSERT INTO `ct_date` VALUES (31, '2018', '1', '29');
INSERT INTO `ct_date` VALUES (32, '2018', '1', '30');
INSERT INTO `ct_date` VALUES (33, '2018', '1', '31');
INSERT INTO `ct_date` VALUES (34, '2018', '2', '');
INSERT INTO `ct_date` VALUES (35, '2018', '2', '1');
INSERT INTO `ct_date` VALUES (36, '2018', '2', '2');
INSERT INTO `ct_date` VALUES (37, '2018', '2', '3');
INSERT INTO `ct_date` VALUES (38, '2018', '2', '4');
INSERT INTO `ct_date` VALUES (39, '2018', '2', '5');
INSERT INTO `ct_date` VALUES (40, '2018', '2', '6');
INSERT INTO `ct_date` VALUES (41, '2018', '2', '7');
INSERT INTO `ct_date` VALUES (42, '2018', '2', '8');
INSERT INTO `ct_date` VALUES (43, '2018', '2', '9');
INSERT INTO `ct_date` VALUES (44, '2018', '2', '10');
INSERT INTO `ct_date` VALUES (45, '2018', '2', '11');
INSERT INTO `ct_date` VALUES (46, '2018', '2', '12');
INSERT INTO `ct_date` VALUES (47, '2018', '2', '13');
INSERT INTO `ct_date` VALUES (48, '2018', '2', '14');
INSERT INTO `ct_date` VALUES (49, '2018', '2', '15');
INSERT INTO `ct_date` VALUES (50, '2018', '2', '16');
INSERT INTO `ct_date` VALUES (51, '2018', '2', '17');
INSERT INTO `ct_date` VALUES (52, '2018', '2', '18');
INSERT INTO `ct_date` VALUES (53, '2018', '2', '19');
INSERT INTO `ct_date` VALUES (54, '2018', '2', '20');
INSERT INTO `ct_date` VALUES (55, '2018', '2', '21');
INSERT INTO `ct_date` VALUES (56, '2018', '2', '22');
INSERT INTO `ct_date` VALUES (57, '2018', '2', '23');
INSERT INTO `ct_date` VALUES (58, '2018', '2', '24');
INSERT INTO `ct_date` VALUES (59, '2018', '2', '25');
INSERT INTO `ct_date` VALUES (60, '2018', '2', '26');
INSERT INTO `ct_date` VALUES (61, '2018', '2', '27');
INSERT INTO `ct_date` VALUES (62, '2018', '2', '28');
INSERT INTO `ct_date` VALUES (63, '2018', '3', '');
INSERT INTO `ct_date` VALUES (64, '2018', '3', '1');
INSERT INTO `ct_date` VALUES (65, '2018', '3', '2');
INSERT INTO `ct_date` VALUES (66, '2018', '3', '3');
INSERT INTO `ct_date` VALUES (67, '2018', '3', '4');
INSERT INTO `ct_date` VALUES (68, '2018', '3', '5');
INSERT INTO `ct_date` VALUES (69, '2018', '3', '6');
INSERT INTO `ct_date` VALUES (70, '2018', '3', '7');
INSERT INTO `ct_date` VALUES (71, '2018', '3', '8');
INSERT INTO `ct_date` VALUES (72, '2018', '3', '9');
INSERT INTO `ct_date` VALUES (73, '2018', '3', '10');
INSERT INTO `ct_date` VALUES (74, '2018', '3', '11');
INSERT INTO `ct_date` VALUES (75, '2018', '3', '12');
INSERT INTO `ct_date` VALUES (76, '2018', '3', '13');
INSERT INTO `ct_date` VALUES (77, '2018', '3', '14');
INSERT INTO `ct_date` VALUES (78, '2018', '3', '15');
INSERT INTO `ct_date` VALUES (79, '2018', '3', '16');
INSERT INTO `ct_date` VALUES (80, '2018', '3', '17');
INSERT INTO `ct_date` VALUES (81, '2018', '3', '18');
INSERT INTO `ct_date` VALUES (82, '2018', '3', '19');
INSERT INTO `ct_date` VALUES (83, '2018', '3', '20');
INSERT INTO `ct_date` VALUES (84, '2018', '3', '21');
INSERT INTO `ct_date` VALUES (85, '2018', '3', '22');
INSERT INTO `ct_date` VALUES (86, '2018', '3', '23');
INSERT INTO `ct_date` VALUES (87, '2018', '3', '24');
INSERT INTO `ct_date` VALUES (88, '2018', '3', '25');
INSERT INTO `ct_date` VALUES (89, '2018', '3', '26');
INSERT INTO `ct_date` VALUES (90, '2018', '3', '27');
INSERT INTO `ct_date` VALUES (91, '2018', '3', '28');
INSERT INTO `ct_date` VALUES (92, '2018', '3', '29');
INSERT INTO `ct_date` VALUES (93, '2018', '3', '30');
INSERT INTO `ct_date` VALUES (94, '2018', '3', '31');
INSERT INTO `ct_date` VALUES (95, '2018', '4', '');
INSERT INTO `ct_date` VALUES (96, '2018', '4', '1');
INSERT INTO `ct_date` VALUES (97, '2018', '4', '2');
INSERT INTO `ct_date` VALUES (98, '2018', '4', '3');
INSERT INTO `ct_date` VALUES (99, '2018', '4', '4');
INSERT INTO `ct_date` VALUES (100, '2018', '4', '5');
INSERT INTO `ct_date` VALUES (101, '2018', '4', '6');
INSERT INTO `ct_date` VALUES (102, '2018', '4', '7');
INSERT INTO `ct_date` VALUES (103, '2018', '4', '8');
INSERT INTO `ct_date` VALUES (104, '2018', '4', '9');
INSERT INTO `ct_date` VALUES (105, '2018', '4', '10');
INSERT INTO `ct_date` VALUES (106, '2018', '4', '11');
INSERT INTO `ct_date` VALUES (107, '2018', '4', '12');
INSERT INTO `ct_date` VALUES (108, '2018', '4', '13');
INSERT INTO `ct_date` VALUES (109, '2018', '4', '14');
INSERT INTO `ct_date` VALUES (110, '2018', '4', '15');
INSERT INTO `ct_date` VALUES (111, '2018', '4', '16');
INSERT INTO `ct_date` VALUES (112, '2018', '4', '17');
INSERT INTO `ct_date` VALUES (113, '2018', '4', '18');
INSERT INTO `ct_date` VALUES (114, '2018', '4', '19');
INSERT INTO `ct_date` VALUES (115, '2018', '4', '20');
INSERT INTO `ct_date` VALUES (116, '2018', '4', '21');
INSERT INTO `ct_date` VALUES (117, '2018', '4', '22');
INSERT INTO `ct_date` VALUES (118, '2018', '4', '23');
INSERT INTO `ct_date` VALUES (119, '2018', '4', '24');
INSERT INTO `ct_date` VALUES (120, '2018', '4', '25');
INSERT INTO `ct_date` VALUES (121, '2018', '4', '26');
INSERT INTO `ct_date` VALUES (122, '2018', '4', '27');
INSERT INTO `ct_date` VALUES (123, '2018', '4', '28');
INSERT INTO `ct_date` VALUES (124, '2018', '4', '29');
INSERT INTO `ct_date` VALUES (125, '2018', '4', '30');
INSERT INTO `ct_date` VALUES (126, '2018', '5', '');
INSERT INTO `ct_date` VALUES (127, '2018', '5', '1');
INSERT INTO `ct_date` VALUES (128, '2018', '5', '2');
INSERT INTO `ct_date` VALUES (129, '2018', '5', '3');
INSERT INTO `ct_date` VALUES (130, '2018', '5', '4');
INSERT INTO `ct_date` VALUES (131, '2018', '5', '5');
INSERT INTO `ct_date` VALUES (132, '2018', '5', '6');
INSERT INTO `ct_date` VALUES (133, '2018', '5', '7');
INSERT INTO `ct_date` VALUES (134, '2018', '5', '8');
INSERT INTO `ct_date` VALUES (135, '2018', '5', '9');
INSERT INTO `ct_date` VALUES (136, '2018', '5', '10');
INSERT INTO `ct_date` VALUES (137, '2018', '5', '11');
INSERT INTO `ct_date` VALUES (138, '2018', '5', '12');
INSERT INTO `ct_date` VALUES (139, '2018', '5', '13');
INSERT INTO `ct_date` VALUES (140, '2018', '5', '14');
INSERT INTO `ct_date` VALUES (141, '2018', '5', '15');
INSERT INTO `ct_date` VALUES (142, '2018', '5', '16');
INSERT INTO `ct_date` VALUES (143, '2018', '5', '17');
INSERT INTO `ct_date` VALUES (144, '2018', '5', '18');
INSERT INTO `ct_date` VALUES (145, '2018', '5', '19');
INSERT INTO `ct_date` VALUES (146, '2018', '5', '20');
INSERT INTO `ct_date` VALUES (147, '2018', '5', '21');
INSERT INTO `ct_date` VALUES (148, '2018', '5', '22');
INSERT INTO `ct_date` VALUES (149, '2018', '5', '23');
INSERT INTO `ct_date` VALUES (150, '2018', '5', '24');
INSERT INTO `ct_date` VALUES (151, '2018', '5', '25');
INSERT INTO `ct_date` VALUES (152, '2018', '5', '26');
INSERT INTO `ct_date` VALUES (153, '2018', '5', '27');
INSERT INTO `ct_date` VALUES (154, '2018', '5', '28');
INSERT INTO `ct_date` VALUES (155, '2018', '5', '29');
INSERT INTO `ct_date` VALUES (156, '2018', '5', '30');
INSERT INTO `ct_date` VALUES (157, '2018', '5', '31');
INSERT INTO `ct_date` VALUES (158, '2018', '6', '');
INSERT INTO `ct_date` VALUES (159, '2018', '6', '1');
INSERT INTO `ct_date` VALUES (160, '2018', '6', '2');
INSERT INTO `ct_date` VALUES (161, '2018', '6', '3');
INSERT INTO `ct_date` VALUES (162, '2018', '6', '4');
INSERT INTO `ct_date` VALUES (163, '2018', '6', '5');
INSERT INTO `ct_date` VALUES (164, '2018', '6', '6');
INSERT INTO `ct_date` VALUES (165, '2018', '6', '7');
INSERT INTO `ct_date` VALUES (166, '2018', '6', '8');
INSERT INTO `ct_date` VALUES (167, '2018', '6', '9');
INSERT INTO `ct_date` VALUES (168, '2018', '6', '10');
INSERT INTO `ct_date` VALUES (169, '2018', '6', '11');
INSERT INTO `ct_date` VALUES (170, '2018', '6', '12');
INSERT INTO `ct_date` VALUES (171, '2018', '6', '13');
INSERT INTO `ct_date` VALUES (172, '2018', '6', '14');
INSERT INTO `ct_date` VALUES (173, '2018', '6', '15');
INSERT INTO `ct_date` VALUES (174, '2018', '6', '16');
INSERT INTO `ct_date` VALUES (175, '2018', '6', '17');
INSERT INTO `ct_date` VALUES (176, '2018', '6', '18');
INSERT INTO `ct_date` VALUES (177, '2018', '6', '19');
INSERT INTO `ct_date` VALUES (178, '2018', '6', '20');
INSERT INTO `ct_date` VALUES (179, '2018', '6', '21');
INSERT INTO `ct_date` VALUES (180, '2018', '6', '22');
INSERT INTO `ct_date` VALUES (181, '2018', '6', '23');
INSERT INTO `ct_date` VALUES (182, '2018', '6', '24');
INSERT INTO `ct_date` VALUES (183, '2018', '6', '25');
INSERT INTO `ct_date` VALUES (184, '2018', '6', '26');
INSERT INTO `ct_date` VALUES (185, '2018', '6', '27');
INSERT INTO `ct_date` VALUES (186, '2018', '6', '28');
INSERT INTO `ct_date` VALUES (187, '2018', '6', '29');
INSERT INTO `ct_date` VALUES (188, '2018', '6', '30');
INSERT INTO `ct_date` VALUES (189, '2018', '7', '');
INSERT INTO `ct_date` VALUES (190, '2018', '7', '1');
INSERT INTO `ct_date` VALUES (191, '2018', '7', '2');
INSERT INTO `ct_date` VALUES (192, '2018', '7', '3');
INSERT INTO `ct_date` VALUES (193, '2018', '7', '4');
INSERT INTO `ct_date` VALUES (194, '2018', '7', '5');
INSERT INTO `ct_date` VALUES (195, '2018', '7', '6');
INSERT INTO `ct_date` VALUES (196, '2018', '7', '7');
INSERT INTO `ct_date` VALUES (197, '2018', '7', '8');
INSERT INTO `ct_date` VALUES (198, '2018', '7', '9');
INSERT INTO `ct_date` VALUES (199, '2018', '7', '10');
INSERT INTO `ct_date` VALUES (200, '2018', '7', '11');
INSERT INTO `ct_date` VALUES (201, '2018', '7', '12');
INSERT INTO `ct_date` VALUES (202, '2018', '7', '13');
INSERT INTO `ct_date` VALUES (203, '2018', '7', '14');
INSERT INTO `ct_date` VALUES (204, '2018', '7', '15');
INSERT INTO `ct_date` VALUES (205, '2018', '7', '16');
INSERT INTO `ct_date` VALUES (206, '2018', '7', '17');
INSERT INTO `ct_date` VALUES (207, '2018', '7', '18');
INSERT INTO `ct_date` VALUES (208, '2018', '7', '19');
INSERT INTO `ct_date` VALUES (209, '2018', '7', '20');
INSERT INTO `ct_date` VALUES (210, '2018', '7', '21');
INSERT INTO `ct_date` VALUES (211, '2018', '7', '22');
INSERT INTO `ct_date` VALUES (212, '2018', '7', '23');
INSERT INTO `ct_date` VALUES (213, '2018', '7', '24');
INSERT INTO `ct_date` VALUES (214, '2018', '7', '25');
INSERT INTO `ct_date` VALUES (215, '2018', '7', '26');
INSERT INTO `ct_date` VALUES (216, '2018', '7', '27');
INSERT INTO `ct_date` VALUES (217, '2018', '7', '28');
INSERT INTO `ct_date` VALUES (218, '2018', '7', '29');
INSERT INTO `ct_date` VALUES (219, '2018', '7', '30');
INSERT INTO `ct_date` VALUES (220, '2018', '7', '31');
INSERT INTO `ct_date` VALUES (221, '2018', '8', '');
INSERT INTO `ct_date` VALUES (222, '2018', '8', '1');
INSERT INTO `ct_date` VALUES (223, '2018', '8', '2');
INSERT INTO `ct_date` VALUES (224, '2018', '8', '3');
INSERT INTO `ct_date` VALUES (225, '2018', '8', '4');
INSERT INTO `ct_date` VALUES (226, '2018', '8', '5');
INSERT INTO `ct_date` VALUES (227, '2018', '8', '6');
INSERT INTO `ct_date` VALUES (228, '2018', '8', '7');
INSERT INTO `ct_date` VALUES (229, '2018', '8', '8');
INSERT INTO `ct_date` VALUES (230, '2018', '8', '9');
INSERT INTO `ct_date` VALUES (231, '2018', '8', '10');
INSERT INTO `ct_date` VALUES (232, '2018', '8', '11');
INSERT INTO `ct_date` VALUES (233, '2018', '8', '12');
INSERT INTO `ct_date` VALUES (234, '2018', '8', '13');
INSERT INTO `ct_date` VALUES (235, '2018', '8', '14');
INSERT INTO `ct_date` VALUES (236, '2018', '8', '15');
INSERT INTO `ct_date` VALUES (237, '2018', '8', '16');
INSERT INTO `ct_date` VALUES (238, '2018', '8', '17');
INSERT INTO `ct_date` VALUES (239, '2018', '8', '18');
INSERT INTO `ct_date` VALUES (240, '2018', '8', '19');
INSERT INTO `ct_date` VALUES (241, '2018', '8', '20');
INSERT INTO `ct_date` VALUES (242, '2018', '8', '21');
INSERT INTO `ct_date` VALUES (243, '2018', '8', '22');
INSERT INTO `ct_date` VALUES (244, '2018', '8', '23');
INSERT INTO `ct_date` VALUES (245, '2018', '8', '24');
INSERT INTO `ct_date` VALUES (246, '2018', '8', '25');
INSERT INTO `ct_date` VALUES (247, '2018', '8', '26');
INSERT INTO `ct_date` VALUES (248, '2018', '8', '27');
INSERT INTO `ct_date` VALUES (249, '2018', '8', '28');
INSERT INTO `ct_date` VALUES (250, '2018', '8', '29');
INSERT INTO `ct_date` VALUES (251, '2018', '8', '30');
INSERT INTO `ct_date` VALUES (252, '2018', '8', '31');
INSERT INTO `ct_date` VALUES (253, '2018', '9', '');
INSERT INTO `ct_date` VALUES (254, '2018', '9', '1');
INSERT INTO `ct_date` VALUES (255, '2018', '9', '2');
INSERT INTO `ct_date` VALUES (256, '2018', '9', '3');
INSERT INTO `ct_date` VALUES (257, '2018', '9', '4');
INSERT INTO `ct_date` VALUES (258, '2018', '9', '5');
INSERT INTO `ct_date` VALUES (259, '2018', '9', '6');
INSERT INTO `ct_date` VALUES (260, '2018', '9', '7');
INSERT INTO `ct_date` VALUES (261, '2018', '9', '8');
INSERT INTO `ct_date` VALUES (262, '2018', '9', '9');
INSERT INTO `ct_date` VALUES (263, '2018', '9', '10');
INSERT INTO `ct_date` VALUES (264, '2018', '9', '11');
INSERT INTO `ct_date` VALUES (265, '2018', '9', '12');
INSERT INTO `ct_date` VALUES (266, '2018', '9', '13');
INSERT INTO `ct_date` VALUES (267, '2018', '9', '14');
INSERT INTO `ct_date` VALUES (268, '2018', '9', '15');
INSERT INTO `ct_date` VALUES (269, '2018', '9', '16');
INSERT INTO `ct_date` VALUES (270, '2018', '9', '17');
INSERT INTO `ct_date` VALUES (271, '2018', '9', '18');
INSERT INTO `ct_date` VALUES (272, '2018', '9', '19');
INSERT INTO `ct_date` VALUES (273, '2018', '9', '20');
INSERT INTO `ct_date` VALUES (274, '2018', '9', '21');
INSERT INTO `ct_date` VALUES (275, '2018', '9', '22');
INSERT INTO `ct_date` VALUES (276, '2018', '9', '23');
INSERT INTO `ct_date` VALUES (277, '2018', '9', '24');
INSERT INTO `ct_date` VALUES (278, '2018', '9', '25');
INSERT INTO `ct_date` VALUES (279, '2018', '9', '26');
INSERT INTO `ct_date` VALUES (280, '2018', '9', '27');
INSERT INTO `ct_date` VALUES (281, '2018', '9', '28');
INSERT INTO `ct_date` VALUES (282, '2018', '9', '29');
INSERT INTO `ct_date` VALUES (283, '2018', '9', '30');
INSERT INTO `ct_date` VALUES (284, '2018', '10', '');
INSERT INTO `ct_date` VALUES (285, '2018', '10', '1');
INSERT INTO `ct_date` VALUES (286, '2018', '10', '2');
INSERT INTO `ct_date` VALUES (287, '2018', '10', '3');
INSERT INTO `ct_date` VALUES (288, '2018', '10', '4');
INSERT INTO `ct_date` VALUES (289, '2018', '10', '5');
INSERT INTO `ct_date` VALUES (290, '2018', '10', '6');
INSERT INTO `ct_date` VALUES (291, '2018', '10', '7');
INSERT INTO `ct_date` VALUES (292, '2018', '10', '8');
INSERT INTO `ct_date` VALUES (293, '2018', '10', '9');
INSERT INTO `ct_date` VALUES (294, '2018', '10', '10');
INSERT INTO `ct_date` VALUES (295, '2018', '10', '11');
INSERT INTO `ct_date` VALUES (296, '2018', '10', '12');
INSERT INTO `ct_date` VALUES (297, '2018', '10', '13');
INSERT INTO `ct_date` VALUES (298, '2018', '10', '14');
INSERT INTO `ct_date` VALUES (299, '2018', '10', '15');
INSERT INTO `ct_date` VALUES (300, '2018', '10', '16');
INSERT INTO `ct_date` VALUES (301, '2018', '10', '17');
INSERT INTO `ct_date` VALUES (302, '2018', '10', '18');
INSERT INTO `ct_date` VALUES (303, '2018', '10', '19');
INSERT INTO `ct_date` VALUES (304, '2018', '10', '20');
INSERT INTO `ct_date` VALUES (305, '2018', '10', '21');
INSERT INTO `ct_date` VALUES (306, '2018', '10', '22');
INSERT INTO `ct_date` VALUES (307, '2018', '10', '23');
INSERT INTO `ct_date` VALUES (308, '2018', '10', '24');
INSERT INTO `ct_date` VALUES (309, '2018', '10', '25');
INSERT INTO `ct_date` VALUES (310, '2018', '10', '26');
INSERT INTO `ct_date` VALUES (311, '2018', '10', '27');
INSERT INTO `ct_date` VALUES (312, '2018', '10', '28');
INSERT INTO `ct_date` VALUES (313, '2018', '10', '29');
INSERT INTO `ct_date` VALUES (314, '2018', '10', '30');
INSERT INTO `ct_date` VALUES (315, '2018', '10', '31');
INSERT INTO `ct_date` VALUES (316, '2018', '11', '');
INSERT INTO `ct_date` VALUES (317, '2018', '11', '1');
INSERT INTO `ct_date` VALUES (318, '2018', '11', '2');
INSERT INTO `ct_date` VALUES (319, '2018', '11', '3');
INSERT INTO `ct_date` VALUES (320, '2018', '11', '4');
INSERT INTO `ct_date` VALUES (321, '2018', '11', '5');
INSERT INTO `ct_date` VALUES (322, '2018', '11', '6');
INSERT INTO `ct_date` VALUES (323, '2018', '11', '7');
INSERT INTO `ct_date` VALUES (324, '2018', '11', '8');
INSERT INTO `ct_date` VALUES (325, '2018', '11', '9');
INSERT INTO `ct_date` VALUES (326, '2018', '11', '10');
INSERT INTO `ct_date` VALUES (327, '2018', '11', '11');
INSERT INTO `ct_date` VALUES (328, '2018', '11', '12');
INSERT INTO `ct_date` VALUES (329, '2018', '11', '13');
INSERT INTO `ct_date` VALUES (330, '2018', '11', '14');
INSERT INTO `ct_date` VALUES (331, '2018', '11', '15');
INSERT INTO `ct_date` VALUES (332, '2018', '11', '16');
INSERT INTO `ct_date` VALUES (333, '2018', '11', '17');
INSERT INTO `ct_date` VALUES (334, '2018', '11', '18');
INSERT INTO `ct_date` VALUES (335, '2018', '11', '19');
INSERT INTO `ct_date` VALUES (336, '2018', '11', '20');
INSERT INTO `ct_date` VALUES (337, '2018', '11', '21');
INSERT INTO `ct_date` VALUES (338, '2018', '11', '22');
INSERT INTO `ct_date` VALUES (339, '2018', '11', '23');
INSERT INTO `ct_date` VALUES (340, '2018', '11', '24');
INSERT INTO `ct_date` VALUES (341, '2018', '11', '25');
INSERT INTO `ct_date` VALUES (342, '2018', '11', '26');
INSERT INTO `ct_date` VALUES (343, '2018', '11', '27');
INSERT INTO `ct_date` VALUES (344, '2018', '11', '28');
INSERT INTO `ct_date` VALUES (345, '2018', '11', '29');
INSERT INTO `ct_date` VALUES (346, '2018', '11', '30');
INSERT INTO `ct_date` VALUES (347, '2018', '12', '');
INSERT INTO `ct_date` VALUES (348, '2018', '12', '1');
INSERT INTO `ct_date` VALUES (349, '2018', '12', '2');
INSERT INTO `ct_date` VALUES (350, '2018', '12', '3');
INSERT INTO `ct_date` VALUES (351, '2018', '12', '4');
INSERT INTO `ct_date` VALUES (352, '2018', '12', '5');
INSERT INTO `ct_date` VALUES (353, '2018', '12', '6');
INSERT INTO `ct_date` VALUES (354, '2018', '12', '7');
INSERT INTO `ct_date` VALUES (355, '2018', '12', '8');
INSERT INTO `ct_date` VALUES (356, '2018', '12', '9');
INSERT INTO `ct_date` VALUES (357, '2018', '12', '10');
INSERT INTO `ct_date` VALUES (358, '2018', '12', '11');
INSERT INTO `ct_date` VALUES (359, '2018', '12', '12');
INSERT INTO `ct_date` VALUES (360, '2018', '12', '13');
INSERT INTO `ct_date` VALUES (361, '2018', '12', '14');
INSERT INTO `ct_date` VALUES (362, '2018', '12', '15');
INSERT INTO `ct_date` VALUES (363, '2018', '12', '16');
INSERT INTO `ct_date` VALUES (364, '2018', '12', '17');
INSERT INTO `ct_date` VALUES (365, '2018', '12', '18');
INSERT INTO `ct_date` VALUES (366, '2018', '12', '19');
INSERT INTO `ct_date` VALUES (367, '2018', '12', '20');
INSERT INTO `ct_date` VALUES (368, '2018', '12', '21');
INSERT INTO `ct_date` VALUES (369, '2018', '12', '22');
INSERT INTO `ct_date` VALUES (370, '2018', '12', '23');
INSERT INTO `ct_date` VALUES (371, '2018', '12', '24');
INSERT INTO `ct_date` VALUES (372, '2018', '12', '25');
INSERT INTO `ct_date` VALUES (373, '2018', '12', '26');
INSERT INTO `ct_date` VALUES (374, '2018', '12', '27');
INSERT INTO `ct_date` VALUES (375, '2018', '12', '28');
INSERT INTO `ct_date` VALUES (376, '2018', '12', '29');
INSERT INTO `ct_date` VALUES (377, '2018', '12', '30');
INSERT INTO `ct_date` VALUES (378, '2018', '12', '31');
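The ct_date rows follow a fixed pattern. Each month has one month-level row with an empty day column, followed by one row per day. The visible ids (48 is 2018-02-14, 63 is the March month row) suggest that id 1 is a year-level row and id 2 is the January month row. Those rows are not shown here, so that part is an assumption. A minimal Java sketch that regenerates the statements under that assumption (the class name `CtDateGenerator` is hypothetical):

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class CtDateGenerator {

    /**
     * Builds the INSERT statements for the ct_date dimension table.
     * Assumption: id 1 is a year-level row (month and day empty), and every
     * month starts with a month-level row whose day column is empty.
     */
    public static List<String> generate(int year) {
        List<String> rows = new ArrayList<>();
        int id = 1;
        // Year-level row (assumed; not visible in the excerpt above)
        rows.add(insert(id++, year, "", ""));
        for (int month = 1; month <= 12; month++) {
            // Month-level row: the day column stays empty
            rows.add(insert(id++, year, String.valueOf(month), ""));
            int days = YearMonth.of(year, month).lengthOfMonth();
            for (int day = 1; day <= days; day++) {
                rows.add(insert(id++, year, String.valueOf(month), String.valueOf(day)));
            }
        }
        return rows;
    }

    private static String insert(int id, int year, String month, String day) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale
        return String.format(Locale.ROOT, "INSERT INTO `ct_date` VALUES (%d, '%d', '%s', '%s');",
                id, year, month, day);
    }

    public static void main(String[] args) {
        generate(2018).forEach(System.out::println);
    }
}
```

For 2018 this produces 1 + 12 + 365 = 378 statements. The last one therefore gets id 378, which matches the final row above.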

4. pom.xml file
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.feng</groupId>
    <artifactId>flink-phone</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url></url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.feng.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <!-- kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <!-- Redis connection pool -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- Redis client (Jedis) -->
        <dependency>
            <groupId>com.github.sazzad16</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
</project>

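2. Java code

1. The org.feng.datasource package

This package reads the data in contact.log and randomly generates records of calls between two people.

Each record has the structure caller,callee,call start time,call duration. The duration is given in seconds and zero-padded to four digits:

For example: 18411925860,19920860202,2018-05-29:08:54:04,0728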
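A consumer has to split such a line back into its four fields. The sketch below does that with `java.time`. The class `CallRecord` and its field names are hypothetical and are not part of the original project. The pattern `yyyy-MM-dd:HH:mm:ss` is the one `ReadAndProductLog` uses to format the call time.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CallRecord {
    // Matches the generator's SimpleDateFormat("yyyy-MM-dd:HH:mm:ss")
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd:HH:mm:ss");

    final String caller;
    final String callee;
    final LocalDateTime startTime;
    final int durationSeconds;

    CallRecord(String caller, String callee, LocalDateTime startTime, int durationSeconds) {
        this.caller = caller;
        this.callee = callee;
        this.startTime = startTime;
        this.durationSeconds = durationSeconds;
    }

    /** Parses one line of the form caller,callee,yyyy-MM-dd:HH:mm:ss,duration. */
    static CallRecord parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + parts.length + ": " + line);
        }
        LocalDateTime start = LocalDateTime.parse(parts[2], FORMAT);
        // "0728" -> 728: the leading zeros are only padding, parseInt reads base 10
        int duration = Integer.parseInt(parts[3]);
        return new CallRecord(parts[0], parts[1], start, duration);
    }

    public static void main(String[] args) {
        CallRecord r = parse("18411925860,19920860202,2018-05-29:08:54:04,0728");
        // prints: 18411925860 -> 19920860202 at 2018-05-29T08:54:04 for 728s
        System.out.println(r.caller + " -> " + r.callee + " at " + r.startTime + " for " + r.durationSeconds + "s");
    }
}
```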

package org.feng.datasource;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by Feng on 2019/12/12 14:25
 * CurrentProject's name is flink-phone
 * When running on Linux ({@link ReadAndProductLog}):<br>
 * <p>1. Specify the input path: where contact.log is stored
 *  <ul>
 *      <li>On Windows an absolute path is required, e.g. {@code D://contact.log}</li>
 *      <li>On Linux the path starts from "/", e.g. {@code /usr/local/log/contact.log}</li>
 *  </ul>
 * </p>
 * <p>2. Specify the output path: the generated call records are stored in this file.</p>
 * <p>3. Time range: set by startDate/endDate (2018-01-01 to 2018-12-31 below)</p>
 * @author Feng
 */
public class ProductLogClient {

    /**
     * Test entry point for Windows.
     * @param args args[0] = path of contact.log, e.g.
     *             D:\jee-2019-7-idea-maven-workspace\flink-phone\src\main\resources\contact.log
     */
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: ProductLogClient <path of contact.log>");
            System.exit(1);
        }
        ReadAndProductLog readAndProductLog = new ReadAndProductLog();
        // Start date
        String startDate = "2018-01-01";
        // End date
        String endDate = "2018-12-31";
        // Location of the raw data: contact.log
        ReadAndProductLog.phoneList = ReadAndProductLog.readLog(args[0]);
        // Target topic
        String topic = "phoneLogTopic";
        KafkaProducer<String, String> producer = getProducer();
        while (true) {
            String data = readAndProductLog.productLog(startDate, endDate);
            producer.send(new ProducerRecord<>(topic, data));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                break;
            }
        }
        producer.close();
    }

    /**
     * Creates the Kafka producer.
     * @return KafkaProducer
     */
    private static KafkaProducer<String, String> getProducer() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(kafkaProps);
    }

    /**
     * Runs on Linux.
     * @param args args[0] = input path (contact.log), args[1] = output path
     */
    @Deprecated
    static void runInLinux(String[] args) {
        ReadAndProductLog readAndProductLog = new ReadAndProductLog();
        // Number of required arguments
        int len = 2;
        if (args == null || args.length != len) {
            System.out.println("File path of input or output is not defined");
            System.exit(1);
        }
        // Start date
        String startDate = "2018-01-01";
        // End date
        String endDate = "2018-12-31";
        // Location of the raw data: contact.log
        ReadAndProductLog.phoneList = ReadAndProductLog.readLog(args[0]);
        readAndProductLog.writeToFile(args[1], startDate, endDate);
    }
}
package org.feng.datasource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by Feng on 2019/12/12 14:11
 * CurrentProject's name is flink-phone
 * <table>
 *     <tr><td>1. Read the log and collect all phone numbers</td></tr>
 *     <tr><td>2. Write the generated call records to the given file</td></tr>
 *     <tr><td><b>Note: the input path and the output path must be specified</b></td></tr>
 * </table>
 * @author Feng
 */
class ReadAndProductLog {
    /** Logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReadAndProductLog.class);

    /** One shared random generator instead of a new Random on every call */
    private static final Random RANDOM = new Random();

    /**
     * Phone numbers parsed from the raw data: the names are filtered out
     * and only the phone numbers are kept.
     */
    static List<String> phoneList;

    /**
     * Parses the contact log file and returns all phone numbers.
     * @return list of phone numbers
     */
    static List<String> readLog(String inPath) {
        try (Stream<String> lines = Files.lines(Paths.get(inPath), Charset.defaultCharset())) {
            return lines.filter(line -> !line.trim().isEmpty())   // skip blank lines
                    .map(line -> line.split(" ")[0])              // keep the phone number only
                    .collect(Collectors.toList());
        } catch (IOException e) {
            LOGGER.error("Read log error", e);
        }
        throw new RuntimeException("Read log Fail");
    }

    /**
     * Generates one call record by concatenating:
     * caller, callee, random call start time, random call duration.
     * @return formatted string caller,callee,random-date,call-duration
     */
    public String productLog(String start, String end) {
        String callOne = phoneList.get(RANDOM.nextInt(phoneList.size()));
        String callTwo;
        // Pick a different number: a phone cannot call itself
        do {
            callTwo = phoneList.get(RANDOM.nextInt(phoneList.size()));
        } while (Objects.equals(callOne, callTwo));
        // Call duration in seconds (1 to 1800)
        int callTime = RANDOM.nextInt(60 * 30) + 1;
        // Zero-pad the duration to four digits
        String formatCallTime = new DecimalFormat("0000").format(callTime);
        Long randomDate = randomDate(start, end);
        if (randomDate == null) {
            throw new IllegalArgumentException("Invalid date range: " + start + " to " + end);
        }
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd:HH:mm:ss");
        String formatRandomDate = simpleDateFormat.format(new Date(randomDate));
        // Build the record
        StringBuilder sb = new StringBuilder(callOne);
        sb.append(",").append(callTwo).append(",").append(formatRandomDate).append(",").append(formatCallTime);
        System.out.println("Call Log is Build Success [" + sb + "]");
        // Throttle: 500 ms per record, i.e. about 2 records per second when called in a tight loop
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sb.toString();
    }

    /**
     * Generates a random call time.
     * @param start start date, yyyy-MM-dd
     * @param end end date, yyyy-MM-dd
     * @return random call time in epoch milliseconds, or null if the range is invalid
     */
    private Long randomDate(String start, String end) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        try {
            Date parseStart = simpleDateFormat.parse(start);
            Date parseEnd = simpleDateFormat.parse(end);
            if (parseStart.getTime() > parseEnd.getTime()) {
                return null;
            }
            return parseStart.getTime() + (long) ((parseEnd.getTime() - parseStart.getTime()) * Math.random());
        } catch (ParseException e) {
            LOGGER.error("Random call time create error", e);
        }
        return null;
    }

    /**
     * Appends call records to a file.
     * <p>
     *     Strategy 1: on Windows, write the data directly to Kafka;<br>
     *     Strategy 2: on Linux, use this method to keep appending to a file, and let Flume
     *     monitor the file in real time and sink the data to Kafka.
     * </p>
     * @param outPath output file
     */
    @Deprecated
    void writeToFile(String outPath, String start, String end) {
        try {
            while (true) {
                // CREATE + APPEND: create the file on first use, then keep appending
                Files.write(Paths.get(outPath),
                        (productLog(start, end) + "\r\n").getBytes(Charset.defaultCharset()),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
        } catch (IOException e) {
            LOGGER.error("Write log error", e);
        }
    }
}
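`ReadAndProductLog` draws from unseeded random sources and formats dates through `SimpleDateFormat`, which is not thread-safe. Because of that, its output cannot be reproduced in a unit test. As an alternative technique (not the author's code), the same record format can be built with `java.time` and an injected `Random`. `RecordSketch` and its signature are hypothetical. Picking the callee by index assumes the phone list holds distinct numbers.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Random;

public class RecordSketch {
    // Same layout as the original SimpleDateFormat("yyyy-MM-dd:HH:mm:ss")
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd:HH:mm:ss");

    /**
     * Builds "caller,callee,start,duration" with caller != callee.
     * The start time is uniform in [start 00:00:00, end 00:00:00), like the original randomDate.
     */
    static String productLog(List<String> phones, LocalDate start, LocalDate end, Random random) {
        if (phones.size() < 2) {
            throw new IllegalArgumentException("need at least two phone numbers");
        }
        if (start.isAfter(end)) {
            throw new IllegalArgumentException("start must not be after end");
        }
        int callerIndex = random.nextInt(phones.size());
        // Draw from the other n-1 positions: indexes at or above the caller shift by one,
        // so no retry loop is needed
        int calleeIndex = random.nextInt(phones.size() - 1);
        if (calleeIndex >= callerIndex) {
            calleeIndex++;
        }
        LocalDateTime from = start.atStartOfDay();
        long spanSeconds = Duration.between(from, end.atStartOfDay()).getSeconds();
        LocalDateTime callStart = from.plusSeconds((long) (random.nextDouble() * spanSeconds));
        // Duration in seconds, 1..1800, zero-padded to four digits
        int duration = random.nextInt(60 * 30) + 1;
        return String.join(",",
                phones.get(callerIndex),
                phones.get(calleeIndex),
                callStart.format(FORMAT),
                String.format(Locale.ROOT, "%04d", duration));
    }

    public static void main(String[] args) {
        List<String> phones = Arrays.asList("15369468720", "19920860202", "18411925860");
        // Fixed seed: the same sequence of records on every run
        Random random = new Random(42);
        for (int i = 0; i < 3; i++) {
            System.out.println(productLog(phones, LocalDate.of(2018, 1, 1), LocalDate.of(2018, 12, 31), random));
        }
    }
}
```

Passing `new Random(42)` yields the same records on every run, so a test can assert on the output. An unseeded `new Random()` behaves like the original generator.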
2. The org.feng.entity package

Entity class: used as the key for keyBy() in the Flink computation.

package org.feng.entity;

import lombok.*;

import java.io.Serializable;
import java.util.Objects;

/**
 * Created by Feng on 2019/12/13 8:32
 * CurrentProject's name is flink-phone
 * @author Feng
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Key implements Serializable {
    private static final long serialVersionUID = -822704949459066874L;
    /** First phone number of the record (call1) */
    private String tel1;
    /** Second phone number of the record (call2) */
    private String tel2;
    /** Date of the call, as a string */
    private String date;

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Key)) {
            return false;
        }
        Key key = (Key) o;
        if (!Objects.equals(tel1, key.tel1)) {
            return false;
        }
        if (!Objects.equals(tel2, key.tel2)) {
            return false;
        }
        return Objects.equals(date, key.date);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals so that keyBy() groups equal keys together
        int result = tel1 != null ? tel1.hashCode() : 0;
        result = 31 * result + (tel2 != null ? tel2.hashCode() : 0);
        result = 31 * result + (date != null ? date.hashCode() : 0);
        return result;
    }
}
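`keyBy()` sends records with equal keys to the same parallel task, which is why `Key` defines `equals` and `hashCode` over all three fields. The sketch below is a rough analogy only: it is plain Java and not how Flink implements keying. It groups records by a Key-like class and accumulates the call count and total duration per key. `CallKey` and `KeyGroupingSketch` are hypothetical names, and `CallKey` mirrors `Key` without Lombok. Using the `yyyy-MM-dd` prefix of the call time as `date` is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class KeyGroupingSketch {

    /** Same fields as org.feng.entity.Key: two phone numbers and a date string. */
    static final class CallKey {
        final String tel1;
        final String tel2;
        final String date;

        CallKey(String tel1, String tel2, String date) {
            this.tel1 = tel1;
            this.tel2 = tel2;
            this.date = date;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof CallKey)) {
                return false;
            }
            CallKey k = (CallKey) o;
            return Objects.equals(tel1, k.tel1) && Objects.equals(tel2, k.tel2) && Objects.equals(date, k.date);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tel1, tel2, date);
        }
    }

    /** Returns {callCount, totalSeconds} per key for lines "tel1,tel2,yyyy-MM-dd:HH:mm:ss,duration". */
    static Map<CallKey, long[]> aggregate(String[] lines) {
        Map<CallKey, long[]> result = new HashMap<>();
        for (String line : lines) {
            String[] p = line.split(",");
            // Assumption: the date part of the key is the yyyy-MM-dd prefix of the call time
            CallKey key = new CallKey(p[0], p[1], p[2].substring(0, 10));
            long[] acc = result.computeIfAbsent(key, k -> new long[2]);
            acc[0] += 1;                    // call count
            acc[1] += Long.parseLong(p[3]); // total duration in seconds
        }
        return result;
    }

    public static void main(String[] args) {
        String[] lines = {
            "18411925860,19920860202,2018-05-29:08:54:04,0728",
            "18411925860,19920860202,2018-05-29:17:02:11,0100",
            "15369468720,19920860202,2018-05-29:09:00:00,0030"
        };
        aggregate(lines).forEach((k, v) ->
                System.out.println(k.tel1 + "," + k.tel2 + "," + k.date + " -> count=" + v[0] + ", seconds=" + v[1]));
    }
}
```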
3. The org.feng.redis package

Redis is used to help count the number of calls. This package provides a Jedis connection pool.

package org.feng.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Created by Feng on 2019/12/13 13:50
 * CurrentProject's name is flink-phone
 * Provides Jedis instances.
 * @author Feng
 */
public class RedisSupport {
    private static JedisPool jedisPool;
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSupport.class);

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of connections
        config.setMaxTotal(30);
        // Maximum number of idle connections
        config.setMaxIdle(10);
        // Create the pool with host and port
        jedisPool = new JedisPool(config, "localhost", 6379);
    }

    /**
     * Borrows a Jedis instance from the pool; call close() on it to return it to the pool.
     * @return Jedis
     */
    public static Jedis getJedis() {
        LOGGER.info("get jedis");
        return jedisPool.getResource();
    }
}

Testing Jedis:

package org.feng.redis;

import redis.clients.jedis.Jedis;

/**
 * Created by Feng on 2019/12/13 18:29
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public class SupportTest {
    public static void main(String[] args) {
        // try-with-resources returns the connection to the pool when done
        try (Jedis jedis = RedisSupport.getJedis()) {
            jedis.set("fengsoshaui", "123456");
            // Read back the key that was just written; prints 123456
            System.out.println(jedis.get("fengsoshaui"));
        }
    }
}
4. The org.feng.util.kafka package

The KafkaConsumerLog class creates the object that integrates Flink with Kafka (a FlinkKafkaConsumer).

package org.feng.util.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Reads Kafka messages through a {@link FlinkKafkaConsumer}.
 * Created by Feng on 2019/12/12 14:40
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public class KafkaConsumerLog {
    /** Logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerLog.class);
    /** Kafka consumer configuration (classpath resource) */
    private static final String KAFKA_CONFIG = "/kafka.properties";
    /** Default topic */
    private static final String DEFAULT_TOPIC = "phoneLogTopic";

    /**
     * Creates the Kafka consumer and sets its start position.
     * 1. setStartFromGroupOffsets:
     *    start from the committed offsets of the consumer group (Kafka's default)
     * 2. setStartFromEarliest:
     *    start from the earliest record, ignoring stored offsets
     * 3. setStartFromLatest:
     *    start from the latest record, ignoring stored offsets
     * 4. setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>):
     *    start from manually specified offsets
     */
    public static FlinkKafkaConsumer<String> getKafka() {
        Properties properties = new Properties();
        // getResourceAsStream returns null if the file is missing; close the stream afterwards
        try (InputStream in = KafkaConsumerLog.class.getResourceAsStream(KAFKA_CONFIG)) {
            if (in == null) {
                throw new IllegalStateException(KAFKA_CONFIG + " not found on the classpath");
            }
            properties.load(in);
        } catch (IOException e) {
            LOGGER.error("FlinkKafkaConsumer get error", e);
            throw new IllegalStateException("cannot read " + KAFKA_CONFIG, e);
        }
        FlinkKafkaConsumer<String> instance =
                new FlinkKafkaConsumer<>(DEFAULT_TOPIC, new SimpleStringSchema(), properties);
        instance.setStartFromGroupOffsets();
        return instance;
    }
}
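`Class.getResourceAsStream` returns null when the resource is missing. It is therefore worth checking for null before calling `Properties.load`, and closing the stream afterwards. The sketch below isolates that loading step in plain Java. `KafkaConfigLoader` is a hypothetical helper. The resulting Properties would then go to the `FlinkKafkaConsumer` constructor, as in `getKafka()`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical helper: loads key=value configuration such as kafka.properties. */
final class KafkaConfigLoader {
    /** Parses key=value lines from any stream. */
    static Properties load(InputStream in) {
        Properties props = new Properties();
        try {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Loads a classpath resource, failing loudly instead of returning null or empty config. */
    static Properties loadFromClasspath(String resource) {
        try (InputStream in = KafkaConfigLoader.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalStateException("classpath resource not found: " + resource);
            }
            return load(in);
        } catch (IOException e) { // may be thrown by close()
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String text = "bootstrap.servers=localhost:9092\ngroup.id=phoneLog\n";
        Properties props = load(new ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1)));
        System.out.println(props.getProperty("group.id")); // phoneLog
    }
}
```

Failing fast with a clear message is easier to diagnose than a null consumer or a NullPointerException later in `env.addSource(...)`.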

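Test: first start the class that produces data so that records are written to Kafka, then start this class to consume them (they are printed to the console).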

package org.feng.util.kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Feng on 2019/12/13 18:21
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public class KafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(KafkaConsumerLog.getKafka()).print();
        env.execute();
    }
}
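5. The org.feng.mysql package

The core package. It defines the Flink sink class, which writes the aggregated data to MySQL and calls Jedis to maintain the call counter that is stored with the final result.

The MysqlEnum class holds the settings needed to connect to MySQL: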

package org.feng.mysql;

/**
 * Created by Feng on 2019/12/12 18:08
 * CurrentProject's name is flink-phone
 * @author Feng
 */
public enum MysqlEnum {
    /** JDBC connection URL */
    URL("jdbc:mysql://localhost:3306/flink_phone?characterEncoding=UTF-8&serverTimezone=UTC"),
    /** User name */
    USERNAME("root"),
    /** Password */
    PASSWORD("root");

    /** Value */
    private final String value;

    MysqlEnum(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}

MysqlSourceSupport obtains a MySQL connection using plain JDBC. It reads the data of two MySQL tables (ct_user and ct_date) and stores it in HashMaps for the later computation.

package org.feng.mysql;

import com.mysql.cj.jdbc.Driver;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Feng on 2019/12/12 18:40
 * CurrentProject's name is flink-phone
 * Reads table data from MySQL over JDBC and stores it in maps.
 */
class MysqlSourceSupport {
    /** tel -> ct_user.id; initial capacity: 25 / 0.75 = 33.333... */
    static Map<String, Integer> userMap = new HashMap<>(34);
    /** yyyy[MM[dd]] -> ct_date.id; 378 rows (1 year + 12 months + 365 days) / 0.75 = 504 */
    static Map<String, Integer> dateInfoMap = new HashMap<>(505);

    MysqlSourceSupport() {
        try {
            // Load (register) the driver
            new Driver();
            initUserMap();
            initDateInfoMap();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Returns a connection object */
    static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(MysqlEnum.URL.getValue(),
                MysqlEnum.USERNAME.getValue(), MysqlEnum.PASSWORD.getValue());
    }

    /** Initializes the user map; try-with-resources closes all JDBC objects */
    private void initUserMap() throws SQLException {
        try (Connection connection = getConnection();
             PreparedStatement pstm = connection.prepareStatement("select id, tel from ct_user");
             ResultSet res = pstm.executeQuery()) {
            while (res.next()) {
                userMap.put(res.getString("tel"), res.getInt("id"));
            }
        }
    }

    /** Initializes the date map; keys look like 2018, 201803 or 20180308 */
    private void initDateInfoMap() throws SQLException {
        try (Connection connection = getConnection();
             PreparedStatement pstm = connection.prepareStatement("select id, year, month, day from ct_date");
             ResultSet res = pstm.executeQuery()) {
            while (res.next()) {
                int id = res.getInt(1);
                String year = res.getString(2);
                String month = res.getString(3);
                if (month.length() == 1) {
                    month = "0" + month;
                }
                String day = res.getString(4);
                if (day.length() == 1) {
                    day = "0" + day;
                }
                dateInfoMap.put(year + month + day, id);
            }
        }
    }
}
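MysqlSink finds the dateid by looking up, in dateInfoMap, the string that ConsumerLog derives from the log timestamp. Both sides therefore have to produce exactly the same format. The sketch below reproduces the two conversions from the code above and shows that they agree. `DateKeys` is a hypothetical class in plain Java. According to the ct_date data further down, the key 20180308 maps to id 71.

```java
/** Hypothetical helper reproducing the two date-key conversions used in this project. */
final class DateKeys {
    /** Key stored for a ct_date row, e.g. ("2018", "3", "8") -> "20180308". */
    static String fromDateRow(String year, String month, String day) {
        return year + pad(month) + pad(day);
    }

    /** Key derived from a log timestamp such as "2018-03-08:11:32:12". */
    static String fromLogTimestamp(String timestamp) {
        return timestamp.substring(0, 10).replace("-", "");
    }

    private static String pad(String part) {
        // Single-digit months/days get a leading zero; empty parts (year/month rows) stay empty
        return part.length() == 1 ? "0" + part : part;
    }

    public static void main(String[] args) {
        System.out.println(fromDateRow("2018", "3", "8"));           // 20180308
        System.out.println(fromLogTimestamp("2018-03-08:11:32:12")); // 20180308
        System.out.println(fromDateRow("2018", "3", ""));            // 201803
    }
}
```

The year and month rows (empty month or day) produce the shorter keys 2018 and 201803. The per-day lookup never hits them, but they would be the natural targets for monthly or yearly totals.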

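The main entry class: ConsumerLog is the consumer entry point. It uses Flink stream processing with a window to aggregate the data of the most recent period and computes the total call duration.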

package org.feng.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.feng.entity.Key;
import org.feng.util.kafka.KafkaConsumerLog;

/**
 * Created by Feng on 2019/12/13 15:05
 * CurrentProject's name is flink-phone
 * This is the final entry point: it starts the consumer side. Note the window
 * parameters: in practice the window should fire once per day or once per month.<br>
 * Only then does the result give each caller's total number of calls and total
 * call duration for that period.
 * @author Feng
 */
public class ConsumerLog {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Record format: 17336673697,19602240179,2018-03-08:11:32:12,1159
        DataStream<Tuple2<Key, Integer>> dataStream =
                env.addSource(KafkaConsumerLog.getKafka())
                   .flatMap(new FlatMapFunction<String, Tuple2<Key, Integer>>() {
                       private static final long serialVersionUID = 8550835415812760334L;

                       @Override
                       public void flatMap(String data, Collector<Tuple2<Key, Integer>> out) {
                           String[] split = data.split(",");
                           // "2018-03-08:11:32:12" -> "20180308"
                           String date = split[2].substring(0, 10).replace("-", "");
                           Key key = new Key(split[0], split[1], date);
                           out.collect(new Tuple2<>(key, Integer.valueOf(split[3])));
                       }
                   });
        // Partition by the f0 element of the Tuple2, i.e. by the Key object.
        // timeWindow keeps the keying; timeWindowAll would merge all keys into one window.
        // Every 3 seconds, aggregate the previous 3 seconds (size == slide: no overlap).
        dataStream.keyBy("f0")
                  .timeWindow(Time.seconds(3), Time.seconds(3))
                  .sum(1)
                  .addSink(new MysqlSink());
        env.execute("ConsumerLogTest");
    }
}
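The plain-Java sketch below shows what the keyed 3-second window computes without starting Flink, by simulating a tumbling window. Each record goes to the window that starts at `timestamp - timestamp % size`. As far as I know, this epoch-aligned assignment matches Flink's default for tumbling windows with no offset. For every (window, key) pair the sketch counts the calls and sums their durations. Counting inside the aggregation like this gives the number of calls directly, whereas a counter incremented once per sink invocation counts window results. `TumblingWindowDemo`, `Call` and `Stats` are hypothetical names, and timestamps are in milliseconds.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of a keyed, non-overlapping (tumbling) time window. */
final class TumblingWindowDemo {
    /** One call record: arrival time (ms), grouping key, duration (s). */
    static final class Call {
        final long timestampMs;
        final String key;
        final int duration;

        Call(long timestampMs, String key, int duration) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.duration = duration;
        }
    }

    /** Aggregate for one (window, key) pair. */
    static final class Stats {
        int calls;
        long totalDuration;
    }

    /** Groups calls into windows of windowSizeMs per key; map key is "windowStart|key". */
    static Map<String, Stats> aggregate(List<Call> calls, long windowSizeMs) {
        Map<String, Stats> result = new LinkedHashMap<>();
        for (Call c : calls) {
            // Window start aligned to a multiple of the window size
            long windowStart = c.timestampMs - (c.timestampMs % windowSizeMs);
            Stats s = result.computeIfAbsent(windowStart + "|" + c.key, k -> new Stats());
            s.calls += 1;
            s.totalDuration += c.duration;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Call> calls = Arrays.asList(
                new Call(0, "A", 10),
                new Call(1_000, "A", 20),
                new Call(2_500, "B", 5),
                new Call(3_100, "A", 7));
        for (Map.Entry<String, Stats> e : aggregate(calls, 3_000).entrySet()) {
            System.out.println(e.getKey() + " calls=" + e.getValue().calls
                    + " duration=" + e.getValue().totalDuration);
        }
        // 0|A calls=2 duration=30
        // 0|B calls=1 duration=5
        // 3000|A calls=1 duration=7
    }
}
```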

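MysqlSink is a custom Flink sink that persists the data to MySQL and Redis. Redis is only used to help compute the number of calls. Note that the counter is incremented once per sink invocation, that is, once per window result. Several calls between the same two numbers on the same day that land in one window are therefore counted as one.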

package org.feng.mysql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.feng.entity.Key;
import org.feng.redis.RedisSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * Created by Feng on 2019/12/13 15:10
 * CurrentProject's name is flink-phone
 * Custom sink: persists the data to MySQL.
 * @author Feng
 */
public class MysqlSink extends RichSinkFunction<Tuple2<Key, Integer>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MysqlSink.class);
    private static final long serialVersionUID = 7410586813590080807L;
    private PreparedStatement pstm;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the ct_user / ct_date lookup maps once instead of on every record
        new MysqlSourceSupport();
        LOGGER.warn("userMap:" + MysqlSourceSupport.userMap);
        LOGGER.warn("dateInfoMap:" + MysqlSourceSupport.dateInfoMap);
        // Obtain the connection
        connection = MysqlSourceSupport.getConnection();
        // Prepare the statement
        String sql = "insert into ct_call (telid, dateid, sumcall, sumduration) values (?,?,?,?)";
        pstm = connection.prepareStatement(sql);
        LOGGER.warn("sql:" + sql);
        LOGGER.warn("connection:" + connection);
        LOGGER.warn("pstm:" + pstm);
    }

    @Override
    public void invoke(Tuple2<Key, Integer> value, Context context) throws Exception {
        LOGGER.warn(value.toString());
        Integer telId1 = MysqlSourceSupport.userMap.get(value.f0.getTel1());
        Integer telId2 = MysqlSourceSupport.userMap.get(value.f0.getTel2());
        Integer dateId = MysqlSourceSupport.dateInfoMap.get(value.f0.getDate());
        pstm.setInt(1, telId1);
        pstm.setInt(2, dateId);
        // Redis counter for this (caller, callee, date) combination
        String key = telId1 + "_" + telId2 + "_date_" + dateId;
        // try-with-resources returns the connection to the pool (max 30), avoiding a hang
        try (Jedis jedis = RedisSupport.getJedis()) {
            // INCR is atomic; a missing key starts at 0, so the first call stores 1
            pstm.setInt(3, Math.toIntExact(jedis.incr(key)));
        }
        pstm.setInt(4, value.f1);
        // Log the statement with its parameters
        LOGGER.warn("pstm by param:" + pstm.toString());
        pstm.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (pstm != null) {
            pstm.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
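The Redis part of MysqlSink is a plain counter per key of the form `<caller id>_<callee id>_date_<date id>`. Redis `INCR` treats a missing key as 0 and increments atomically, which a separate get-then-set sequence only approximates. The sketch below mimics that counter in memory, so the key layout and the counting rule can be checked without a Redis server. `CallCounter` is a hypothetical stand-in and not part of Jedis.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for the Redis counter used by MysqlSink. */
final class CallCounter {
    private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();

    /** Builds the counter key, e.g. key(1, 2, 71) -> "1_2_date_71". */
    static String key(int callerId, int calleeId, int dateId) {
        return callerId + "_" + calleeId + "_date_" + dateId;
    }

    /** Adds 1 atomically and returns the new value; a missing key counts as 0, like INCR. */
    long incr(String key) {
        return counters.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        CallCounter counter = new CallCounter();
        String k = key(1, 2, 71);
        System.out.println(k);                           // 1_2_date_71
        System.out.println(counter.incr(k));             // 1
        System.out.println(counter.incr(k));             // 2
        System.out.println(counter.incr(key(1, 3, 71))); // 1
    }
}
```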

3. MySQL table design

Database: flink_phone

Tables:

  1. ct_call
  2. ct_date
  3. ct_user
/*
 Navicat Premium Data Transfer

 Source Server         : MySql
 Source Server Type    : MySQL
 Source Server Version : 80018
 Source Host           : localhost:3306
 Source Schema         : flink_phone

 Target Server Type    : MySQL
 Target Server Version : 80018
 File Encoding         : 65001

 Date: 14/12/2019 14:50:00
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for ct_call
-- ----------------------------
DROP TABLE IF EXISTS `ct_call`;
CREATE TABLE `ct_call`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `telid` int(11) NULL DEFAULT NULL,
  `dateid` int(11) NULL DEFAULT NULL,
  `sumcall` int(11) NULL DEFAULT 0,
  `sumduration` int(11) NULL DEFAULT 0,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 664 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for ct_date
-- ----------------------------
DROP TABLE IF EXISTS `ct_date`;
CREATE TABLE `ct_date`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `year` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `month` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `day` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 379 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for ct_user
-- ----------------------------
DROP TABLE IF EXISTS `ct_user`;
CREATE TABLE `ct_user`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `tel` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 26 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

Initial table data (disclaimer: all data here is fictitious; any resemblance is purely coincidental):

INSERT INTO `ct_user` VALUES (1, '15369468720', '李雁');
INSERT INTO `ct_user` VALUES (2, '19920860202', '卫艺');
INSERT INTO `ct_user` VALUES (3, '18411925860', '仰莉');
INSERT INTO `ct_user` VALUES (4, '14473548449', '陶欣悦');
INSERT INTO `ct_user` VALUES (5, '18749966182', '施梅梅');
INSERT INTO `ct_user` VALUES (6, '19379884788', '金虹霖');
INSERT INTO `ct_user` VALUES (7, '19335715448', '魏明艳');
INSERT INTO `ct_user` VALUES (8, '18503558939', '华贞');
INSERT INTO `ct_user` VALUES (9, '13407209608', '华啟倩');
INSERT INTO `ct_user` VALUES (10, '15596505995', '仲采绿');
INSERT INTO `ct_user` VALUES (11, '17519874292', '卫丹');
INSERT INTO `ct_user` VALUES (12, '15178485516', '戚丽红');
INSERT INTO `ct_user` VALUES (13, '19877232369', '何翠柔');
INSERT INTO `ct_user` VALUES (14, '18706287692', '钱溶艳');
INSERT INTO `ct_user` VALUES (15, '18944239644', '钱琳');
INSERT INTO `ct_user` VALUES (16, '17325302007', '缪静欣');
INSERT INTO `ct_user` VALUES (17, '18839074540', '焦秋菊');
INSERT INTO `ct_user` VALUES (18, '19879419704', '吕访琴');
INSERT INTO `ct_user` VALUES (19, '16480981069', '沈丹');
INSERT INTO `ct_user` VALUES (20, '18674257265', '褚美丽');
INSERT INTO `ct_user` VALUES (21, '18302820904', '孙怡');
INSERT INTO `ct_user` VALUES (22, '15133295266', '许婵');
INSERT INTO `ct_user` VALUES (23, '17868457605', '曹红恋');
INSERT INTO `ct_user` VALUES (24, '15490732767', '吕柔');
INSERT INTO `ct_user` VALUES (25, '15064972307', '冯怜云');
INSERT INTO `ct_date` VALUES (1, '2018', '', '');
INSERT INTO `ct_date` VALUES (2, '2018', '1', '');
INSERT INTO `ct_date` VALUES (3, '2018', '1', '1');
INSERT INTO `ct_date` VALUES (4, '2018', '1', '2');
INSERT INTO `ct_date` VALUES (5, '2018', '1', '3');
INSERT INTO `ct_date` VALUES (6, '2018', '1', '4');
INSERT INTO `ct_date` VALUES (7, '2018', '1', '5');
INSERT INTO `ct_date` VALUES (8, '2018', '1', '6');
INSERT INTO `ct_date` VALUES (9, '2018', '1', '7');
INSERT INTO `ct_date` VALUES (10, '2018', '1', '8');
INSERT INTO `ct_date` VALUES (11, '2018', '1', '9');
INSERT INTO `ct_date` VALUES (12, '2018', '1', '10');
INSERT INTO `ct_date` VALUES (13, '2018', '1', '11');
INSERT INTO `ct_date` VALUES (14, '2018', '1', '12');
INSERT INTO `ct_date` VALUES (15, '2018', '1', '13');
INSERT INTO `ct_date` VALUES (16, '2018', '1', '14');
INSERT INTO `ct_date` VALUES (17, '2018', '1', '15');
INSERT INTO `ct_date` VALUES (18, '2018', '1', '16');
INSERT INTO `ct_date` VALUES (19, '2018', '1', '17');
INSERT INTO `ct_date` VALUES (20, '2018', '1', '18');
INSERT INTO `ct_date` VALUES (21, '2018', '1', '19');
INSERT INTO `ct_date` VALUES (22, '2018', '1', '20');
INSERT INTO `ct_date` VALUES (23, '2018', '1', '21');
INSERT INTO `ct_date` VALUES (24, '2018', '1', '22');
INSERT INTO `ct_date` VALUES (25, '2018', '1', '23');
INSERT INTO `ct_date` VALUES (26, '2018', '1', '24');
INSERT INTO `ct_date` VALUES (27, '2018', '1', '25');
INSERT INTO `ct_date` VALUES (28, '2018', '1', '26');
INSERT INTO `ct_date` VALUES (29, '2018', '1', '27');
INSERT INTO `ct_date` VALUES (30, '2018', '1', '28');
INSERT INTO `ct_date` VALUES (31, '2018', '1', '29');
INSERT INTO `ct_date` VALUES (32, '2018', '1', '30');
INSERT INTO `ct_date` VALUES (33, '2018', '1', '31');
INSERT INTO `ct_date` VALUES (34, '2018', '2', '');
INSERT INTO `ct_date` VALUES (35, '2018', '2', '1');
INSERT INTO `ct_date` VALUES (36, '2018', '2', '2');
INSERT INTO `ct_date` VALUES (37, '2018', '2', '3');
INSERT INTO `ct_date` VALUES (38, '2018', '2', '4');
INSERT INTO `ct_date` VALUES (39, '2018', '2', '5');
INSERT INTO `ct_date` VALUES (40, '2018', '2', '6');
INSERT INTO `ct_date` VALUES (41, '2018', '2', '7');
INSERT INTO `ct_date` VALUES (42, '2018', '2', '8');
INSERT INTO `ct_date` VALUES (43, '2018', '2', '9');
INSERT INTO `ct_date` VALUES (44, '2018', '2', '10');
INSERT INTO `ct_date` VALUES (45, '2018', '2', '11');
INSERT INTO `ct_date` VALUES (46, '2018', '2', '12');
INSERT INTO `ct_date` VALUES (47, '2018', '2', '13');
INSERT INTO `ct_date` VALUES (48, '2018', '2', '14');
INSERT INTO `ct_date` VALUES (49, '2018', '2', '15');
INSERT INTO `ct_date` VALUES (50, '2018', '2', '16');
INSERT INTO `ct_date` VALUES (51, '2018', '2', '17');
INSERT INTO `ct_date` VALUES (52, '2018', '2', '18');
INSERT INTO `ct_date` VALUES (53, '2018', '2', '19');
INSERT INTO `ct_date` VALUES (54, '2018', '2', '20');
INSERT INTO `ct_date` VALUES (55, '2018', '2', '21');
INSERT INTO `ct_date` VALUES (56, '2018', '2', '22');
INSERT INTO `ct_date` VALUES (57, '2018', '2', '23');
INSERT INTO `ct_date` VALUES (58, '2018', '2', '24');
INSERT INTO `ct_date` VALUES (59, '2018', '2', '25');
INSERT INTO `ct_date` VALUES (60, '2018', '2', '26');
INSERT INTO `ct_date` VALUES (61, '2018', '2', '27');
INSERT INTO `ct_date` VALUES (62, '2018', '2', '28');
INSERT INTO `ct_date` VALUES (63, '2018', '3', '');
INSERT INTO `ct_date` VALUES (64, '2018', '3', '1');
INSERT INTO `ct_date` VALUES (65, '2018', '3', '2');
INSERT INTO `ct_date` VALUES (66, '2018', '3', '3');
INSERT INTO `ct_date` VALUES (67, '2018', '3', '4');
INSERT INTO `ct_date` VALUES (68, '2018', '3', '5');
INSERT INTO `ct_date` VALUES (69, '2018', '3', '6');
INSERT INTO `ct_date` VALUES (70, '2018', '3', '7');
INSERT INTO `ct_date` VALUES (71, '2018', '3', '8');
INSERT INTO `ct_date` VALUES (72, '2018', '3', '9');
INSERT INTO `ct_date` VALUES (73, '2018', '3', '10');
INSERT INTO `ct_date` VALUES (74, '2018', '3', '11');
INSERT INTO `ct_date` VALUES (75, '2018', '3', '12');
INSERT INTO `ct_date` VALUES (76, '2018', '3', '13');
INSERT INTO `ct_date` VALUES (77, '2018', '3', '14');
INSERT INTO `ct_date` VALUES (78, '2018', '3', '15');
INSERT INTO `ct_date` VALUES (79, '2018', '3', '16');
INSERT INTO `ct_date` VALUES (80, '2018', '3', '17');
INSERT INTO `ct_date` VALUES (81, '2018', '3', '18');
INSERT INTO `ct_date` VALUES (82, '2018', '3', '19');
INSERT INTO `ct_date` VALUES (83, '2018', '3', '20');
INSERT INTO `ct_date` VALUES (84, '2018', '3', '21');
INSERT INTO `ct_date` VALUES (85, '2018', '3', '22');
INSERT INTO `ct_date` VALUES (86, '2018', '3', '23');
INSERT INTO `ct_date` VALUES (87, '2018', '3', '24');
INSERT INTO `ct_date` VALUES (88, '2018', '3', '25');
INSERT INTO `ct_date` VALUES (89, '2018', '3', '26');
INSERT INTO `ct_date` VALUES (90, '2018', '3', '27');
INSERT INTO `ct_date` VALUES (91, '2018', '3', '28');
INSERT INTO `ct_date` VALUES (92, '2018', '3', '29');
INSERT INTO `ct_date` VALUES (93, '2018', '3', '30');
INSERT INTO `ct_date` VALUES (94, '2018', '3', '31');
INSERT INTO `ct_date` VALUES (95, '2018', '4', '');
INSERT INTO `ct_date` VALUES (96, '2018', '4', '1');
INSERT INTO `ct_date` VALUES (97, '2018', '4', '2');
INSERT INTO `ct_date` VALUES (98, '2018', '4', '3');
INSERT INTO `ct_date` VALUES (99, '2018', '4', '4');
INSERT INTO `ct_date` VALUES (100, '2018', '4', '5');
INSERT INTO `ct_date` VALUES (101, '2018', '4', '6');
INSERT INTO `ct_date` VALUES (102, '2018', '4', '7');
INSERT INTO `ct_date` VALUES (103, '2018', '4', '8');
INSERT INTO `ct_date` VALUES (104, '2018', '4', '9');
INSERT INTO `ct_date` VALUES (105, '2018', '4', '10');
INSERT INTO `ct_date` VALUES (106, '2018', '4', '11');
INSERT INTO `ct_date` VALUES (107, '2018', '4', '12');
INSERT INTO `ct_date` VALUES (108, '2018', '4', '13');
INSERT INTO `ct_date` VALUES (109, '2018', '4', '14');
INSERT INTO `ct_date` VALUES (110, '2018', '4', '15');
INSERT INTO `ct_date` VALUES (111, '2018', '4', '16');
INSERT INTO `ct_date` VALUES (112, '2018', '4', '17');
INSERT INTO `ct_date` VALUES (113, '2018', '4', '18');
INSERT INTO `ct_date` VALUES (114, '2018', '4', '19');
INSERT INTO `ct_date` VALUES (115, '2018', '4', '20');
INSERT INTO `ct_date` VALUES (116, '2018', '4', '21');
INSERT INTO `ct_date` VALUES (117, '2018', '4', '22');
INSERT INTO `ct_date` VALUES (118, '2018', '4', '23');
INSERT INTO `ct_date` VALUES (119, '2018', '4', '24');
INSERT INTO `ct_date` VALUES (120, '2018', '4', '25');
INSERT INTO `ct_date` VALUES (121, '2018', '4', '26');
INSERT INTO `ct_date` VALUES (122, '2018', '4', '27');
INSERT INTO `ct_date` VALUES (123, '2018', '4', '28');
INSERT INTO `ct_date` VALUES (124, '2018', '4', '29');
INSERT INTO `ct_date` VALUES (125, '2018', '4', '30');
INSERT INTO `ct_date` VALUES (126, '2018', '5', '');
INSERT INTO `ct_date` VALUES (127, '2018', '5', '1');
INSERT INTO `ct_date` VALUES (128, '2018', '5', '2');
INSERT INTO `ct_date` VALUES (129, '2018', '5', '3');
INSERT INTO `ct_date` VALUES (130, '2018', '5', '4');
INSERT INTO `ct_date` VALUES (131, '2018', '5', '5');
INSERT INTO `ct_date` VALUES (132, '2018', '5', '6');
INSERT INTO `ct_date` VALUES (133, '2018', '5', '7');
INSERT INTO `ct_date` VALUES (134, '2018', '5', '8');
INSERT INTO `ct_date` VALUES (135, '2018', '5', '9');
INSERT INTO `ct_date` VALUES (136, '2018', '5', '10');
INSERT INTO `ct_date` VALUES (137, '2018', '5', '11');
INSERT INTO `ct_date` VALUES (138, '2018', '5', '12');
INSERT INTO `ct_date` VALUES (139, '2018', '5', '13');
INSERT INTO `ct_date` VALUES (140, '2018', '5', '14');
INSERT INTO `ct_date` VALUES (141, '2018', '5', '15');
INSERT INTO `ct_date` VALUES (142, '2018', '5', '16');
INSERT INTO `ct_date` VALUES (143, '2018', '5', '17');
INSERT INTO `ct_date` VALUES (144, '2018', '5', '18');
INSERT INTO `ct_date` VALUES (145, '2018', '5', '19');
INSERT INTO `ct_date` VALUES (146, '2018', '5', '20');
INSERT INTO `ct_date` VALUES (147, '2018', '5', '21');
INSERT INTO `ct_date` VALUES (148, '2018', '5', '22');
INSERT INTO `ct_date` VALUES (149, '2018', '5', '23');
INSERT INTO `ct_date` VALUES (150, '2018', '5', '24');
INSERT INTO `ct_date` VALUES (151, '2018', '5', '25');
INSERT INTO `ct_date` VALUES (152, '2018', '5', '26');
INSERT INTO `ct_date` VALUES (153, '2018', '5', '27');
INSERT INTO `ct_date` VALUES (154, '2018', '5', '28');
INSERT INTO `ct_date` VALUES (155, '2018', '5', '29');
INSERT INTO `ct_date` VALUES (156, '2018', '5', '30');
INSERT INTO `ct_date` VALUES (157, '2018', '5', '31');
INSERT INTO `ct_date` VALUES (158, '2018', '6', '');
INSERT INTO `ct_date` VALUES (159, '2018', '6', '1');
INSERT INTO `ct_date` VALUES (160, '2018', '6', '2');
INSERT INTO `ct_date` VALUES (161, '2018', '6', '3');
INSERT INTO `ct_date` VALUES (162, '2018', '6', '4');
INSERT INTO `ct_date` VALUES (163, '2018', '6', '5');
INSERT INTO `ct_date` VALUES (164, '2018', '6', '6');
INSERT INTO `ct_date` VALUES (165, '2018', '6', '7');
INSERT INTO `ct_date` VALUES (166, '2018', '6', '8');
INSERT INTO `ct_date` VALUES (167, '2018', '6', '9');
INSERT INTO `ct_date` VALUES (168, '2018', '6', '10');
INSERT INTO `ct_date` VALUES (169, '2018', '6', '11');
INSERT INTO `ct_date` VALUES (170, '2018', '6', '12');
INSERT INTO `ct_date` VALUES (171, '2018', '6', '13');
INSERT INTO `ct_date` VALUES (172, '2018', '6', '14');
INSERT INTO `ct_date` VALUES (173, '2018', '6', '15');
INSERT INTO `ct_date` VALUES (174, '2018', '6', '16');
INSERT INTO `ct_date` VALUES (175, '2018', '6', '17');
INSERT INTO `ct_date` VALUES (176, '2018', '6', '18');
INSERT INTO `ct_date` VALUES (177, '2018', '6', '19');
INSERT INTO `ct_date` VALUES (178, '2018', '6', '20');
INSERT INTO `ct_date` VALUES (179, '2018', '6', '21');
INSERT INTO `ct_date` VALUES (180, '2018', '6', '22');
INSERT INTO `ct_date` VALUES (181, '2018', '6', '23');
INSERT INTO `ct_date` VALUES (182, '2018', '6', '24');
INSERT INTO `ct_date` VALUES (183, '2018', '6', '25');
INSERT INTO `ct_date` VALUES (184, '2018', '6', '26');
INSERT INTO `ct_date` VALUES (185, '2018', '6', '27');
INSERT INTO `ct_date` VALUES (186, '2018', '6', '28');
INSERT INTO `ct_date` VALUES (187, '2018', '6', '29');
INSERT INTO `ct_date` VALUES (188, '2018', '6', '30');
INSERT INTO `ct_date` VALUES (189, '2018', '7', '');
INSERT INTO `ct_date` VALUES (190, '2018', '7', '1');
INSERT INTO `ct_date` VALUES (191, '2018', '7', '2');
INSERT INTO `ct_date` VALUES (192, '2018', '7', '3');
INSERT INTO `ct_date` VALUES (193, '2018', '7', '4');
INSERT INTO `ct_date` VALUES (194, '2018', '7', '5');
INSERT INTO `ct_date` VALUES (195, '2018', '7', '6');
INSERT INTO `ct_date` VALUES (196, '2018', '7', '7');
INSERT INTO `ct_date` VALUES (197, '2018', '7', '8');
INSERT INTO `ct_date` VALUES (198, '2018', '7', '9');
INSERT INTO `ct_date` VALUES (199, '2018', '7', '10');
INSERT INTO `ct_date` VALUES (200, '2018', '7', '11');
INSERT INTO `ct_date` VALUES (201, '2018', '7', '12');
INSERT INTO `ct_date` VALUES (202, '2018', '7', '13');
INSERT INTO `ct_date` VALUES (203, '2018', '7', '14');
INSERT INTO `ct_date` VALUES (204, '2018', '7', '15');
INSERT INTO `ct_date` VALUES (205, '2018', '7', '16');
INSERT INTO `ct_date` VALUES (206, '2018', '7', '17');
INSERT INTO `ct_date` VALUES (207, '2018', '7', '18');
INSERT INTO `ct_date` VALUES (208, '2018', '7', '19');
INSERT INTO `ct_date` VALUES (209, '2018', '7', '20');
INSERT INTO `ct_date` VALUES (210, '2018', '7', '21');
INSERT INTO `ct_date` VALUES (211, '2018', '7', '22');
INSERT INTO `ct_date` VALUES (212, '2018', '7', '23');
INSERT INTO `ct_date` VALUES (213, '2018', '7', '24');
INSERT INTO `ct_date` VALUES (214, '2018', '7', '25');
INSERT INTO `ct_date` VALUES (215, '2018', '7', '26');
INSERT INTO `ct_date` VALUES (216, '2018', '7', '27');
INSERT INTO `ct_date` VALUES (217, '2018', '7', '28');
INSERT INTO `ct_date` VALUES (218, '2018', '7', '29');
INSERT INTO `ct_date` VALUES (219, '2018', '7', '30');
INSERT INTO `ct_date` VALUES (220, '2018', '7', '31');
INSERT INTO `ct_date` VALUES (221, '2018', '8', '');
INSERT INTO `ct_date` VALUES (222, '2018', '8', '1');
INSERT INTO `ct_date` VALUES (223, '2018', '8', '2');
INSERT INTO `ct_date` VALUES (224, '2018', '8', '3');
INSERT INTO `ct_date` VALUES (225, '2018', '8', '4');
INSERT INTO `ct_date` VALUES (226, '2018', '8', '5');
INSERT INTO `ct_date` VALUES (227, '2018', '8', '6');
INSERT INTO `ct_date` VALUES (228, '2018', '8', '7');
INSERT INTO `ct_date` VALUES (229, '2018', '8', '8');
INSERT INTO `ct_date` VALUES (230, '2018', '8', '9');
INSERT INTO `ct_date` VALUES (231, '2018', '8', '10');
INSERT INTO `ct_date` VALUES (232, '2018', '8', '11');
INSERT INTO `ct_date` VALUES (233, '2018', '8', '12');
INSERT INTO `ct_date` VALUES (234, '2018', '8', '13');
INSERT INTO `ct_date` VALUES (235, '2018', '8', '14');
INSERT INTO `ct_date` VALUES (236, '2018', '8', '15');
INSERT INTO `ct_date` VALUES (237, '2018', '8', '16');
INSERT INTO `ct_date` VALUES (238, '2018', '8', '17');
INSERT INTO `ct_date` VALUES (239, '2018', '8', '18');
INSERT INTO `ct_date` VALUES (240, '2018', '8', '19');
INSERT INTO `ct_date` VALUES (241, '2018', '8', '20');
INSERT INTO `ct_date` VALUES (242, '2018', '8', '21');
INSERT INTO `ct_date` VALUES (243, '2018', '8', '22');
INSERT INTO `ct_date` VALUES (244, '2018', '8', '23');
INSERT INTO `ct_date` VALUES (245, '2018', '8', '24');
INSERT INTO `ct_date` VALUES (246, '2018', '8', '25');
INSERT INTO `ct_date` VALUES (247, '2018', '8', '26');
INSERT INTO `ct_date` VALUES (248, '2018', '8', '27');
INSERT INTO `ct_date` VALUES (249, '2018', '8', '28');
INSERT INTO `ct_date` VALUES (250, '2018', '8', '29');
INSERT INTO `ct_date` VALUES (251, '2018', '8', '30');
INSERT INTO `ct_date` VALUES (252, '2018', '8', '31');
INSERT INTO `ct_date` VALUES (253, '2018', '9', '');
INSERT INTO `ct_date` VALUES (254, '2018', '9', '1');
INSERT INTO `ct_date` VALUES (255, '2018', '9', '2');
INSERT INTO `ct_date` VALUES (256, '2018', '9', '3');
INSERT INTO `ct_date` VALUES (257, '2018', '9', '4');
INSERT INTO `ct_date` VALUES (258, '2018', '9', '5');
INSERT INTO `ct_date` VALUES (259, '2018', '9', '6');
INSERT INTO `ct_date` VALUES (260, '2018', '9', '7');
INSERT INTO `ct_date` VALUES (261, '2018', '9', '8');
INSERT INTO `ct_date` VALUES (262, '2018', '9', '9');
INSERT INTO `ct_date` VALUES (263, '2018', '9', '10');
INSERT INTO `ct_date` VALUES (264, '2018', '9', '11');
INSERT INTO `ct_date` VALUES (265, '2018', '9', '12');
INSERT INTO `ct_date` VALUES (266, '2018', '9', '13');
INSERT INTO `ct_date` VALUES (267, '2018', '9', '14');
INSERT INTO `ct_date` VALUES (268, '2018', '9', '15');
INSERT INTO `ct_date` VALUES (269, '2018', '9', '16');
INSERT INTO `ct_date` VALUES (270, '2018', '9', '17');
INSERT INTO `ct_date` VALUES (271, '2018', '9', '18');
INSERT INTO `ct_date` VALUES (272, '2018', '9', '19');
INSERT INTO `ct_date` VALUES (273, '2018', '9', '20');
INSERT INTO `ct_date` VALUES (274, '2018', '9', '21');
INSERT INTO `ct_date` VALUES (275, '2018', '9', '22');
INSERT INTO `ct_date` VALUES (276, '2018', '9', '23');
INSERT INTO `ct_date` VALUES (277, '2018', '9', '24');
INSERT INTO `ct_date` VALUES (278, '2018', '9', '25');
INSERT INTO `ct_date` VALUES (279, '2018', '9', '26');
INSERT INTO `ct_date` VALUES (280, '2018', '9', '27');
INSERT INTO `ct_date` VALUES (281, '2018', '9', '28');
INSERT INTO `ct_date` VALUES (282, '2018', '9', '29');
INSERT INTO `ct_date` VALUES (283, '2018', '9', '30');
INSERT INTO `ct_date` VALUES (284, '2018', '10', '');
INSERT INTO `ct_date` VALUES (285, '2018', '10', '1');
INSERT INTO `ct_date` VALUES (286, '2018', '10', '2');
INSERT INTO `ct_date` VALUES (287, '2018', '10', '3');
INSERT INTO `ct_date` VALUES (288, '2018', '10', '4');
INSERT INTO `ct_date` VALUES (289, '2018', '10', '5');
INSERT INTO `ct_date` VALUES (290, '2018', '10', '6');
INSERT INTO `ct_date` VALUES (291, '2018', '10', '7');
INSERT INTO `ct_date` VALUES (292, '2018', '10', '8');
INSERT INTO `ct_date` VALUES (293, '2018', '10', '9');
INSERT INTO `ct_date` VALUES (294, '2018', '10', '10');
INSERT INTO `ct_date` VALUES (295, '2018', '10', '11');
INSERT INTO `ct_date` VALUES (296, '2018', '10', '12');
INSERT INTO `ct_date` VALUES (297, '2018', '10', '13');
INSERT INTO `ct_date` VALUES (298, '2018', '10', '14');
INSERT INTO `ct_date` VALUES (299, '2018', '10', '15');
INSERT INTO `ct_date` VALUES (300, '2018', '10', '16');
INSERT INTO `ct_date` VALUES (301, '2018', '10', '17');
INSERT INTO `ct_date` VALUES (302, '2018', '10', '18');
INSERT INTO `ct_date` VALUES (303, '2018', '10', '19');
INSERT INTO `ct_date` VALUES (304, '2018', '10', '20');
INSERT INTO `ct_date` VALUES (305, '2018', '10', '21');
INSERT INTO `ct_date` VALUES (306, '2018', '10', '22');
INSERT INTO `ct_date` VALUES (307, '2018', '10', '23');
INSERT INTO `ct_date` VALUES (308, '2018', '10', '24');
INSERT INTO `ct_date` VALUES (309, '2018', '10', '25');
INSERT INTO `ct_date` VALUES (310, '2018', '10', '26');
INSERT INTO `ct_date` VALUES (311, '2018', '10', '27');
INSERT INTO `ct_date` VALUES (312, '2018', '10', '28');
INSERT INTO `ct_date` VALUES (313, '2018', '10', '29');
INSERT INTO `ct_date` VALUES (314, '2018', '10', '30');
INSERT INTO `ct_date` VALUES (315, '2018', '10', '31');
INSERT INTO `ct_date` VALUES (316, '2018', '11', '');
INSERT INTO `ct_date` VALUES (317, '2018', '11', '1');
INSERT INTO `ct_date` VALUES (318, '2018', '11', '2');
INSERT INTO `ct_date` VALUES (319, '2018', '11', '3');
INSERT INTO `ct_date` VALUES (320, '2018', '11', '4');
INSERT INTO `ct_date` VALUES (321, '2018', '11', '5');
INSERT INTO `ct_date` VALUES (322, '2018', '11', '6');
INSERT INTO `ct_date` VALUES (323, '2018', '11', '7');
INSERT INTO `ct_date` VALUES (324, '2018', '11', '8');
INSERT INTO `ct_date` VALUES (325, '2018', '11', '9');
INSERT INTO `ct_date` VALUES (326, '2018', '11', '10');
INSERT INTO `ct_date` VALUES (327, '2018', '11', '11');
INSERT INTO `ct_date` VALUES (328, '2018', '11', '12');
INSERT INTO `ct_date` VALUES (329, '2018', '11', '13');
INSERT INTO `ct_date` VALUES (330, '2018', '11', '14');
INSERT INTO `ct_date` VALUES (331, '2018', '11', '15');
INSERT INTO `ct_date` VALUES (332, '2018', '11', '16');
INSERT INTO `ct_date` VALUES (333, '2018', '11', '17');
INSERT INTO `ct_date` VALUES (334, '2018', '11', '18');
INSERT INTO `ct_date` VALUES (335, '2018', '11', '19');
INSERT INTO `ct_date` VALUES (336, '2018', '11', '20');
INSERT INTO `ct_date` VALUES (337, '2018', '11', '21');
INSERT INTO `ct_date` VALUES (338, '2018', '11', '22');
INSERT INTO `ct_date` VALUES (339, '2018', '11', '23');
INSERT INTO `ct_date` VALUES (340, '2018', '11', '24');
INSERT INTO `ct_date` VALUES (341, '2018', '11', '25');
INSERT INTO `ct_date` VALUES (342, '2018', '11', '26');
INSERT INTO `ct_date` VALUES (343, '2018', '11', '27');
INSERT INTO `ct_date` VALUES (344, '2018', '11', '28');
INSERT INTO `ct_date` VALUES (345, '2018', '11', '29');
INSERT INTO `ct_date` VALUES (346, '2018', '11', '30');
INSERT INTO `ct_date` VALUES (347, '2018', '12', '');
INSERT INTO `ct_date` VALUES (348, '2018', '12', '1');
INSERT INTO `ct_date` VALUES (349, '2018', '12', '2');
INSERT INTO `ct_date` VALUES (350, '2018', '12', '3');
INSERT INTO `ct_date` VALUES (351, '2018', '12', '4');
INSERT INTO `ct_date` VALUES (352, '2018', '12', '5');
INSERT INTO `ct_date` VALUES (353, '2018', '12', '6');
INSERT INTO `ct_date` VALUES (354, '2018', '12', '7');
INSERT INTO `ct_date` VALUES (355, '2018', '12', '8');
INSERT INTO `ct_date` VALUES (356, '2018', '12', '9');
INSERT INTO `ct_date` VALUES (357, '2018', '12', '10');
INSERT INTO `ct_date` VALUES (358, '2018', '12', '11');
INSERT INTO `ct_date` VALUES (359, '2018', '12', '12');
INSERT INTO `ct_date` VALUES (360, '2018', '12', '13');
INSERT INTO `ct_date` VALUES (361, '2018', '12', '14');
INSERT INTO `ct_date` VALUES (362, '2018', '12', '15');
INSERT INTO `ct_date` VALUES (363, '2018', '12', '16');
INSERT INTO `ct_date` VALUES (364, '2018', '12', '17');
INSERT INTO `ct_date` VALUES (365, '2018', '12', '18');
INSERT INTO `ct_date` VALUES (366, '2018', '12', '19');
INSERT INTO `ct_date` VALUES (367, '2018', '12', '20');
INSERT INTO `ct_date` VALUES (368, '2018', '12', '21');
INSERT INTO `ct_date` VALUES (369, '2018', '12', '22');
INSERT INTO `ct_date` VALUES (370, '2018', '12', '23');
INSERT INTO `ct_date` VALUES (371, '2018', '12', '24');
INSERT INTO `ct_date` VALUES (372, '2018', '12', '25');
INSERT INTO `ct_date` VALUES (373, '2018', '12', '26');
INSERT INTO `ct_date` VALUES (374, '2018', '12', '27');
INSERT INTO `ct_date` VALUES (375, '2018', '12', '28');
INSERT INTO `ct_date` VALUES (376, '2018', '12', '29');
INSERT INTO `ct_date` VALUES (377, '2018', '12', '30');
INSERT INTO `ct_date` VALUES (378, '2018', '12', '31');