UML
Example: appending a DataFrame to an Iceberg table through DataFrameWriter.insertInto:

data.write.format("iceberg").mode("append").insertInto(s"local.gaia.${table}_iceberg")
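For the three-part name local.gaia.&lt;table&gt; to resolve, a catalog named "local" has to be registered with Spark. A minimal sketch, assuming Iceberg's SparkCatalog with a Hadoop-type warehouse (the app name and warehouse path are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("iceberg-append") // hypothetical app name
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") // hypothetical path
  .getOrCreate()

With this configuration, "local" is a non-session catalog, which is what steers insertInto below onto the V2 write path.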
def insertInto(tableName: String): Unit = {
  import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, NonSessionCatalogAndIdentifier, SessionCatalogAndIdentifier}
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  assertNotBucketed("insertInto")

  if (partitioningColumns.isDefined) {
    throw QueryCompilationErrors.partitionByDoesNotAllowedWhenUsingInsertIntoError()
  }

  val session = df.sparkSession
  val canUseV2 = lookupV2Provider().isDefined

  // Resolve the (possibly multipart) table name against the configured catalogs.
  session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
    // Non-session catalog (e.g. the "local" Iceberg catalog above): V2 write path.
    case NonSessionCatalogAndIdentifier(catalog, ident) =>
      insertInto(catalog, ident)
    // Session catalog, but the source has a V2 provider: still the V2 write path.
    case SessionCatalogAndIdentifier(catalog, ident)
        if canUseV2 && ident.namespace().length <= 1 =>
      insertInto(catalog, ident)
    // Fallback: the classic V1 path via TableIdentifier.
    case AsTableIdentifier(tableIdentifier) =>
      insertInto(tableIdentifier)
    case other =>
      throw QueryCompilationErrors.cannotFindCatalogToHandleIdentifierError(other.quoted)
  }
}
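parseMultipartIdentifier above is also callable directly, which makes it easy to see which branch a given name hits; a quick illustration (the table names are illustrative):

val parser = spark.sessionState.sqlParser

parser.parseMultipartIdentifier("local.gaia.events_iceberg")
// Seq("local", "gaia", "events_iceberg") -> NonSessionCatalogAndIdentifier:
// "local" is the Iceberg catalog registered above, so the V2 path is taken.

parser.parseMultipartIdentifier("gaia.events")
// Seq("gaia", "events") -> session catalog; V2 only when the source has a
// V2 provider (canUseV2), otherwise AsTableIdentifier and the V1 path.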
Interfaces overridden along the write path: spark.sql.connector.write/read.Write -> DataFrameWriter -> insertInto
Iceberg: SparkWriter (extends Writer) / FlinkSink()
hudi ->
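The DSv2 write-side contract those classes implement can be sketched as a no-op connector. The interface names below are real Spark 3.x APIs; the Noop* classes and their discard-everything behavior are placeholders, not Iceberg's or Hudi's actual code:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write._

object NoopCommit extends WriterCommitMessage

class NoopWriteBuilder extends WriteBuilder {
  override def build(): Write = new Write {
    override def toBatch: BatchWrite = new NoopBatchWrite
  }
}

class NoopBatchWrite extends BatchWrite {
  // Runs on the driver; one factory is serialized out to every task.
  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
    new NoopDataWriterFactory
  // Driver-side commit after all tasks succeed (Iceberg commits a table snapshot here).
  override def commit(messages: Array[WriterCommitMessage]): Unit = ()
  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}

class NoopDataWriterFactory extends DataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
    new NoopDataWriter
}

class NoopDataWriter extends DataWriter[InternalRow] {
  // Called once per row on the executor (a real connector appends to a data file here).
  override def write(record: InternalRow): Unit = ()
  // Task-level commit; the returned message flows back to BatchWrite.commit on the driver.
  override def commit(): WriterCommitMessage = NoopCommit
  override def abort(): Unit = ()
  override def close(): Unit = ()
}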