Spark Configuration
Catalogs
Spark adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under spark.sql.catalog.
This creates an Iceberg catalog named hive_prod that loads tables from a Hive metastore:
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
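These properties can be set in spark-defaults.conf, passed with --conf, or set programmatically. As a minimal sketch, the same hive_prod catalog could be configured on the SparkSession builder (the application name and metastore URI are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// configure the hive_prod Iceberg catalog when building the session
val spark = SparkSession.builder()
  .appName("iceberg-hive-catalog")
  .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hive_prod.type", "hive")
  .config("spark.sql.catalog.hive_prod.uri", "thrift://metastore-host:9083") // illustrative URI
  .getOrCreate()
```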
Below is an example for a REST catalog named rest_prod that loads tables from REST URL http://localhost:8080:
spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8080
Iceberg also supports a directory-based catalog in HDFS that can be configured using type=hadoop:
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
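For illustration, a table in this directory-based catalog can then be created and queried with ordinary Spark SQL; the namespace, table name, and schema below are hypothetical:

```scala
// create an Iceberg table under the hadoop_prod warehouse path and read it back
spark.sql("CREATE TABLE hadoop_prod.db.events (id bigint, data string) USING iceberg")
spark.sql("SELECT * FROM hadoop_prod.db.events").show()
```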
Info: The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a session catalog.
Catalog configuration
A catalog is created and named by adding a property spark.sql.catalog.(catalog-name) with an implementation class for its value.
Iceberg supplies two implementations:
* org.apache.iceberg.spark.SparkCatalog supports a Hive Metastore or a Hadoop warehouse as a catalog
* org.apache.iceberg.spark.SparkSessionCatalog adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables
Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:
Property | Values | Description |
---|---|---|
spark.sql.catalog.catalog-name.type | hive, hadoop, rest, glue, jdbc or nessie | The underlying Iceberg catalog implementation: HiveCatalog, HadoopCatalog, RESTCatalog, GlueCatalog, JdbcCatalog, NessieCatalog, or left unset if using a custom catalog |
spark.sql.catalog.catalog-name.catalog-impl | | The custom Iceberg catalog implementation. If type is null, catalog-impl must not be null. |
spark.sql.catalog.catalog-name.io-impl | | The custom FileIO implementation. |
spark.sql.catalog.catalog-name.metrics-reporter-impl | | The custom MetricsReporter implementation. |
spark.sql.catalog.catalog-name.default-namespace | default | The default current namespace for the catalog |
spark.sql.catalog.catalog-name.uri | thrift://host:port | Hive metastore URL for hive typed catalog, REST URL for REST typed catalog |
spark.sql.catalog.catalog-name.warehouse | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory |
spark.sql.catalog.catalog-name.cache-enabled | true or false | Whether to enable catalog cache; default value is true |
spark.sql.catalog.catalog-name.cache.expiration-interval-ms | 30000 (30 seconds) | Duration after which cached catalog entries are expired; only effective if cache-enabled is true. -1 disables cache expiration and 0 disables caching entirely, irrespective of cache-enabled. Default is 30000 (30 seconds) |
spark.sql.catalog.catalog-name.table-default.propertyKey | | Default Iceberg table property value for property key propertyKey, which will be set on tables created by this catalog if not overridden |
spark.sql.catalog.catalog-name.table-override.propertyKey | | Enforced Iceberg table property value for property key propertyKey, which cannot be overridden by user |
Additional properties can be found in common catalog configuration.
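For example, a catalog can supply a default table property for newly created tables and enforce another one; the property keys below are standard Iceberg table properties, while the catalog name and values are illustrative:

```
spark.sql.catalog.hive_prod.table-default.write.format.default = avro
spark.sql.catalog.hive_prod.table-override.commit.retry.num-retries = 10
```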
Using catalogs
Catalog names are used in SQL queries to identify a table. In the examples above, hive_prod and hadoop_prod can be used to prefix database and table names that will be loaded from those catalogs.
Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names. To see the current catalog and namespace, run SHOW CURRENT NAMESPACE.
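As a sketch, assuming the hive_prod catalog above and a hypothetical db.events table:

```scala
// fully qualified name: catalog.namespace.table
spark.sql("SELECT * FROM hive_prod.db.events").show()

// set the current catalog and namespace, then use the short table name
spark.sql("USE hive_prod.db")
spark.sql("SHOW CURRENT NAMESPACE").show()
spark.sql("SELECT * FROM events").show()
```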
Replacing the session catalog
To add Iceberg table support to Spark's built-in catalog, configure spark_catalog to use Iceberg's SparkSessionCatalog.
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's SparkSessionCatalog as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.
This configuration can use the same Hive Metastore for both Iceberg and non-Iceberg tables.
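As an illustration, with spark_catalog wrapped as above, both of the following queries go through the same session catalog; the table names are hypothetical:

```scala
// resolved by SparkSessionCatalog: the Iceberg table is handled by Iceberg,
// the non-Iceberg table is delegated to Spark's built-in catalog
spark.sql("SELECT count(*) FROM db.iceberg_events").show()
spark.sql("SELECT count(*) FROM db.legacy_hive_events").show()
```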
Using catalog specific Hadoop configuration values
Similar to configuring Hadoop properties with spark.hadoop.*, it's possible to set per-catalog Hadoop configuration values by adding properties with the prefix spark.sql.catalog.(catalog-name).hadoop.*. These properties take precedence over values configured globally using spark.hadoop.* and only affect Iceberg tables.
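For instance, a catalog whose data lives in an S3-compatible store could override the S3A endpoint for that catalog only; fs.s3a.endpoint is a standard Hadoop S3A property, and the value here is illustrative:

```
spark.sql.catalog.rest_prod.hadoop.fs.s3a.endpoint = http://localhost:9000
```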
Loading a custom catalog
Spark supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property. Here is an example:
spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
SQL Extensions
Iceberg 0.11.0 and later ship an extension module for Spark that adds new SQL commands, like CALL for stored procedures or ALTER TABLE ... WRITE ORDERED BY.
Using those SQL commands requires adding Iceberg extensions to your Spark environment using the following Spark property:
Spark extensions property | Iceberg extensions implementation |
---|---|
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
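Once the extensions property above is set on the Spark session (for example via --conf when launching Spark), the extension commands become available through spark.sql. A minimal sketch, with a hypothetical table and column and an illustrative snapshot ID:

```scala
// set a write sort order using the extension syntax mentioned above
spark.sql("ALTER TABLE hive_prod.db.events WRITE ORDERED BY ts")

// call an Iceberg stored procedure added by the extensions
spark.sql("CALL hive_prod.system.rollback_to_snapshot('db.events', 10963874102873)")
```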
Runtime configuration
Read options
Spark read options are passed when configuring the DataFrameReader, like this:
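For example, a time-travel read might look like the following sketch; the table name and snapshot ID are illustrative:

```scala
// read the snapshot with the given ID instead of the table's latest snapshot
val df = spark.read
  .option("snapshot-id", 10963874102873L)
  .table("catalog.db.table")
```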
Spark option | Default | Description |
---|---|---|
snapshot-id | (latest) | Snapshot ID of the table snapshot to read |
as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
split-size | As per table property | Overrides this table's read.split.target-size and read.split.metadata-target-size |
lookback | As per table property | Overrides this table's read.split.planning-lookback |
file-open-cost | As per table property | Overrides this table's read.split.open-file-cost |
vectorization-enabled | As per table property | Overrides this table's read.parquet.vectorization.enabled |
batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size |
stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
Write options
Spark write options are passed when configuring the DataFrameWriter, like this:
// write with Avro instead of Parquet
df.write
.option("write-format", "avro")
.option("snapshot-property.key", "value")
.insertInto("catalog.db.table")
Spark option | Default | Description |
---|---|---|
write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
check-nullability | true | Sets the nullable check on fields |
snapshot-property.custom-key | null | Adds an entry with custom-key and corresponding value in the snapshot summary (the snapshot-property. prefix is only required for DSv2) |
fanout-enabled | false | Overrides this table's write.spark.fanout.enabled |
check-ordering | true | Checks if input schema and table schema are same |
isolation-level | null | Desired isolation level for Dataframe overwrite operations. null => no checks (for idempotent writes), serializable => check for concurrent inserts or deletes in destination partitions, snapshot => checks for concurrent deletes in destination partitions. |
validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via Table API or Snapshots table. If null, the table's oldest known snapshot is used. |
compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
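For example, the isolation-level option can be combined with the DataFrameWriterV2 API for an overwrite; the table name here is hypothetical:

```scala
// overwrite partitions and fail if a conflicting write committed concurrently
df.writeTo("catalog.db.table")
  .option("isolation-level", "serializable")
  .overwritePartitions()
```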
CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with snapshot-property., then that prefix will be removed from each property. Here is an example:
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.spark.CommitMetadata;

// custom properties to record in the summary of the snapshot committed below
Map<String, String> properties = new HashMap<>();
properties.put("property_key", "property_value");

// run the SQL inside withCommitProperties so the resulting commit carries the properties
CommitMetadata.withCommitProperties(properties,
    () -> {
        spark.sql("DELETE FROM " + tableName + " where id = 1");
        return 0;
    },
    RuntimeException.class);