Custom Catalog Implementation

It’s possible to read an Iceberg table either from an HDFS path or from a Hive table. It’s also possible to use a custom metastore in place of Hive. The steps to do so are as follows.

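For example, reading such a table from Spark might look like the following (a minimal sketch; the SparkSession setup is omitted, and the path and table name are illustrative):

// Load an Iceberg table by its HDFS path (illustrative path)
Dataset<Row> fromPath = spark.read().format("iceberg")
    .load("hdfs://namenode:8020/warehouse/db.db/table");

// Load an Iceberg table registered in the Hive metastore (illustrative name)
Dataset<Row> fromHive = spark.read().format("iceberg")
    .load("db.table");
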
Custom table operations implementation

Extend BaseMetastoreTableOperations to provide an implementation of how table metadata is read and written.


import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

class CustomTableOperations extends BaseMetastoreTableOperations {
  private String dbName;
  private String tableName;
  private Configuration conf;
  private FileIO fileIO;

  protected CustomTableOperations(Configuration conf, String dbName, String tableName) {
    this.conf = conf;
    this.dbName = dbName;
    this.tableName = tableName;
  }

  // The doRefresh method should provide an implementation for fetching the current metadata location
  public void doRefresh() {

    // Example custom service which returns the metadata location given a dbName and tableName
    String metadataLocation = CustomService.getMetadataForTable(conf, dbName, tableName);

    // When updating from a metadata file location, call the helper method
    refreshFromMetadataLocation(metadataLocation);
  }

  // The doCommit method should provide an implementation for atomically updating the metadata location
  public void doCommit(TableMetadata base, TableMetadata metadata) {
    String oldMetadataLocation = base.metadataFileLocation();

    // Write new metadata using helper method
    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);

    // Example custom service which updates the metadata location for the given db and table atomically
    CustomService.updateMetadataLocation(dbName, tableName, oldMetadataLocation, newMetadataLocation);
  }

  // The io method provides a FileIO which is used to read and write the table metadata files
  public FileIO io() {
    if (fileIO == null) {
      fileIO = new HadoopFileIO(conf);
    }
    return fileIO;
  }
}

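Note that CustomService above is a stand-in for your own metastore client; it is not part of Iceberg. A minimal sketch of the interface the snippets in this section assume (all names here are hypothetical):

import org.apache.hadoop.conf.Configuration;

// Hypothetical metastore client assumed by the snippets in this section.
// The compare-and-swap in updateMetadataLocation is what makes Iceberg
// commits atomic under concurrent writers.
interface CustomService {

  // Return the current metadata file location for a table
  static String getMetadataForTable(Configuration conf, String dbName, String tableName) {
    throw new UnsupportedOperationException("implement against your metastore");
  }

  // Atomically swap the metadata pointer from oldLocation to newLocation,
  // failing if another writer committed first
  static void updateMetadataLocation(String dbName, String tableName,
                                     String oldLocation, String newLocation) {
    throw new UnsupportedOperationException("implement against your metastore");
  }

  // Drop a table entry from the metastore
  static void deleteTable(String dbName, String tableName) {
    throw new UnsupportedOperationException("implement against your metastore");
  }

  // Rename a table within the same database
  static void renameTable(String dbName, String fromTableName, String toTableName) {
    throw new UnsupportedOperationException("implement against your metastore");
  }
}
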
Custom table implementation

Extend BaseMetastoreCatalog to provide default warehouse locations and to instantiate CustomTableOperations.


import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;

public class CustomCatalog extends BaseMetastoreCatalog {

  private Configuration configuration;

  public CustomCatalog(Configuration configuration) {
    this.configuration = configuration;
  }

  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
    String dbName = tableIdentifier.namespace().level(0);
    String tableName = tableIdentifier.name();
    // instantiate the CustomTableOperations
    return new CustomTableOperations(configuration, dbName, tableName);
  }

  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {

    // Can choose to use any other configuration name
    String tableLocation = configuration.get("custom.iceberg.warehouse.location");

    // Can be an s3 or hdfs path
    if (tableLocation == null) {
      throw new RuntimeException("custom.iceberg.warehouse.location configuration not set!");
    }

    return String.format(
            "%s/%s.db/%s", tableLocation,
            tableIdentifier.namespace().level(0),
            tableIdentifier.name());
  }

  public boolean dropTable(TableIdentifier identifier, boolean purge) {
    // Example service to delete table
    CustomService.deleteTable(identifier.namespace().level(0), identifier.name());
    return true;
  }

  public void renameTable(TableIdentifier from, TableIdentifier to) {
    Preconditions.checkArgument(from.namespace().level(0).equals(to.namespace().level(0)),
            "Cannot move table between databases");

    // Example service to rename table
    CustomService.renameTable(from.namespace().level(0), from.name(), to.name());
  }
}

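With the catalog in place, tables can be created and loaded directly through it. A minimal usage sketch (the warehouse path, database, table name, and schema are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

Configuration conf = new Configuration();
conf.set("custom.iceberg.warehouse.location", "hdfs://namenode:8020/warehouse");

CustomCatalog catalog = new CustomCatalog(conf);
TableIdentifier identifier = TableIdentifier.of("db", "events");

// Create the table under the default warehouse location, then load it back
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.optional(2, "data", Types.StringType.get()));
Table created = catalog.createTable(identifier, schema);
Table loaded = catalog.loadTable(identifier);
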
Custom IcebergSource

Extend IcebergSource and provide an implementation to read from CustomCatalog.


import java.util.Optional;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.source.IcebergSource;
import org.apache.spark.sql.sources.v2.DataSourceOptions;

public class CustomIcebergSource extends IcebergSource {

  protected Table findTable(DataSourceOptions options, Configuration conf) {
    Optional<String> path = options.get("path");
    Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");

    // Read table from CustomCatalog
    CustomCatalog catalog = new CustomCatalog(conf);
    TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
    return catalog.loadTable(tableIdentifier);
  }
}

Register the CustomIcebergSource by updating META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with its fully qualified class name.
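
The service file contains a single line with the class’s fully qualified name (the com.example package is illustrative):

# META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.CustomIcebergSource

IcebergSource already registers the short name iceberg; to make the custom source addressable under its own format name, shortName() can also be overridden (custom-iceberg is an illustrative choice, not an Iceberg convention):

// In CustomIcebergSource: expose a distinct format name to Spark
@Override
public String shortName() {
  return "custom-iceberg";
}

Spark can then resolve the source with spark.read().format("custom-iceberg").load("db.table").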