Package org.apache.iceberg.flink.sink
Class IcebergSink
java.lang.Object
org.apache.iceberg.flink.sink.IcebergSink
- All Implemented Interfaces:
Serializable, org.apache.flink.api.common.SupportsConcurrentExecutionAttempts, org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>
@Experimental
public class IcebergSink
extends Object
implements org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.api.common.SupportsConcurrentExecutionAttempts
The Flink v2 sink offers different hooks to insert custom topologies into the sink. We use the following:
- SupportsPreWriteTopology, which redistributes the data to the writers based on the DistributionMode
- SinkWriter, which writes data/delete files and generates the WriteResult objects for the files
- SupportsPreCommitTopology, which we use to place the IcebergWriteAggregator, which merges the individual SinkWriters' WriteResults into a single IcebergCommittable
- IcebergCommitter, which commits the incoming IcebergCommittables to the Iceberg table
- SupportsPostCommitTopology, which we could use for incremental compaction later. This is not implemented yet.
Flink sink
+-----------------------------------------------------------------------------------+
| |
+-------+ | +----------+ +-------------+ +---------------+ |
| Map 1 | ==> | | writer 1 | | committer 1 | ---> | post commit 1 | |
+-------+ | +----------+ +-------------+ +---------------+ |
| \ / \ |
| \ / \ |
| \ / \ |
+-------+ | +----------+ \ +-------------------+ / +-------------+ \ +---------------+ |
| Map 2 | ==> | | writer 2 | --->| commit aggregator | | committer 2 | | post commit 2 | |
+-------+ | +----------+ +-------------------+ +-------------+ +---------------+ |
| Commit only on |
| committer 1 |
+-----------------------------------------------------------------------------------+
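The fan-in shown above can be modeled in dependency-free Java. This is a simplified sketch, not Iceberg's IcebergWriteAggregator: WriteResultModel, CommittableModel and AggregatorSketch are hypothetical names, and files are represented only by their names. Every writer's result for one checkpoint is merged into a single committable, which is why only one committer subtask (committer 1 in the diagram) has anything to commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a writer's WriteResult: the files one writer produced.
final class WriteResultModel {
    final List<String> dataFiles;
    final List<String> deleteFiles;

    WriteResultModel(List<String> dataFiles, List<String> deleteFiles) {
        this.dataFiles = dataFiles;
        this.deleteFiles = deleteFiles;
    }
}

// Hypothetical stand-in for IcebergCommittable: everything to commit for one checkpoint.
final class CommittableModel {
    final long checkpointId;
    final List<String> dataFiles;
    final List<String> deleteFiles;

    CommittableModel(long checkpointId, List<String> dataFiles, List<String> deleteFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = Collections.unmodifiableList(dataFiles);
        this.deleteFiles = Collections.unmodifiableList(deleteFiles);
    }
}

final class AggregatorSketch {
    // Merges the results of all writer subtasks for one checkpoint into a single committable.
    static CommittableModel aggregate(long checkpointId, List<WriteResultModel> perWriterResults) {
        List<String> data = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        for (WriteResultModel result : perWriterResults) {
            data.addAll(result.dataFiles);
            deletes.addAll(result.deleteFiles);
        }
        return new CommittableModel(checkpointId, data, deletes);
    }
}
```

One plausible reason for this design is that a single committer can apply all files from a checkpoint together, rather than several committers competing to update the same table metadata.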
Nested Class Summary
Nested Classes
- IcebergSink.Builder
Method Summary
Methods (modifier and type, signature, description):
- void addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)
- org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)
- org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> addPreWriteTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)
- static <T> IcebergSink.Builder builderFor(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T, org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
  Initialize an IcebergSink.Builder to export the data from a generic input data stream into an Iceberg table.
- org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable> createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext context)
- org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData> createWriter(org.apache.flink.api.connector.sink2.WriterInitContext context)
- static IcebergSink.Builder forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.catalog.ResolvedSchema resolvedSchema)
  Initialize an IcebergSink.Builder to export the data from an input data stream of Rows into an Iceberg table.
- static IcebergSink.Builder forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.legacy.api.TableSchema tableSchema)
  Deprecated.
- static IcebergSink.Builder forRowData(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
  Initialize an IcebergSink.Builder to export the data from an input data stream of RowData into an Iceberg table.
- org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable> getCommittableSerializer()
- org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult> getWriteResultSerializer()
-
Method Details
-
createWriter
public org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData> createWriter(org.apache.flink.api.connector.sink2.WriterInitContext context)
- Specified by:
createWriter in interface org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>
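According to the class description, the SinkWriter created here writes data/delete files and produces WriteResult objects for them. The sketch below models that life cycle with hypothetical names (WriterSketch and a made-up file-naming scheme); the method name prepareCommit is borrowed from Flink's CommittingSinkWriter. Rows are counted into an "open file", and each prepareCommit call, standing in for the flush before a checkpoint, closes that file and reports it. No real Parquet, ORC or Avro files are written.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a sink writer; it records row counts instead of writing real files.
final class WriterSketch {
    private final int subtaskId;
    private int fileCounter = 0;
    private long rowsInOpenFile = 0;
    private final List<String> completedFiles = new ArrayList<>();

    WriterSketch(int subtaskId) {
        this.subtaskId = subtaskId;
    }

    // Accepts one row; a real writer would append it to an open data file.
    void write(String row) {
        rowsInOpenFile++;
    }

    // Closes the open file (if it holds rows) and returns the files completed since the last call.
    List<String> prepareCommit() {
        if (rowsInOpenFile > 0) {
            completedFiles.add("writer-" + subtaskId + "-file-" + fileCounter++ + "-rows-" + rowsInOpenFile);
            rowsInOpenFile = 0;
        }
        List<String> result = Collections.unmodifiableList(new ArrayList<>(completedFiles));
        completedFiles.clear();
        return result;
    }
}
```

Each checkpoint therefore yields only the files written since the previous one, which is what lets the downstream aggregator build one committable per checkpoint.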
-
createCommitter
public org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable> createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext context)
- Specified by:
createCommitter in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
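Flink may hand a committer the same committable again after a failure, so commits need to be idempotent. One common technique, sketched below under simplified assumptions, is to remember the highest checkpoint id already committed and skip anything at or below it. CommitterSketch and its in-memory file list are hypothetical; Iceberg's Flink committer keeps comparable bookkeeping in the table's snapshot metadata, and this sketch does not claim to reproduce its exact logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory committer that ignores committables it has already applied.
final class CommitterSketch {
    private long maxCommittedCheckpointId = -1L;
    private final List<String> committedFiles = new ArrayList<>();

    // Applies the files for a checkpoint unless that checkpoint was committed before.
    // Returns true if a new commit happened.
    boolean commit(long checkpointId, List<String> files) {
        if (checkpointId <= maxCommittedCheckpointId) {
            return false; // replayed committable: its files are already in the "table"
        }
        committedFiles.addAll(files);
        maxCommittedCheckpointId = checkpointId;
        return true;
    }

    // Returns a copy of everything committed so far.
    List<String> committedFiles() {
        return new ArrayList<>(committedFiles);
    }
}
```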
-
getCommittableSerializer
public org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable> getCommittableSerializer()
- Specified by:
getCommittableSerializer in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
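Committables are stored in Flink state, so they need a SimpleVersionedSerializer: a serializer that reports a version and receives that version back when deserializing. The sketch below declares a local interface with the same three methods (getVersion, serialize, deserialize) to stay dependency-free, and serializes a hypothetical SketchCommittable whose fields (checkpoint id, job id, opaque manifest bytes) are assumptions, not the real IcebergCommittable layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Local interface with the same shape as Flink's SimpleVersionedSerializer.
interface VersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical committable: a checkpoint id, a job id and opaque manifest bytes.
final class SketchCommittable {
    final long checkpointId;
    final String jobId;
    final byte[] manifest;

    SketchCommittable(long checkpointId, String jobId, byte[] manifest) {
        this.checkpointId = checkpointId;
        this.jobId = jobId;
        this.manifest = manifest.clone();
    }
}

final class SketchCommittableSerializer implements VersionedSerializer<SketchCommittable> {
    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    // Writes the fields in a fixed order; the byte array is length-prefixed.
    @Override
    public byte[] serialize(SketchCommittable committable) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(committable.checkpointId);
            out.writeUTF(committable.jobId);
            out.writeInt(committable.manifest.length);
            out.write(committable.manifest);
        }
        return bytes.toByteArray();
    }

    // Rejects unknown versions, then reads the fields back in the same order.
    @Override
    public SketchCommittable deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long checkpointId = in.readLong();
            String jobId = in.readUTF();
            byte[] manifest = new byte[in.readInt()];
            in.readFully(manifest);
            return new SketchCommittable(checkpointId, jobId, manifest);
        }
    }
}
```

The explicit version check is the point of the "versioned" design: a later release can bump the version and add a branch that still reads state written by older jobs.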
-
addPostCommitTopology
public void addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)
- Specified by:
addPostCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
-
addPreWriteTopology
public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> addPreWriteTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)
- Specified by:
addPreWriteTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>
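addPreWriteTopology is where the input is redistributed according to the DistributionMode. The sketch below illustrates the idea behind hash-style distribution with hypothetical helpers (KeyedRow, HashDistributionSketch): rows are routed by a hash of their partition key, so every row of a given partition reaches the same writer and that partition is not spread as small files across all writers. The plain modulo formula is an illustrative assumption; Flink's keyBy, for instance, first maps keys to key groups, and the actual Iceberg routing is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row with a partition key (for example a date) and a payload.
final class KeyedRow {
    final String partitionKey;
    final String payload;

    KeyedRow(String partitionKey, String payload) {
        this.partitionKey = partitionKey;
        this.payload = payload;
    }
}

final class HashDistributionSketch {
    // Chooses a writer subtask from the partition key; equal keys always map to the same writer.
    static int writerFor(String partitionKey, int writerParallelism) {
        return Math.floorMod(partitionKey.hashCode(), writerParallelism);
    }

    // Groups rows by the writer index each one would be sent to.
    static Map<Integer, List<KeyedRow>> distribute(List<KeyedRow> rows, int writerParallelism) {
        Map<Integer, List<KeyedRow>> byWriter = new HashMap<>();
        for (KeyedRow row : rows) {
            int writer = writerFor(row.partitionKey, writerParallelism);
            byWriter.computeIfAbsent(writer, k -> new ArrayList<>()).add(row);
        }
        return byWriter;
    }
}
```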
-
addPreCommitTopology
public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)
- Specified by:
addPreCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>
-
getWriteResultSerializer
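public org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult> getWriteResultSerializer()
- Specified by:
getWriteResultSerializer in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>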
-
builderFor
public static <T> IcebergSink.Builder builderFor(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T, org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
Initialize an IcebergSink.Builder to export the data from a generic input data stream into an Iceberg table. We use RowData inside the sink connector, so users need to provide a mapper function and a TypeInformation to convert those generic records to a RowData DataStream.
- Type Parameters:
T - the data type of records.
- Parameters:
input - the generic source input data stream.
mapper - function to convert the generic data to RowData.
outputType - the TypeInformation of the RowData records produced by mapper.
- Returns:
an IcebergSink.Builder to connect the Iceberg table.
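builderFor asks for a MapFunction<T, RowData> plus the TypeInformation of its result because the sink works on RowData internally. The sketch below shows only the shape of that conversion in dependency-free Java: Order is a hypothetical user type, a positional Object[] stands in for RowData, and java.util.function.Function stands in for Flink's MapFunction. With the real API the mapper would build a RowData instance (for example a GenericRowData, whose string fields are StringData rather than java.lang.String).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical user record type flowing through a generic DataStream<Order>.
final class Order {
    final long id;
    final String customer;
    final double amount;

    Order(long id, String customer, double amount) {
        this.id = id;
        this.customer = customer;
        this.amount = amount;
    }
}

final class MapperSketch {
    // Plays the role of the mapper passed to builderFor: fixes the field order of the row.
    static final Function<Order, Object[]> ORDER_TO_ROW =
        order -> new Object[] {order.id, order.customer, order.amount};

    // Applies the mapper to every record, as the stream would before the sink sees the data.
    static <T> List<Object[]> mapAll(List<T> records, Function<T, Object[]> mapper) {
        List<Object[]> rows = new ArrayList<>();
        for (T record : records) {
            rows.add(mapper.apply(record));
        }
        return rows;
    }
}
```

The field order chosen by the mapper has to match the column order described by outputType; nothing in this sketch enforces that, which is exactly the responsibility builderFor leaves to the caller.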
-
forRow
@Deprecated public static IcebergSink.Builder forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.legacy.api.TableSchema tableSchema)
Deprecated. Use forRow(DataStream, ResolvedSchema) instead.
Initialize an IcebergSink.Builder to export the data from an input data stream of Rows into an Iceberg table. We use RowData inside the sink connector, so users need to provide a TableSchema for the builder to convert those Rows to a RowData DataStream.
- Parameters:
input - the source input data stream of Rows.
tableSchema - defines the TypeInformation for the input data.
- Returns:
an IcebergSink.Builder to connect the Iceberg table.
-
forRow
public static IcebergSink.Builder forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.catalog.ResolvedSchema resolvedSchema)
Initialize an IcebergSink.Builder to export the data from an input data stream of Rows into an Iceberg table. We use RowData inside the sink connector, so users need to provide a ResolvedSchema for the builder to convert those Rows to a RowData DataStream.
- Parameters:
input - the source input data stream of Rows.
resolvedSchema - defines the TypeInformation for the input data.
- Returns:
an IcebergSink.Builder to connect the Iceberg table.
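With forRow the caller supplies a schema instead of a mapper, and the schema tells the conversion how many fields to expect and what type each one must have. The sketch below models that schema-driven step with hypothetical types (ColumnSketch, SchemaConversionSketch) and Object[] in place of both Row and RowData; it only validates arity and types and copies values positionally, whereas Flink's real converters also translate values into internal representations.

```java
import java.util.List;

// Hypothetical column description: a name and the Java type its values must have.
final class ColumnSketch {
    final String name;
    final Class<?> type;

    ColumnSketch(String name, Class<?> type) {
        this.name = name;
        this.type = type;
    }
}

final class SchemaConversionSketch {
    // Checks an external row against the schema and returns a positional copy.
    static Object[] toInternalRow(Object[] externalRow, List<ColumnSketch> schema) {
        if (externalRow.length != schema.size()) {
            throw new IllegalArgumentException(
                "Expected " + schema.size() + " fields but got " + externalRow.length);
        }
        Object[] internal = new Object[schema.size()];
        for (int i = 0; i < schema.size(); i++) {
            Object value = externalRow[i];
            ColumnSketch column = schema.get(i);
            // Nulls are passed through; non-null values must match the declared column type.
            if (value != null && !column.type.isInstance(value)) {
                throw new IllegalArgumentException(
                    "Column " + column.name + " expects " + column.type.getSimpleName());
            }
            internal[i] = value;
        }
        return internal;
    }
}
```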
-
forRowData
public static IcebergSink.Builder forRowData(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
Initialize an IcebergSink.Builder to export the data from an input data stream of RowData into an Iceberg table.
- Parameters:
input - the source input data stream of RowData.
- Returns:
an IcebergSink.Builder to connect the Iceberg table.