package org.apache.flink.table.store.connector.sink;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.StoreSinkWrite;
import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;

/**
 * Base class for sinks that write a {@link DataStream} of {@link RowData} into a {@link
 * FileStoreTable}.
 *
 * <p>{@link #sinkFrom(DataStream)} assembles a three-stage topology: a writer operator running at
 * the parallelism of the input, a committer operator pinned to parallelism 1, and a terminal
 * {@link DiscardingSink}. Subclasses supply the concrete writer operator, the committer factory
 * and the committable state manager.
 */
public abstract class FlinkSink implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";

    /** Table this sink writes to. */
    protected final FileStoreTable table;

    /** Flag forwarded unchanged to every {@link StoreSinkWrite} created by this sink. */
    private final boolean isOverwrite;

    /**
     * Creates a sink for the given table.
     *
     * @param table table to write to
     * @param isOverwrite flag handed to the created {@link StoreSinkWrite} instances
     */
    public FlinkSink(FileStoreTable table, boolean isOverwrite) {
        this.table = table;
        this.isOverwrite = isOverwrite;
    }

    /**
     * Chooses the {@link StoreSinkWrite.Provider} matching the table options.
     *
     * <p>A {@code FullChangelogStoreSinkWrite} provider is returned only when the changelog
     * producer is {@link CoreOptions.ChangelogProducer#FULL_COMPACTION} and the table is not
     * write-only; every other combination gets a {@code StoreSinkWriteImpl} provider.
     *
     * @param commitUser identifier forwarded to the created writer. NOTE(review): the name
     *     reflects the presumed role of this value; confirm against {@code StoreSinkWriteImpl}.
     * @return provider that instantiates the writer
     */
    protected StoreSinkWrite.Provider createWriteProvider(String commitUser) {
        // Copy the flag into a local so the provider lambdas capture just this boolean instead
        // of a reference to the whole enclosing sink.
        final boolean overwrite = isOverwrite;

        boolean fullCompactionChangelog =
                table.options().changelogProducer()
                        == CoreOptions.ChangelogProducer.FULL_COMPACTION;
        if (fullCompactionChangelog && !table.options().writeOnly()) {
            final long triggerIntervalMillis =
                    table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
            return (storeTable, context, ioManager) ->
                    new FullChangelogStoreSinkWrite(
                            storeTable,
                            context,
                            commitUser,
                            ioManager,
                            overwrite,
                            triggerIntervalMillis);
        }

        return (storeTable, context, ioManager) ->
                new StoreSinkWriteImpl(storeTable, context, commitUser, ioManager, overwrite);
    }

    /**
     * Attaches the writer, committer and terminal discarding sink to {@code dataStream}.
     *
     * <p>When the job runs in streaming mode with checkpointing enabled, the checkpoint
     * configuration is validated first and an {@link IllegalArgumentException} is raised if it is
     * unsupported.
     *
     * @param dataStream rows to write
     * @return the terminal sink transformation, named {@code "end"}
     */
    public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream) {
        // A fresh random identifier shared by the write provider and the committer operator.
        String uuid = UUID.randomUUID().toString();

        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        boolean isStreaming =
                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean streamingCheckpointEnabled =
                isStreaming && checkpointConfig.isCheckpointingEnabled();
        if (streamingCheckpointEnabled) {
            assertCheckpointConfiguration(env);
        }

        CommittableTypeInfo typeInfo = new CommittableTypeInfo();

        // Stage 1: writers, same parallelism as the upstream.
        DataStream<Committable> written =
                dataStream
                        .transform(
                                WRITER_NAME,
                                typeInfo,
                                createWriteOperator(createWriteProvider(uuid), isStreaming))
                        .setParallelism(dataStream.getParallelism());

        // Stage 2: a single committer instance (parallelism and max parallelism fixed to 1).
        DataStream<Committable> committed =
                written.transform(
                                GLOBAL_COMMITTER_NAME,
                                typeInfo,
                                new CommitterOperator(
                                        streamingCheckpointEnabled,
                                        uuid,
                                        createCommitterFactory(streamingCheckpointEnabled),
                                        createCommittableStateManager()))
                        .setParallelism(1)
                        .setMaxParallelism(1);

        // Stage 3: terminate the pipeline; whatever the committer emits is dropped.
        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
    }

    /**
     * Rejects checkpoint configurations this sink cannot work with.
     *
     * @param env environment whose checkpoint configuration is inspected
     * @throws IllegalArgumentException if unaligned checkpoints are enabled or the checkpointing
     *     mode is not {@link CheckpointingMode#EXACTLY_ONCE}
     */
    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        Preconditions.checkArgument(
                !checkpointConfig.isUnalignedCheckpointsEnabled(),
                "Table Store sink currently does not support unaligned checkpoints. Please set "
                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                        + " to false.");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                "Table Store sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                        + " to exactly-once");
    }

    /**
     * Creates the operator that turns incoming rows into committables.
     *
     * @param writeProvider provider obtained from {@link #createWriteProvider(String)}
     * @param isStreaming {@code true} when the runtime execution mode is {@code STREAMING}
     * @return the writer operator
     */
    protected abstract OneInputStreamOperator<RowData, Committable> createWriteOperator(
            StoreSinkWrite.Provider writeProvider, boolean isStreaming);

    /**
     * Creates the factory handed to the committer operator.
     *
     * @param streamingCheckpointEnabled {@code true} when the job runs in streaming mode with
     *     checkpointing enabled
     * @return serializable function producing a {@code Committer} from a string argument
     */
    protected abstract SerializableFunction<String, Committer> createCommitterFactory(
            boolean streamingCheckpointEnabled);

    /**
     * Creates the state manager handed to the committer operator.
     *
     * @return the committable state manager
     */
    protected abstract CommittableStateManager createCommittableStateManager();
}
