package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.StoreSinkWrite;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.OffsetRowData;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/StoreCompactOperator.class */
public class StoreCompactOperator extends PrepareCommitOperator {
    private final FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final boolean isStreaming;
    private transient StoreSinkWrite write;
    private transient RowDataSerializer partitionSerializer;
    private transient OffsetRowData reusedPartition;
    private transient DataFileMetaSerializer dataFileMetaSerializer;

    public StoreCompactOperator(FileStoreTable fileStoreTable, StoreSinkWrite.Provider provider, boolean z) {
        Preconditions.checkArgument(!fileStoreTable.options().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
        this.table = fileStoreTable;
        this.storeSinkWriteProvider = provider;
        this.isStreaming = z;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.write = this.storeSinkWriteProvider.provide(this.table, stateInitializationContext, getContainingTask().getEnvironment().getIOManager());
    }

    public void open() throws Exception {
        super.open();
        this.partitionSerializer = new RowDataSerializer(this.table.schema().logicalPartitionType());
        this.reusedPartition = new OffsetRowData(this.partitionSerializer.getArity(), 1);
        this.dataFileMetaSerializer = new DataFileMetaSerializer();
    }

    @Override // org.apache.flink.table.store.connector.sink.PrepareCommitOperator
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        long j = rowData.getLong(0);
        this.reusedPartition.replace(rowData);
        BinaryRowData copy = this.partitionSerializer.toBinaryRow(this.reusedPartition).copy();
        int i = rowData.getInt(this.partitionSerializer.getArity() + 1);
        List<DataFileMeta> deserializeList = this.dataFileMetaSerializer.deserializeList(rowData.getBinary(this.partitionSerializer.getArity() + 2));
        if (this.isStreaming) {
            this.write.notifyNewFiles(j, copy, i, deserializeList);
            this.write.compact(copy, i, false);
        } else {
            Preconditions.checkArgument(deserializeList.isEmpty(), "Batch compact job does not concern what files are compacted. They only need to know what buckets are compacted.");
            this.write.compact(copy, i, true);
        }
    }

    @Override // org.apache.flink.table.store.connector.sink.PrepareCommitOperator
    protected List<Committable> prepareCommit(boolean z, long j) throws IOException {
        return this.write.prepareCommit(z, j);
    }
}
