Resilient Distributed Datasets(RDD)
RDD
是 Resilient Distributed Datasets(弹性分布式数据集)的缩写,是 Spark 中一个重要的抽象概念,它表示跨集群节点且被分区的数据集合,可以并行操作。Spark 为 RDD 提供了丰富的操作算子,可以高效处理数据。
创建 RDD
有两种创建 RDD 的方式:并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。
1 2 3 4 5 6 7 8
| val conf = new SparkConf().setAppName(appName).setMaster(master) val sc = new SparkContext(conf)
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
val distFile = sc.textFile("data.txt")
|
Dataset(DS)
Dataset
是分布式数据集合。Dataset 是 Spark 1.6 中添加的一个新接口,它提供了 RDD 的优势(强类型化、使用强大 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优势。
DataFrame(DF)
DataFrame
其实是 Dataset[Row]
的别名,其中的数据是按照字段组织的,它在概念上等同于关系数据库中的表或 R/Python 中的 data frame
。
应用程序可以使用 SparkSession
从现有的 RDD、Hive 表或 Spark 数据源创建 DataFrame。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| val spark = SparkSession .builder() .appName("app name") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate()
val text_df = spark.read.text("file.txt")
val json_df = spark.read.json("file.json")
val csv_df = spark.read.csv("file.csv")
val parquet_df = spark.read.csv("file.parquet")
val hive_table_df = spark.sql("select * from database_name.table_name")
|
RDD to DF
通过反射推断创建 DataFrame
1 2 3 4 5
| val rdd = sc.parallelize(Seq(("Tom", 13),("Lily", 25)))
import spark.implicits._
val df = rdd.toDF("name","age")
|
toDF() 方法定义如下:
1
| def toDF(colNames: String*): DataFrame
|
用于将强类型数据集合转换为具有重命名列的通用 DataFrame。在从 RDD 转换为具有有意义名称的 DataFrame 时非常方便。
通过 StructType 创建 DataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.Row
val rdd = sc.parallelize(Seq(("Tom", "13"),("Lily", "25")))
val schema = StructType( List( StructField("name", StringType, false), StructField("age", IntegerType, false) ) )
val rdd_row = rdd.map(x => Row(x._1,x._2.toInt))
val df = spark.createDataFrame(rdd_row, schema)
|
通过定义样例类创建 DataFrame
1 2 3 4 5 6 7 8 9
| val rdd = sc.parallelize(Seq(("Tom", "13"),("Lily", "25")))
case class User(name: String, age: Int)
val rdd_user = rdd.map(x => User(x._1,x._2.toInt))
val df = spark.createDataFrame(rdd_user)
val df = rdd_user.toDF()
|
RDD to DS
通过定义样例类创建 Dataset
1 2 3 4 5 6 7 8 9
| val rdd = sc.parallelize(Seq(("Tom", "13"),("Lily", "25")))
case class User(name: String, age: Int)
val rdd_user = rdd.map(x => User(x._1,x._2.toInt))
val ds = spark.createDataset(rdd_user)
val ds = rdd_user.toDS()
|
DS/DF to RDD
1 2 3
| val df = spark.read.csv("file.csv")
val rdd = df.rdd
|
DF to DS
1 2 3
| case class User(name: String, age: Int) val ds = DataFrame.map(x=> User(x.getAs(0), x.getAs(1)))
|
DS to DF
1
| val df = DataSet[DataTypeClass].toDF()
|