加入收藏 | 设为首页 | 会员中心 | 我要投稿 衡阳站长网 (https://www.0734zz.cn/)- 数据集成、设备管理、备份、数据加密、智能搜索!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Spark Delta Lake写数据使用及实现原理代码解析

发布时间:2019-10-04 14:14:10 所属栏目:教程 来源:明惠
导读:Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下: df.write.format(delta).save(/data/yangping.wyp/delta/test/) //数据按照dt分区 df.write.f

Delta Lake 所有的更新操作都是在事务中进行的,deltaLog.withNewTransaction 就是一个事务,withNewTransaction 的实现如下:

  1. def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { 
  2.   try { 
  3.     // 更新当前表事务日志的快照 
  4.     update() 
  5.     // 初始化乐观事务锁对象 
  6.     val txn = new OptimisticTransaction(this) 
  7.     // 开启事务 
  8.     OptimisticTransaction.setActive(txn) 
  9.     // 执行写数据操作 
  10.     thunk(txn) 
  11.   } finally { 
  12.     // 关闭事务 
  13.     OptimisticTransaction.clearActive() 
  14.   } 

在开启事务之前,需要更新当前表事务的快照,因为在执行写数据之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是需要执行的事务操作,对应 deltaLog.withNewTransaction 里面的所有代码。

我们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操作,它的实现如下:

  1.   def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { 
  2.     import sparkSession.implicits._ 
  3.     // 如果不是第一次往表里面写数据,需要判断写数据的模式是否符合条件 
  4.     if (txn.readVersion > -1) { 
  5.       // This table already exists, check if the insert is valid. 
  6.       if (mode == SaveMode.ErrorIfExists) { 
  7.         throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) 
  8.       } else if (mode == SaveMode.Ignore) { 
  9.         return Nil 
  10.       } else if (mode == SaveMode.Overwrite) { 
  11.         deltaLog.assertRemovable() 
  12.       } 
  13.     } 
  14.   
  15.     // 更新表的模式,比如是否覆盖现有的模式,是否和现有的模式进行 merge 
  16.     updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) 
  17.   
  18.     // 是否定义分区过滤条件 
  19.     val replaceWhere = options.replaceWhere 
  20.     val partitionFilters = if (replaceWhere.isDefined) { 
  21.       val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) 
  22.       if (mode == SaveMode.Overwrite) { 
  23.         verifyPartitionPredicates( 
  24.           sparkSession, txn.metadata.partitionColumns, predicates) 
  25.       } 
  26.       Some(predicates) 
  27.     } else { 
  28.       None 
  29.     } 
  30.   
  31.     // 第一次写数据初始化事务日志的目录 
  32.     if (txn.readVersion < 0) { 
  33.       // Initialize the log path 
  34.       deltaLog.fs.mkdirs(deltaLog.logPath) 
  35.     } 
  36.   
  37.     // 写数据到文件系统中 
  38.     val newFiles = txn.writeFiles(data, Some(options)) 
  39.       
  40.     val deletedFiles = (mode, partitionFilters) match { 
  41.        // 全量覆盖,直接拿出缓存在内存中最新事务日志快照里面的所有 AddFile 文件 
  42.       case (SaveMode.Overwrite, None) => 
  43.         txn.filterFiles().map(_.remove) 
  44.       // 从事务日志快照中获取对应分区里面的所有 AddFile 文件 
  45.       case (SaveMode.Overwrite, Some(predicates)) => 
  46.         // Check to make sure the files we wrote out were actually valid. 
  47.         val matchingFiles = DeltaLog.filterFileList( 
  48.           txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() 
  49.         val invalidFiles = newFiles.toSet -- matchingFiles 
  50.         if (invalidFiles.nonEmpty) { 
  51.           val badPartitions = invalidFiles 
  52.             .map(_.partitionValues) 
  53.             .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } 
  54.             .mkString(", ") 
  55.           throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) 
  56.         } 
  57.   
  58.         txn.filterFiles(predicates).map(_.remove) 
  59.       case _ => Nil 
  60.     } 
  61.   
  62.     newFiles ++ deletedFiles 
  63.   } 

如果 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,所以当这个值大于 -1 的时候,需要判断一下写数据的操作是否合法。

由于 Delta Lake 底层使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,这就是 updateMetadata 函数对应的操作。

因为 Delta Lake 表支持分区,所以我们可能在写数据的时候指定某个分区进行覆盖。

(编辑:衡阳站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读