Spark Core RDD Data Sources

Contents

    • Plain Text Files
    • JDBC [Must know]
    • Hadoop API [Good to know]
    • SequenceFile Files [Good to know]
    • Object Files [Good to know]
    • HBase [Good to know]
    • Further Reading

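Plain Text Files

sc.textFile("./dir/*.txt")
If you pass a directory, every file in that directory is read into the RDD. File paths may contain wildcards.
However, textFile is inefficient for large numbers of small files. Each file becomes at least one input split, so many small files produce many tiny partitions and tasks. Use wholeTextFiles instead:

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

It returns an RDD[(String, String)] in which the key is the file's path and the value is the file's entire content. Several small files are packed into each partition, and each file is read as a single record, so this method is intended for small files.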
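The (path, content) shape is easiest to see on a local directory. The following is a minimal Spark-free sketch, not sc.wholeTextFiles itself. It builds the same kind of pairs with plain java.nio calls and then runs the word count from DataSourceTest at the end of this article over them. The object and method names (WholeTextFilesSketch, readWholeTextFiles, wordCount) are hypothetical.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical, Spark-free sketch: NOT sc.wholeTextFiles, only the same
// (path, content) record shape, built for a local directory with java.nio.
object WholeTextFilesSketch {
  // One (absolutePath, wholeContent) pair per regular file in `dir`, sorted by file name
  def readWholeTextFiles(dir: File): Seq[(String, String)] =
    Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.isFile)
      .sortBy(_.getName)
      .map { f =>
        val content = new String(Files.readAllBytes(f.toPath), StandardCharsets.UTF_8)
        (f.getAbsolutePath, content)
      }
      .toSeq

  // Word count over the file contents; "\\r?\\n" splits both \r\n and \n line endings
  def wordCount(files: Seq[(String, String)]): Map[String, Int] =
    files
      .flatMap { case (_, content) => content.split("\\r?\\n") }
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```

Each file contributes exactly one record regardless of how many lines it has. This is why the lines have to be split out of the value explicitly.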

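JDBC [Must know]

Spark can access relational databases through Java JDBC. Reading uses JdbcRDD. Writing opens a plain JDBC connection per partition inside foreachPartition (see saveToMySQL).

  • Code demo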
package cn.itcast.core

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Desc: demonstrates using Spark with the JDBC API to store data in MySQL and read it back
 */
object JDBCDataSourceTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val config = new SparkConf().setAppName("JDBCDataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")

    // 2. Insert data: foreachPartition opens one connection per partition
    val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))
    // Uncomment to write the rows to MySQL
    //data.foreachPartition(saveToMySQL)

    // 3. Read data: each partition binds its own id range to the two ? placeholders
    val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD[(Int, String, Int)](
      sc,
      () => getConn(),
      "select * from t_student where id >= ? and id <= ? ",
      4, // lowerBound (inclusive)
      6, // upperBound (inclusive)
      2, // numPartitions
      (rs: ResultSet) => {
        val id: Int = rs.getInt("id")
        val name: String = rs.getString("name")
        val age: Int = rs.getInt("age")
        (id, name, age)
      })
    println(studentRDD.collect().toBuffer)
    sc.stop()
  }

  def getConn(): Connection =
    DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")

  /*
  CREATE TABLE `t_student` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) DEFAULT NULL,
    `age` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
  ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
  */

  def saveToMySQL(partitionData: Iterator[(String, Int)]): Unit = {
    // Get one connection and one prepared statement for the whole partition
    val conn: Connection = getConn()
    val sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (NULL, ?, ?)"
    val ps: PreparedStatement = conn.prepareStatement(sql)
    try {
      partitionData.foreach(data => {
        // Add each record to the batch
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.addBatch()
      })
      ps.executeBatch()
    } finally {
      ps.close()
      conn.close()
    }
  }
}
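JdbcRDD does not parse the SQL. It splits the inclusive range [lowerBound, upperBound] into numPartitions contiguous sub-ranges and binds each (start, end) pair to the two ? placeholders of one partition's query. The Spark-free sketch below follows the arithmetic in Spark's JdbcRDD.getPartitions as I understand it. Check it against the source of your Spark version. The object and method names are hypothetical.

```scala
// Hypothetical, Spark-free sketch of how JdbcRDD turns (lowerBound, upperBound,
// numPartitions) into one inclusive id range per partition; each (start, end)
// pair fills the two ? placeholders of that partition's query.
object JdbcPartitionSketch {
  def partitionBounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    // Both bounds are inclusive, hence the + 1 here and the - 1 on each end
    val length = BigInt(upperBound) - BigInt(lowerBound) + BigInt(1)
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (BigInt(i) * length) / BigInt(numPartitions)
      val end = BigInt(lowerBound) + (BigInt(i + 1) * length) / BigInt(numPartitions) - BigInt(1)
      (start.toLong, end.toLong)
    }
  }
}
```

With the demo's arguments (4, 6, 2), partition 0 queries id 4 to 4 and partition 1 queries id 5 to 6. Rows whose id falls outside [lowerBound, upperBound] are never read, so choose the bounds to cover the key range you need.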

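Hadoop API [Good to know]

Spark's ecosystem is fully compatible with Hadoop. Any file format or database that Hadoop can read or write through an InputFormat/OutputFormat, Spark supports as well.
The low-level APIs are hadoopRDD and saveAsHadoopFile (Hadoop's old org.apache.hadoop.mapred API) and newAPIHadoopRDD and saveAsNewAPIHadoopFile (the new org.apache.hadoop.mapreduce API).
The other read/write methods are convenience wrappers built on top of these two API families for Spark application developers. For example, textFile is hadoopFile with TextInputFormat, keeping only the line text.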
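The (LongWritable, Text) pairs that TextInputFormat produces (see the newAPIHadoopFile call in DataSourceTest at the end) are easy to misread. The key is the byte offset where each line starts in its file, not a line number. The minimal Spark-free sketch below reproduces that pairing for the content of one file. The helper name is hypothetical. Unlike Hadoop's LineRecordReader, it only recognizes \n as a line terminator.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical, Spark-free sketch of the (key, value) pairs TextInputFormat emits
// for one file: key = byte offset where the line starts, value = the line text.
// Simplification: only '\n' is treated as a line terminator.
object TextInputFormatSketch {
  def offsetsAndLines(content: String): Seq[(Long, String)] =
    if (content.isEmpty) Seq.empty
    else {
      val parts = content.split("\n", -1).toSeq
      // A trailing '\n' leaves an empty last element that is not a real line
      val lines = if (content.endsWith("\n")) parts.dropRight(1) else parts
      // Each line advances the offset by its UTF-8 byte length plus 1 for '\n'
      val offsets = lines.scanLeft(0L)((offset, line) =>
        offset + line.getBytes(StandardCharsets.UTF_8).length + 1)
      offsets.zip(lines)
    }
}
```

For the file content "1\thadoop\n2\thive\n3\tspark\n" (one key/value pair per line, as TextOutputFormat writes it), the keys are 0, 9 and 16. For non-ASCII text the offsets count bytes, not characters. Offsets restart at 0 in every part file.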

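SequenceFile Files [Good to know]

A SequenceFile is a flat file format that Hadoop designed for storing key-value pairs in binary form.

Read: sc.sequenceFile[K, V](path)
Write: rdd.saveAsSequenceFile(path)
Keys and values must be types that Spark can automatically convert to Hadoop Writable types (for example Int to IntWritable, String to Text).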

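Object Files [Good to know]

An object file stores objects after serializing them.
Read: sc.objectFile[T](path) // the data is serialized, so the element type T (e.g. (K, V)) must be specified
Write: rdd.saveAsObjectFile(path)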
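saveAsObjectFile writes the elements with Java serialization. Spark groups them into small arrays and stores the serialized bytes in a SequenceFile. Deserializing only yields an untyped object, which is why objectFile needs the element type. The plain-JVM sketch below shows that round trip with ObjectOutputStream/ObjectInputStream. The helper names are hypothetical, and this is not Spark's on-disk layout.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical, Spark-free sketch of the Java-serialization round trip behind
// object files. It illustrates the typing issue only, not Spark's file format.
object ObjectFileSketch {
  // Serialize any Serializable value (here: a batch of elements) to bytes
  def serialize(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  // readObject() returns an untyped AnyRef, so the caller has to name the type T;
  // the cast is unchecked here, so a wrong T only fails later, at runtime
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Usage mirrors sc.objectFile[(Int, String)](...): the reader states List[(Int, String)] explicitly, because nothing in the bytes is checked against that type at compile time.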

HBase [Good to know]

Thanks to the org.apache.hadoop.hbase.mapreduce.TableInputFormat class, Spark can access HBase through a Hadoop input format.
This input format returns key-value pairs. The key type is org.apache.hadoop.hbase.io.ImmutableBytesWritable, and the value type is org.apache.hadoop.hbase.client.Result.

Further Reading

package cn.itcast.core

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DataSourceTest {
  def main(args: Array[String]): Unit = {
    // Set the HDFS user before the SparkContext starts; afterwards Hadoop has already fixed the login user
    System.setProperty("HADOOP_USER_NAME", "root")
    val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")

    // 1. Hadoop API (new org.apache.hadoop.mapreduce API)
    println("HadoopAPI")
    val dataRDD = sc.parallelize(Array((1, "hadoop"), (2, "hive"), (3, "spark")))
    // TextOutputFormat writes each pair as one "key<TAB>value" line (non-Text objects via toString)
    dataRDD.saveAsNewAPIHadoopFile("hdfs://node01:8020/spark_hadoop/",
      classOf[LongWritable], classOf[Text], classOf[TextOutputFormat[LongWritable, Text]])
    // TextInputFormat returns (byte offset of the line, line text); keep only the text
    val inputRDD: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile("hdfs://node01:8020/spark_hadoop/*",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf = sc.hadoopConfiguration)
    inputRDD.map(_._2.toString).foreach(println)

    // 2. Read many small files: one (path, content) pair per file
    println("Read small files")
    val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:\\data\\spark\\files", minPartitions = 3)
    // "\\r?\\n" splits both Windows (\r\n) and Unix (\n) line endings
    val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r?\\n"))
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
    wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

    // 3. SequenceFile: Int and String are converted to IntWritable and Text automatically
    println("SequenceFile")
    val dataRDD2: RDD[(Int, String)] = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
    dataRDD2.saveAsSequenceFile("D:\\data\\spark\\SequenceFile")
    val sdata: RDD[(Int, String)] = sc.sequenceFile[Int, String]("D:\\data\\spark\\SequenceFile\\*")
    sdata.collect().foreach(println)

    // 4. ObjectFile: the element type must be given when reading
    println("ObjectFile")
    val dataRDD3 = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
    dataRDD3.saveAsObjectFile("D:\\data\\spark\\ObjectFile")
    val objRDD = sc.objectFile[(Int, String)]("D:\\data\\spark\\ObjectFile\\*")
    objRDD.collect().foreach(println)

    sc.stop()
  }
}

package cn.itcast.core

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Written against the HBase 1.x client API (HBaseAdmin, HTableDescriptor, Put.addImmutable)
object DataSourceTest2 {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("DataSourceTest2").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")

    // (Re)create table "fruit" with column family "info"
    val fruitTable = TableName.valueOf("fruit")
    val tableDescr = new HTableDescriptor(fruitTable)
    tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
    val admin = new HBaseAdmin(conf)
    if (admin.tableExists(fruitTable)) {
      admin.disableTable(fruitTable)
      admin.deleteTable(fruitTable)
    }
    admin.createTable(tableDescr)
    admin.close()

    // Convert (rowKey, name, price) into a Put; the row key travels inside the Put
    def convert(triple: (String, String, String)) = {
      val put = new Put(Bytes.toBytes(triple._1))
      put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, put)
    }

    val dataRDD: RDD[(String, String, String)] = sc.parallelize(List(("1", "apple", "11"), ("2", "banana", "12"), ("3", "pear", "13")))
    val targetRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(convert)

    // Write data with the old-API (mapred) TableOutputFormat
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit")
    targetRDD.saveAsHadoopDataset(jobConf)
    println("Data written successfully")

    // Read data with the new-API (mapreduce) TableInputFormat
    conf.set(TableInputFormat.INPUT_TABLE, "fruit")
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    val count: Long = hbaseRDD.count()
    println("hBaseRDD RDD Count:" + count)
    hbaseRDD.foreach { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
      val price = Bytes.toString(result.getValue("info".getBytes, "price".getBytes))
      println("Row key:" + key + " Name:" + name + " Price:" + price)
    }

    sc.stop()
  }
}