Spark 常用的保存文件方式
- RDD 保存至文本文件
1
| rdd.saveAsTextFile("path/result")
|
- RDD 以指定 Hadoop 输出格式保持至文件,仅支持 (key,value) 格式的 RDD
1
| rdd.saveHadoopFile("path/result",classOf[T],classOf[T],classOf[outputFormat])
|
- DataFrame 以指定格式保持至文件
1
| df.write.mode("overwrite").option("header","true").format("csv").save("path/result")
|
以上都简单的,最普遍的保存文件的方式,但有时候是不能够满足我们的需求,使用上述的文件保存方式保存之后,文件名通常是 part-00000
的方式保存在输出文件夹中,并且还包含数据校验和文件 part-00000.crc
和 .SUCCESS
文件,其中 part-00000.crc
用来校验数据的完整性,.SUCCESS
文件用来表示本次输出任务成功完成。
自定义保存文件
继承 MultipleTextOutputFormat
类并复写以下方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat import org.apache.hadoop.io.NullWritable
class CustomOutputFormat() extends MultipleTextOutputFormat[Any, Any] {
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = { key.asInstanceOf[String] + ".csv" }
override def generateActualKey(key: Any, value: Any): String = { NullWritable.get() }
override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = { val outDir: Path = FileOutputFormat.getOutputPath(job) if (outDir != null) { try { ignored.delete(outDir, true) } catch { case _: Throwable => {} } FileOutputFormat.setOutputPath(job, outDir) } } }
|
将 RDD 映射为 PairRDD
1
| val pair_rdd = rdd.map(x=>(x.split(",")(0),x)).partitionBy(new HashPartitioner(50))
|
调用 saveAsHadoopFile 输出
1
| pair_rdd.saveAsHadoopFile(output, classOf[String], classOf[String], classOf[CustomOutputFormat])
|