使用Spark从数据库接入数据
第八章 从数据库接入数据
本章涵盖了
-
从关系数据库中接入数据
-
理解方言在Spark和数据库之间的通信中的作用
-
在Spark中构建高级查询,以便在接入之前对数据库进行寻址
-
理解与数据库的高级通信
-
从Elasticsearch接入数据
在大数据和企业环境中,关系数据库通常是执行分析的数据来源。理解如何通过整个表或SQL SELECT语句从这些数据库中提取数据是有意义的。
在本章中,您将学习从关系数据库中接入数据的几种方法,可以一次性接入全表,也可以在接入之前要求数据库执行一些操作。这些操作可以在数据库级别过滤、连接或聚合数据,以最小化数据传输。
您将在本章中看到Spark支持哪些数据库。当使用Spark不支持的数据库时,需要使用自定义方言。方言是通知Spark如何与数据库通信的一种方式。Spark有一些方言,在大多数情况下,您甚至不需要考虑它们。但是,对于那些特殊情况,您将学习如何构建一个。
图8.1 本章主要关注数据库的数据接入,无论数据库是否被Spark支持,并且需要使用自定义方言。
最后,许多企业使用文档存储和NoSQL数据库。在本章中,您还将学习如何连接到Elasticsearch并完成一个完整的数据接入场景。Elasticsearch是您将学习的唯一一个NoSQL数据库。
在本章中,您将使用MySQL、IBM Informix和Elasticsearch。
图8.1说明了你在数据接入过程中所处的位置。
实验
本章中的例子可以在GitHub中获得:https://github /jgperrin/net.jgp.books.spark.ch08。附录F提供了安装关系数据库的链接、提示和帮助。附录L为接入参考。
8.1 从关系数据库接入
您可能知道,关系数据库是任何企业中都可以找到的事务性数据存储的基石。在大多数情况下,事务一旦发生,就会涉及关系数据库。
假设您有一个包含电影演员信息的关系数据库,并希望按字母顺序显示它们。为此,您将了解Spark建立数据库连接所需的元素(而且,如果您熟悉JDBC,情况也是一样的)。然后,您将了解示例数据库、它的数据和它的模式;使用示例数据库;看看输出;最后,深入研究代码。
8.1.1 数据库连接检查表
Spark需要一个小的信息列表来连接到数据库。Spark通过使用JDBC驱动程序直接连接到关系数据库。要连接到数据库,Spark需要以下内容:
-
workers的类路径中的JDBC驱动程序(您在第2章中了解了worker程序:基本上,它们完成了所有的工作并将处理加载JDBC驱动程序)。
-
连接URL
-
用户名
-
用户密码
驱动程序可能需要其他特定于驱动程序的信息。例如,Informix需要DELIMIDENT=Y,而MySQL服务器在默认情况下期望使用SSL,因此您可能希望指定useSSL=false。
当然,需要向应用程序提供JDBC驱动程序。它可以在您的pom.xml文件:
<dependency><groupId> mysql </groupId><artifactId> mysql-connector-java </artifactId><version> 8.0.8-dmr </version>
...</dependency>
清单8.3更详细地描述了pom .xml文件。
8.1.2 理解示例中使用的数据
对于第一次从数据库中读取数据,您将使用MySQL中的Sakila数据库。Sakila是MySQL附带的一个标准示例数据库;有很多教程,你甚至可以用它学习MySQL。本节描述数据库的用途、结构和数据。图8.2将总结我们的操作。
Sakila示例数据库设计用于表示一个DVD租借商店。好吧,我明白了——你们有些人可能想知道DVD是什么,为什么要租这样的东西。DVD是数字视频(或多功能)光盘的缩写。DVD是一个闪亮的磁盘,直径12厘米(约5英寸),用于存储数字信息。在早期(1995年),它被用来存储数字格式的电影。人们可以购买或租借这些物品,并使用一种名为DVD播放机的设备在电视上观看电影。
如果你不想买DVD,你可以从小商店或像美国的Blockbuster这样的大型连锁店租到。那些商店需要人们登记进出磁盘和其他文物,如VHS磁带(正如我所认识到的,这甚至比DVD更模糊,它确实超出了本书的范围)。1997年,一家创新公司Netflix开始通过邮件租用dvd。
您将使用这个示例数据库,该数据库包含大约15个表和一些视图(图8.2)。
图8.2 Spark接入了存储在MySQL实例中的数据库。Spark需要JDBC驱动程序,就像任何Java应用程序一样。MySQL存储名为Sakila的示例数据库,其中包含23个表和视图。这里表示了一些,例如actor、actor_info、类别等等。你将专注于actor。
正如您在图8.2中看到的,您将在这个项目中使用actor表,它在表8.1中有更详细的描述。您将注意到last_update实现了更改数据捕获(CDC)。
表8.1 在MySQL的Sakila数据库中定义的actor表
列名称 | 列类型 | 属性 | 备注 |
---|---|---|---|
actor_id | SMALLINT | 主键,不为空,唯一,自动递增。在MySQL中,当插入新行时,带有自动增量的整数将自动获得一个新值。Informix和PostgreSQL使用串行和串行数据类型;其他数据库使用存储过程或序列。 | actor表的唯一标识符。 |
first_name | VARCHAR(45) | Not null. | 演员的姓。 |
last_name | VARCHAR(45) | Not null. | 演员的名字。 |
last_update | TIMESTAMP | Not null, CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP | 每次进行更新时自动更新的时间戳。这是一个很好的实践:如果您正在使用和设计rdbms,那么集成它是一件很好的事情。它将允许您实现CDC。 |
CHANGE DATA CAPTURE
更改数据捕获(CDC)是一组软件设计模式,用于确定(和跟踪)已更改的数据,以便使用已更改的数据采取操作。
CDC解决方案最常出现在数据仓库环境中,因为跨越时间捕获和保存数据状态是数据仓库的核心功能之一,但是CDC可以在任何数据库或数据仓库系统中使用。
尽管Spark不是为数据仓库设计的,但CDC技术可以从增量分析中使用。总的来说,添加last_update列作为时间戳(或类似的)是一个好的做法。
8.1.3 期望的输出
让我们看看您将看到的输出。下面的清单显示了通过编程实现的结果:actor表中的5个actor以及元数据。
+--------+----------+---------+-------------------+
|actor_id|first_name|last_name| last_update |
+--------+----------+---------+-------------------+
| 92 | KIRSTEN | AKROYD |2006-02-14 22:34:33|
| 58 | CHRISTIAN| AKROYD |2006-02-14 22:34:33|
| 182 | DEBBIE | AKROYD |2006-02-14 22:34:33|
| 118 | CUBA | ALLEN |2006-02-14 22:34:33|
| 145 | KIM | ALLEN |2006-02-14 22:34:33|
+--------+----------+---------+-------------------+
only showing top 5 rows
root|-- actor_id: integer (nullable = false)|-- first_name: string (nullable = false)|-- last_name: string (nullable = false)|-- last_update: timestamp (nullable = false)The dataframe contains 200 record(s).
小心错误
您可能会看到类似以下的错误:
Exception in thread "main" java.sql.SQLException: No suitable driverat java.sql.DriverManager.getDriver(DriverManager.java:315)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptio➥ ns$$anonfun$7.apply(JDBCOptions.scala:84)
如果是这样,那么您的类路径上缺少JDBC驱动程序,所以请检查您的pom.xml。请参见清单8.2。
8.1.4 代码
让我们看看生成您想要的输出的代码。您将看到三个选项,可以选择最喜欢的。您还将了解如何修改您的pom.xml文件来加载JDBC驱动程序,只需在dependencies部分中添加它。
要做的第一件事是通过使用dataframe的col()方法标识列,然后可以使用它的orderBy()方法。对dataframe的介绍在第3章。(你将在第11到13章中学习更多的转换和操作。)
最后,您可以使用单行代码对输出进行排序:
df = df .orderBy( df .col( "last_name" ));
实验
这是100号实验室。可以在GitHub上下载:.jgp.books.spark.ch08。它需要MySQL或MariaDB连接。
下面的清单显示了第一个选项。
#清单8.2 MySQLToDatasetApp.java
package net.jgp.books.spark.ch08.lab100_mysql_ingestion;import java.util.Properties;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class MySQLToDatasetApp {public static void main(String[] args ) {MySQLToDatasetApp app = new MySQLToDatasetApp();app .start();}private void start() {SparkSession spark = SparkSession.builder().appName("MySQL to Dataframe using a JDBC Connection" ).master( "local" ).getOrCreate();Properties props = new Properties();props .put( "user" , "root" );props .put( "password" , "Spark<3Java" );props .put( "useSSL" , "false" );Dataset<Row> df = spark .read().jdbc("jdbc:mysql://localhost:3306/sakila?serverTimezone=EST" ,"actor" ,
props );df = df .orderBy( df .col( "last_name" ));df .show(5);df .printSchema();System. out .println( "The dataframe contains " +df .count() + " record(s)." );}}
请注意
如果您阅读了第7章,会注意到,无论接入的是文件还是数据库,接入机制都是类似的。
密码
我知道你知道,你也知道我知道你知道,对吧?一个小提示总是有用的:不要硬编码密码在您的代码;任何人都可以在几秒钟内从JAR文件中提取它(特别是当您的变量名为password !)在存储库中,实验#101使用与清单8.2相同的代码(实验#100),但是从一个环境变量获取密码。
您还需要确保Spark能够访问您的JDBC驱动程序,如清单8.3所示。最简单的方法之一是在项目的pom.xml文件中列出所需的数据库驱动程序。
请注意
我们将在本章中使用MySQL、Informix和Elasticsearch: MySQL是因为它是一个完全支持的数据库,Informix是因为它需要方言,而Elasticsearch是因为它是一个NoSQL数据库。
#清单8.3修改后的用于数据库访问的pom.xml
...<properties>
...<mysql.version> 8.0.8-dmr </mysql.version><informix-jdbc.version> 4.10.8.1 </informix-jdbc.version><elasticsearch-hadoop.version> 6.2.1 </elasticsearch-hadoop.version></properties><dependencies>
...<dependency><groupId> mysql </groupId><artifactId> mysql-connector-java </artifactId><version> ${mysql.version} </version></dependency><dependency><groupId> com.ibm.informix </groupId ><artifactId> jdbc </artifactId><version> ${informix-jdbc.version} </version></dependency><dependency><groupId> org.elasticsearch </groupId>
<artifactId> elasticsearch-hadoop </artifactId><version> ${elasticsearch-hadoop.version} </version></dependency>...
请注意,存储库中的版本号是不同的,因为我尽可能地跟踪最新版本。
8.1.5 替代代码
通常情况下,有不同的方法来编写相同的操作。让我们看看如何调整参数和URL以连接到数据库。在清单8.2中,您使用了以下内容:
Properties props = new Properties();
props .put( "user" , "root" );
props .put( "password" , "Spark<3Java" );
props .put( "useSSL" , "false" );Dataset<Row> df = spark .read().jdbc("jdbc:mysql://localhost:3306/sakila?serverTimezone=EST" ,"actor" ,props );
您可以使用两个选项之一替换此代码(您可以在本章的存储库中的MySQLToDatasetWithLongUrlApp.java中找到完整的代码)。第一个选项是构建一个更长的URL。您的应用程序或平台中可能已经有一个带有JDBC URL生成器的库,因此您可以轻松地重用它。注意,您需要给Spark reader对象一个空的属性列表,由new properties()具体化:
String jdbcUrl = "jdbc:mysql://localhost:3306/sakila"+ "?user=root"+ "&password=Spark<3Java"+ "&useSSL=false"+ "&serverTimezone=EST" ;
Dataset<Row> df = spark .read().jdbc( jdbcUrl , "actor" , new Properties());
第二种选择是只使用选项;如果您从配置文件中读取属性,这可能很有用。下面的代码片段向您展示了如何做到这一点:
Dataset<Row> df = spark .read().option( "url" , "jdbc:mysql://localhost:3306/sakila" ).option( "dbtable" , "actor" ).option( "user" , "root" ).option( "password" , "Spark<3Java" ).option( "useSSL" , "false" ).option( "serverTimezone" , "EST" ).format( "jdbc" ).load();
你可以在本章的知识库MySQLToDatasetWithOptionsApp.java中找到完整的代码。
注意,在这个版本中,您没有使用read()返回的对象的jdbc()方法——read()是DataFrameReader的一个实例。您使用的是format()和load()方法。如您所见,该表只是一个名为dbtable的属性。在使用的语法中没有首选项;但是,您可能会遇到所有这些情况,这可能会让您感到困惑。如果您与一个团队一起从事一个项目,我建议您为该团队设置一个标准,要知道这些参数很可能会从配置文件中读取。
提醒一下,属性不区分大小写。注意,这些值是由驱动程序解释的,它们可能是区分大小写的。
让我们看看Spark如何使用自定义方言处理不受支持的数据库。
发布评论