Spark 常用的保存文件方式
- RDD 保存至文本文件
1
   | rdd.saveAsTextFile("path/result")
 
  | 
 
- RDD 以指定 Hadoop 输出格式保持至文件,仅支持 (key,value) 格式的 RDD
1
   | rdd.saveHadoopFile("path/result",classOf[T],classOf[T],classOf[outputFormat])
 
  | 
 
- DataFrame 以指定格式保持至文件
1
   | df.write.mode("overwrite").option("header","true").format("csv").save("path/result")
 
  | 
 
以上都简单的,最普遍的保存文件的方式,但有时候是不能够满足我们的需求,使用上述的文件保存方式保存之后,文件名通常是 part-00000 的方式保存在输出文件夹中,并且还包含数据校验和文件 part-00000.crc 和 .SUCCESS 文件,其中 part-00000.crc 用来校验数据的完整性,.SUCCESS 文件用来表示本次输出任务成功完成。 
自定义保存文件
继承 MultipleTextOutputFormat 类并复写以下方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
   | import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat import org.apache.hadoop.io.NullWritable
  class CustomOutputFormat() extends MultipleTextOutputFormat[Any, Any] {
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {          key.asInstanceOf[String] + ".csv"   }
    override def generateActualKey(key: Any, value: Any): String = {          NullWritable.get()   }
    override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {     val outDir: Path = FileOutputFormat.getOutputPath(job)     if (outDir != null) {                     try {         ignored.delete(outDir, true)       } catch {         case _: Throwable => {}       }       FileOutputFormat.setOutputPath(job, outDir)     }   } }
 
  | 
 
将 RDD 映射为 PairRDD
1
   | val pair_rdd = rdd.map(x=>(x.split(",")(0),x)).partitionBy(new HashPartitioner(50))
 
  | 
 
调用 saveAsHadoopFile 输出
1
   | pair_rdd.saveAsHadoopFile(output, classOf[String], classOf[String], classOf[CustomOutputFormat])
 
  |